~jonn/bed

cf29fd8187f54556c2c23cf609838bf50d043e45 — Jonn 3 years ago 5553f9b master
Problem: io_srv doesn't know where to send jobs

Solution:

 - Pass the information about through sofo_monitor to subscribers
    a. Workers that are automatically started in sofo_pool at
	initialisation
    b. Workers that might be dynamically added as the application runs
	(dead code so far)
 - Have io_srv subscribe to sofo_monitor
M apps/bed/src/bed.app.src => apps/bed/src/bed.app.src +10 -1
@@ 1,1 1,10 @@
{ application , bed , [ { description , "An OTP application" } , { vsn , "0.1.0" } , { registered , [ ] } , { mod , { bed_app , [ ] } } , { applications , [ kernel , stdlib ] } , { env , [ ] } , { modules , [ ] } , { licenses , [ "Apache 2.0" ] } , { links , [ ] } ] } .
{ application , bed ,
    [   { description , "An OTP application" } ,
        { vsn , "0.1.0" } ,
        { registered , [ ] } ,
        { mod , { bed_app , [ ] } } ,
        { applications , [ kernel , stdlib ] } ,
        { env , [ ] } ,
        { modules , [ ] } ,
        { licenses , [ "WTFPL 2.0" ] } ,
        { links , [ ] } ] } .

M apps/bed/src/bed_io_srv.erl => apps/bed/src/bed_io_srv.erl +9 -6
@@ 9,27 9,30 @@
-export([receive_worker_list_change/2]).

-record(state,
        {sofo_pool_pids = sets:from_list([]) :: sets:set(pid()),
        {workers = sets:from_list([]) :: sets:set(pid()),
         chunk_inbox = [] :: [binary()]}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init(_) ->
    bed_sofo_pool:notify_worker_list_changes_via({?MODULE,
                                                  receive_worker_list_change}),
    {ok, #state{}}.
    {ok, KnownWorkers} = bed_sofo_monitor:subscribe_to_worker_list_changes({?MODULE,
                                                       receive_worker_list_change}),
    {ok, #state{workers = KnownWorkers}}.

-spec receive_worker_list_change(Event :: enter | leave,
                                 Pids :: sets:set(pid())) ->
                                    term().
receive_worker_list_change(Event, Pids) ->
    %io:format("Got callback telling about ~p ~ping.~n", [Event, Pids]),
    gen_server:call(?MODULE, {receive_worker_list_change, {Event, Pids}}).

handle_call({receive_worker_list_change, {enter, PidsDelta}},
            _From,
            State0 = #state{sofo_pool_pids = Pids0}) ->
    {reply, {error, undefined}, State0};
            State0 = #state{workers = Spids}) ->
    State1 = State0#state{workers = sets:union(Spids, PidsDelta)},
    %io:format("After deliberation,~n~p~n~nBecame~n~p~n~n.", [State0, State1]),
    {reply, {ok, State1#state.workers}, State1};
handle_call(_, _, S0) ->
    {reply, {error, nop}, S0}.


A apps/bed/src/bed_sofo_monitor.erl => apps/bed/src/bed_sofo_monitor.erl +50 -0
@@ 0,0 1,50 @@
-module(bed_sofo_monitor).

-behaviour(gen_server).

-export([start_link/0]).
-export([init/1,
         handle_call/3,
         handle_cast/2]).
-export([children_added/1,
         subscribe_to_worker_list_changes/1]).

-record(state,
        {annoucement_mfs = sets:from_list([]) :: sets:set({atom(), atom()}),
         workers = sets:from_list([]) :: sets:set(pid())}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    {ok, #state{}}.

-spec subscribe_to_worker_list_changes({atom(), atom()}) -> {ok, sets:set(pid())}.
subscribe_to_worker_list_changes({M, F}) ->
    gen_server:call(?MODULE, {subscribe_to_worker_list_changes, {M, F}}).

-spec children_added([pid()]) -> ok.
children_added(Pids) ->
    gen_server:cast(?MODULE, {children_added, Pids}).

handle_call({subscribe_to_worker_list_changes, {M, F}}, _From, State0 = #state{annoucement_mfs = Amfs, workers = Ws}) ->
    %io:format("Handling ~p in SOFO monitor", [{M, F}]),
    {reply,
     {ok, Ws},
     State0#state{annoucement_mfs =
                      sets:add_element({M, F}, Amfs)}}.

handle_cast({children_added, Pids},
            State0 =
                #state{annoucement_mfs = Amfs,
                       workers = Ws}) ->
    Fresh = sets:from_list(Pids),
    ok = announce({enter, Fresh}, sets:to_list(Amfs)),
    {noreply, State0#state{workers = sets:union(Ws, Fresh)}}.

-spec announce({enter | leave, sets:set(pid())}, [{atom(), atom()}]) -> term().
announce(_, []) ->
    ok;
announce({Event, Pids}, [{Am, Af} | Amfs]) ->
    erlang:apply(Am, Af, [Event, Pids]),
    announce({Event, Pids}, Amfs).

M apps/bed/src/bed_sofo_pool.erl => apps/bed/src/bed_sofo_pool.erl +36 -6
@@ 1,9 1,39 @@
-module(bed_sofo_pool).

-export([notify_worker_list_changes_via/1]).
-behaviour(supervisor).

-spec notify_worker_list_changes_via({atom(), atom()}) -> ok | {error, atom()}.
notify_worker_list_changes_via({Module, Function}) ->
    {error, undefined};
notify_worker_list_changes_via(_) ->
    {error, badarg}.
-export([start_link/0,
         init/1]).

start_link() ->
    {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
    {ok, _} = spawn_children(erlang:system_info(logical_processors_available)),
    %spawn_link(fun() ->
    %              timer:sleep(666),
    %              spawn_children(erlang:system_info(logical_processors_available))
    %           end),
    {ok, Pid}.

spawn_children(X) ->
    spawn_children_do(X, []).

spawn_children_do(0, Acc) ->
    bed_sofo_monitor:children_added(Acc),
    {ok, Acc};
spawn_children_do(N, Acc) ->
    {ok, Pid} = supervisor:start_child(?MODULE, []),
    spawn_children_do(N - 1, [Pid | Acc]).

init([]) ->
    SupFlags =
        #{strategy => simple_one_for_one,
          intensity => 5,
          period => 1},
    WorkerSpec =
        [#{id => bed_sofo_worker,
           start => {bed_sofo_worker, start_link, []},
           restart => transient,
           shutdown => 5000,
           type => worker,
           modules => [bed_sofo_worker]}],
    {ok, {SupFlags, WorkerSpec}}.

M apps/bed/src/bed_sofo_worker.erl => apps/bed/src/bed_sofo_worker.erl +19 -0
@@ 1,1 1,20 @@
-module(bed_sofo_worker).

-behaviour(gen_server).

-export([start_link/0,
         init/1]).
-export([handle_call/3,
         handle_cast/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    {ok, undefined}.

handle_cast(_, _) ->
    {noreply, undefined}.

handle_call(_, _, _) ->
    {reply, undefined, undefined}.

M apps/bed/src/bed_sup.erl => apps/bed/src/bed_sup.erl +9 -2
@@ 32,7 32,14 @@ init([]) ->
        #{strategy => one_for_all,
          intensity => 0,
          period => 1},
    ChildSpecs = [],
    ChildSpecs =
        [#{id => bed_sofo_monitor,
           start => {bed_sofo_monitor, start_link, []}},
         #{id => bed_sofo_pool,
           start => {bed_sofo_pool, start_link, []},
           type => supervisor},
         #{id => bed_io_srv,
           start => {bed_io_srv, start_link, []}}],
    {ok, {SupFlags, ChildSpecs}}.

%% internal functions


@@ 63,7 70,7 @@ fan_bytes(BitsPerRule, Workers, FH) ->
    %
    % Bits to fan out:
    %   $ b_{fo} = b_r \frac{8}{g(b_r, 8)} $ where $ b_r $ is
    % bit alignment of the rule and $ g $ is GCD function.
    % bit alignment of the rules and $ g $ is GCD function.
    %
    % Bytes to fan out then is:
    %   $ B_{fo} =  \frac{b_{fo}}{8} = \frac{b_r}{g(b_r, 8)} $

M rebar.config => rebar.config +13 -3
@@ 2,10 2,20 @@

{ deps , [ ] } .

{ relx , [ { release , { bed , "0.1.0" } , [ bed , sasl ] } , { mode , dev } , { sys_config , "./config/sys.config" } , { vm_args , "./config/vm.args" } ] } .
{ relx ,
  [ { release , { bed , "0.1.0" } , [ bed , sasl ] }
  , { mode , dev }
  , { sys_config , "./config/sys.config" }
  , { vm_args , "./config/vm.args" } ] } .

{ profiles , [ { prod , [ { relx , [ { mode , prod } ] } ] } ] } .
{ profiles ,
  [ { prod , [ { relx , [ { mode , prod } ] } ] } ] } .

{ plugins , [ rebar3_format ] } .

{ format , [ { files , [ "rebar.config" , "apps/bed/src/*.erl" , "apps/bed/src/*.app.src" ] } , { formatter , default_formatter } , { options , #{ paper => 80 , inline_attributes => none , inline_fields => none , inline_items => none } } ] } .
{ format , [ { files , [ "apps/bed/src/*.erl" ] }
           , { formatter , default_formatter }
           , { options , #{ paper => 80
                          , inline_attributes => none
                          , inline_fields => none
                          , inline_items => none } } ] } .