defmodule Paxos do def start(name, processes) do IO.puts("Starting paxos for #{name}") pid = spawn(Paxos, :init, [name, name, processes]) Utils.register_name(name, pid, false) end # Init event must be the first # one after the component is created def init(name, parent, processes) do EventualLeaderElector.start(name, processes) EagerReliableBroadcast.start(name, processes) state = %{ name: name, parent: parent, processes: processes, leader: nil, instmap: %{}, other_values: %{}, decided: %{}, aborted: MapSet.new(), timeout: MapSet.new() } run(state) end def add_inst_map(state, inst, value, pid_to_inform) do instmap = Map.put(state.instmap, inst, %{ value: value, ballot: 0, ballot_value: nil, prepared_values: [], accepted: 0, running_ballot: 0, accepted_ballot: nil, accepted_value: nil, pid_to_inform: pid_to_inform }) %{state | instmap: instmap} end def has_finished(state, inst) do Map.has_key?(state.decided, inst) or inst in state.timeout or inst in state.aborted end def run(state) do run( receive do {:ele_trust, proc} -> IO.puts("#{state.name} - #{proc} is leader") Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st -> # IO.puts("#{state.name} - looping after leader: #{inst}") prepare(st, inst) end) {:propose, inst, value, t, pid_to_inform} -> IO.puts("#{state.name} - Propose #{inst}") cond do has_finished(state, inst) -> send(pid_to_inform, {:abort, inst}) state not Map.has_key?(state.instmap, inst) -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) state = add_inst_map(state, inst, value, pid_to_inform) Process.send_after(self(), {:timeout, inst}, t) prepare(state, inst) state.instmap[inst].value == nil -> Process.send_after(self(), {:timeout, inst}, t) prepare( set_instmap(state, inst, %{ state.instmap[inst] | value: value, pid_to_inform: pid_to_inform }), inst ) true -> state end {:rb_deliver, _proc, {:other_propose, inst, value}} -> state = if Map.has_key?(state.instmap, inst) do state else add_inst_map(state, inst, nil, nil) end %{state | other_values: Map.put(state.other_values, inst, value)} {:rb_deliver, _proc, {:prepare, proc, inst, ballot}} -> IO.puts("#{state.name} - prepare") cond do has_finished(state, inst) -> state not Map.has_key?(state.instmap, inst) -> state ballot > state.instmap[inst].running_ballot -> Utils.unicast( {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value}, proc ) set_instmap(state, inst, %{ state.instmap[inst] | running_ballot: ballot }) true -> Utils.unicast({:nack, inst, ballot}, proc) state end {:timeout, inst} -> EagerReliableBroadcast.broadcast(state.name, {:timeout, inst}) state {:rb_deliver, _proc, {:timeout, inst}} -> IO.puts("#{state.name}- timeout") if has_finished(state, inst) do state else if Map.has_key?(state.instmap, inst) do if state.instmap[inst].pid_to_inform do send(state.instmap[inst].pid_to_inform, {:timeout, inst}) end end %{ state | instmap: Map.delete(state.instmap, inst), timeout: MapSet.put(state.timeout, inst) } end # {:rb_deliver, _proc, {:abort, inst}} -> # IO.puts("#{state.name}- abort") # if has_finished(state, inst) do # state # else # if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do # send(state.instmap[inst].pid_to_inform, {:abort, inst}) # end # %{ # state # | aborted: MapSet.put(state.aborted, inst), # instmap: Map.delete(state.instmap, inst) # } # end {:nack, inst, ballot} -> IO.puts("#{state.name}- nack") if has_finished(state, inst) do state else if state.leader == state.name and state.instmap[inst].ballot == ballot do # EagerReliableBroadcast.broadcast(state.name, {:abort, inst}) if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:abort, inst}) end state else state end end {:prepared, inst, ballot, accepted_ballot, accepted_value} -> IO.puts("#{state.name}- prepared") if has_finished(state, inst) do state else if ballot == state.instmap[inst].ballot do state = set_instmap(state, inst, %{ state.instmap[inst] | prepared_values: state.instmap[inst].prepared_values ++ [{accepted_ballot, accepted_value}] }) prepared(state, inst) else state end end {:rb_deliver, proc, {:accept, inst, ballot, value}} -> IO.puts("#{state.name} accept") if has_finished(state, inst) do state else if ballot >= state.instmap[inst].running_ballot do Utils.unicast({:accepted, inst, ballot}, proc) set_instmap(state, inst, %{ state.instmap[inst] | running_ballot: ballot, accepted_value: value, accepted_ballot: ballot }) else Utils.unicast({:nack, inst, ballot}, proc) state end end {:accepted, inst, ballot} -> IO.puts("#{state.name} accepted") if has_finished(state, inst) do state else if state.leader == state.name and state.instmap[inst].ballot == ballot do accepted( set_instmap(state, inst, %{ state.instmap[inst] | accepted: state.instmap[inst].accepted + 1 }), inst ) else state end end {:get_value, inst, pid_to_inform, t} -> # IO.puts("#{state.name} get_value") cond do t < 0 -> send(pid_to_inform, {:get_value_res, inst}) has_finished(state, inst) -> cond do inst in state.aborted -> send(pid_to_inform, {:get_value_res, inst}) inst in state.timeout -> send(pid_to_inform, {:get_value_res, inst}) Map.has_key?(state.decided, inst) -> send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]}) end true -> Process.send_after(self(), {:get_value, inst, pid_to_inform, t - 500}, 500) end state {:rb_deliver, _, {:decide, inst, value}} -> IO.puts("#{state.name} decided") if has_finished(state, inst) do state else if Map.has_key?(state.instmap, inst) != nil and state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:decision, inst, value}) end %{ state | decided: Map.put(state.decided, inst, value), instmap: Map.delete(state.instmap, inst) } end end ) end def set_instmap(state, inst, instmap) do new_instmap = Map.put(state.instmap, inst, instmap) %{state | instmap: new_instmap} end # # Puts process in the preapre state # def prepare(state, _) when state.leader != state.name, do: state def prepare(state, inst) do if Map.get(state.instmap, inst) == nil and Map.get(state.other_values, inst) == nil do state else ballot = state.instmap[inst].ballot + 1 EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot}) set_instmap(state, inst, %{ state.instmap[inst] | ballot: ballot, prepared_values: [], accepted: 0, ballot_value: nil }) end end # # Process the prepared responses # def prepared(state, _) when state.leader != state.name, do: state def prepared(state, inst) do if length(state.instmap[inst].prepared_values) >= floor(length(state.processes) / 2) + 1 do {_, a_val} = Enum.reduce(state.instmap[inst].prepared_values, {0, nil}, fn {bal, val}, {a_bal, a_val} -> if a_bal > bal do {a_bal, a_val} else {bal, val} end end) a_val = if a_val == nil do if state.instmap[inst].value == nil do state.other_values[inst] else state.instmap[inst].value end else a_val end EagerReliableBroadcast.broadcast( state.name, {:accept, inst, state.instmap[inst].ballot, a_val} ) set_instmap(state, inst, %{ state.instmap[inst] | ballot_value: a_val }) else state end end # # Process the accepted responses # def accepted(state, _) when state.leader != state.name, do: state def accepted(state, inst) do if state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do value = state.instmap[inst].ballot_value EagerReliableBroadcast.broadcast( state.name, {:decide, inst, value} ) if Map.has_key?(state.instmap, inst) != nil and state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:decision, inst, value}) end %{ state | decided: Map.put(state.decided, inst, value), instmap: Map.delete(state.instmap, inst) } else state end end def propose(pid, inst, value, t) do # Utils.unicast({:propose, value}, name) send(pid, {:propose, inst, value, t, self()}) propose_loop(inst) end def propose_loop(inInst) do receive do {:timeout, inst} -> if inInst == inst do {:timeout} else propose_loop(inInst) end {:abort, inst} -> if inInst == inst do {:abort} else propose_loop(inInst) end {:decision, inst, d} -> if inInst == inst do {:decision, d} else propose_loop(inInst) end _ -> propose_loop(inInst) end end def get_decision(pid, inst, t) do send(pid, {:get_value, inst, self(), t}) get_decision_loop(inst) end def get_decision_loop(inInst) do receive do {:get_value_res, inst} -> if inst == inInst do nil else get_decision_loop(inInst) end {:get_value_res_actual, inst, v} -> if inst == inInst do v else get_decision_loop(inInst) end _ -> get_decision_loop(inInst) end end end