defmodule EagerReliableBroadcast do
  @moduledoc """
  Eager reliable broadcast component.

  Each component runs as a spawned process registered under the parent's name
  with an `"_rb"` suffix. The first time a `{:data, origin, seq_no, m}` message
  is seen it is relayed to every known component and `{:rb_deliver, origin, m}`
  is sent to the parent; later copies of the same message only update a
  per-message counter.
  """

  # Suffix appended to a parent's name to obtain its broadcast component's name.
  @rb_suffix "_rb"

  @doc """
  Returns the registered name of the broadcast component that belongs to `name`,
  i.e. `name` with `"_rb"` appended.
  """
  def get_rb_name(name) do
    String.to_atom(Atom.to_string(name) <> @rb_suffix)
  end

  @doc """
  Inverse of `get_rb_name/1`: strips the trailing `"_rb"` from `name`.

  Only the suffix is removed, so names that contain `"_rb"` elsewhere
  (for example `:p_rb_1_rb`) map back to the exact original name. A name
  without the suffix is returned unchanged.
  """
  def get_non_rb_name(name) do
    name
    |> Atom.to_string()
    |> String.replace_suffix(@rb_suffix, "")
    |> String.to_atom()
  end

  @doc """
  Spawns the broadcast component for `name` and registers it under
  `get_rb_name(name)` via `Utils.register_name/2`.

  `processes` is the list of parent names taking part in the broadcast.
  Returns whatever `Utils.register_name/2` returns.
  """
  def start(name, processes) do
    pid = spawn(__MODULE__, :init, [name, processes])
    Utils.register_name(get_rb_name(name), pid)
  end

  @doc """
  Entry point of the spawned process: builds the initial state and enters the
  receive loop. Never returns.
  """
  # Init event must be the first
  # one after the component is created
  def init(parent, processes) do
    state = %{
      name: get_rb_name(parent),
      parent: parent,
      processes: Enum.map(processes, &get_rb_name/1),
      # Use this data structure to remember IDs of the delivered messages
      delivered: %{},
      # Use this variable to remember the last sequence number used to identify a message
      seq_no: 0
    }

    run(state)
  end

  @doc """
  Receive loop of the component. Handles one message, then recurses with the
  updated state.

  Handled messages:

    * `{:broadcast, m}` - tags `m` with this component's name and the current
      sequence number, hands it to `Utils.beb_broadcast/2`, and bumps the
      sequence number.
    * `{:data, proc, seq_no, m}` - relays and delivers on first sight, otherwise
      updates the bookkeeping counter for that message.
  """
  def run(state) do
    next_state =
      receive do
        # Handle the broadcast request event
        {:broadcast, m} ->
          data_msg = {:data, state.name, state.seq_no, m}
          Utils.beb_broadcast(data_msg, state.processes)
          %{state | seq_no: state.seq_no + 1}

        {:data, proc, seq_no, m} ->
          handle_data(state, proc, seq_no, m)
      end

    run(next_state)
  end

  @doc """
  Requests a broadcast of `m` by sending `{:broadcast, m}` to the broadcast
  component registered for `name`.
  """
  def broadcast(name, m) do
    Utils.unicast({:broadcast, m}, get_rb_name(name))
  end

  # Processes one `{:data, proc, seq_no, m}` message and returns the new state.
  defp handle_data(state, proc, seq_no, m) do
    key = {proc, seq_no, m}

    case Map.fetch(state.delivered, key) do
      :error ->
        # First copy: relay to every component, then deliver to the parent.
        Utils.beb_broadcast({:data, proc, seq_no, m}, state.processes)
        Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent)
        %{state | delivered: Map.put(state.delivered, key, 1)}

      {:ok, seen} ->
        # Count further copies; once the counter has reached the number of
        # components the entry is dropped so the map does not grow forever.
        # NOTE(review): after the entry is dropped, yet another copy of the same
        # message would be treated as new and delivered again - presumably the
        # transport never produces more copies than that; confirm.
        if seen < length(state.processes) do
          %{state | delivered: Map.put(state.delivered, key, seen + 1)}
        else
          %{state | delivered: Map.delete(state.delivered, key)}
        end
    end
  end
end