# # Emits {:ele_trust, proc } # defmodule EventualLeaderElector do def getEleName(name) do String.to_atom(Atom.to_string(name) <> "_ele") end def getOriginalName(name) do String.to_atom(String.replace(Atom.to_string(name), "_ele", "")) end def start(name, processes) do new_name = getEleName(name) pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) Utils.register_name(new_name, pid) end # Init event must be the first # one after the component is created def init(name, parent, processes) do processes = Enum.map(processes, fn name -> getEleName(name) end) state = %{ name: name, parent: parent, processes: processes, mp_processes: MapSet.new(processes), timeout: 1000, heard_back: MapSet.new(), seq: 0, last_trust: nil } run(request_heartbeats(state)) end def request_heartbeats(state) do state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) Process.send_after(self(), {:timeout}, state.timeout) state end def run(state) do run( receive do {:heartbeat_request, name, seq} -> Utils.unicast({:heartbeat, state.name, seq}, name) state {:heartbeat, name, seq} -> if seq == state.seq do %{state | heard_back: MapSet.put(state.heard_back, name)} else state end {:timeout} -> active = MapSet.intersection(state.mp_processes, state.heard_back) state = %{state | heard_back: MapSet.new()} state = if MapSet.size(active) == 0 do state else to_trust = Enum.at(Enum.sort(MapSet.to_list(active)), 0) to_trust = getOriginalName(to_trust) if state.last_trust != to_trust do Utils.unicast({:ele_trust, to_trust}, state.parent) %{state | last_trust: to_trust} else state end end request_heartbeats(state) end ) end end