# # Emits {:ele_trust, proc } # defmodule EventualLeaderElector do def getEleName(name) do String.to_atom(Atom.to_string(name) <> "_ele") end def start(name, processes) do new_name = getEleName(name) pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) Utils.register_name(new_name, pid) end # Init event must be the first # one after the component is created def init(name, parent, processes) do processes = Enum.map(processes, fn name -> getEleName(name) end) state = %{ name: name, parent: parent, processes: processes, timeout: 1000, heard_back: MapSet.new(), seq: 0, last_trust: nil } run(request_heartbeats(state)) end def request_heartbeats(state) do state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) Process.send_after(self(), {:timeout}, state.timeout) state end def run(state) do run( receive do {:heartbeat_request, name, seq} -> Utils.unicast({:heartbeat, state.parent, seq}, name) state {:heartbeat, name, seq} -> if seq == state.seq do %{state | heard_back: MapSet.put(state.heard_back, name)} else state end {:timeout} -> state = if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do state else to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) if state.last_trust != to_trust do Utils.unicast({:ele_trust, to_trust}, state.parent) %{state | last_trust: to_trust} else state end end request_heartbeats(state) end ) end end