#
# Possible actions
#   :kill_before_decision
#   :increase_ballot_number - the process does not propose; it simply
#     increases the number of the current ballot.
#     This is useful when forcing a nack.
#
defmodule Paxos do
  @moduledoc """
  Multi-instance Paxos process.

  Each process keeps a map of in-flight instances (`instmap`) and a map of
  decided values (`decided`). Only the process currently trusted as leader
  (see the `{:ele_trust, proc}` message) drives the prepare/accept phases.

  The server loop uses a single selective `receive`; messages that match none
  of its clauses stay in the mailbox.
  """

  @doc """
  Spawns the Paxos process for `name` and registers it through
  `Utils.register_name/3`.
  """
  def start(name, processes) do
    IO.puts("Starting paxos for #{name}")
    pid = spawn(Paxos, :init, [name, name, processes])
    Utils.register_name(name, pid, false)
  end

  # Init event must be the first
  # one after the component is created
  @doc """
  Starts the leader elector and the reliable broadcast for `name`, builds the
  initial state and enters the server loop. Never returns.
  """
  def init(name, parent, processes) do
    EventualLeaderElector.start(name, processes)
    EagerReliableBroadcast.start(name, processes)

    state = %{
      name: name,
      parent: parent,
      processes: processes,
      leader: nil,
      instmap: %{},
      decided: %{}
    }

    run(state)
  end

  @doc """
  Returns `state` unchanged when instance `inst` already exists; otherwise
  returns `state` with a fresh instance record stored under `inst`.
  """
  def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
    if Map.has_key?(state.instmap, inst) do
      state
    else
      instance = %{
        value: value,
        other_value: nil,
        ballot: Ballot.init(state.name, 0),
        aborted: false,
        ballot_value: nil,
        prepared_values: [],
        accepted: 0,
        accepted_ballot: nil,
        accepted_value: nil,
        pid_to_inform: pid_to_inform,
        has_sent_accept: false,
        action: action,
        has_sent_prepare: false
      }

      %{state | instmap: Map.put(state.instmap, inst, instance)}
    end
  end

  @doc """
  Tells whether instance `inst` is finished.

  A decided instance is always finished. Unless `ignore_aborted` is `true`,
  an existing instance flagged as `aborted` also counts as finished.
  """
  def has_finished(state, inst, ignore_aborted \\ false) do
    cond do
      Map.has_key?(state.decided, inst) -> true
      ignore_aborted -> false
      Map.has_key?(state.instmap, inst) -> state.instmap[inst].aborted
      true -> false
    end
  end

  @doc """
  Server loop: receives one message, computes the next state, and recurses.
  """
  def run(state) do
    next_state =
      receive do
        {:ele_trust, proc} ->
          on_leader_change(state, proc)

        {:propose, inst, value, t, pid_to_inform, action} ->
          on_propose(state, inst, value, t, pid_to_inform, action)

        {:rb_deliver, _proc, {:other_propose, inst, value}} ->
          on_other_propose(state, inst, value)

        # `proc` appears twice on purpose: the delivering process must be the
        # process named inside the prepare message for the clause to match.
        {:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
          on_prepare(state, proc, inst, ballot)

        {:timeout, inst} ->
          on_timeout(state, inst)

        {:nack, inst, ballot} ->
          on_nack(state, inst, ballot)

        {:rb_deliver, _proc, {:abort, inst, _ballot}} ->
          on_abort(state, inst)

        {:prepared, inst, ballot, accepted_ballot, accepted_value} ->
          on_prepared(state, inst, ballot, accepted_ballot, accepted_value)

        {:rb_deliver, proc, {:accept, inst, ballot, value}} ->
          on_accept(state, proc, inst, ballot, value)

        {:accepted, inst, ballot} ->
          on_accepted(state, inst, ballot)

        {:get_value, inst, pid_to_inform, t} ->
          on_get_value(state, inst, pid_to_inform, t)

        {:rb_deliver, _, {:decide, inst, value}} ->
          on_decide(state, inst, value)
      end

    run(next_state)
  end

  @doc """
  Replaces the record of instance `inst` with `fun.(current_record)`.
  """
  def set_instmap(state, inst, fun) do
    %{state | instmap: Map.put(state.instmap, inst, fun.(state.instmap[inst]))}
  end

  #
  # Puts process in the prepare state
  #
  @doc """
  Broadcasts a prepare for `inst` with the next ballot when this process is
  the leader, the instance has a value to propose (own or relayed), and no
  prepare/accept is already in flight. Otherwise returns `state` unchanged.
  """
  def prepare(state, _) when state.leader != state.name, do: state

  def prepare(state, inst) do
    instance = state.instmap[inst]

    cond do
      instance == nil ->
        state

      instance.value == nil and instance.other_value == nil ->
        state

      instance.has_sent_prepare ->
        state

      instance.has_sent_accept ->
        state

      true ->
        # The new ballot is only broadcast here; it is stored in the instance
        # when this process handles its own prepare message.
        ballot = Ballot.inc(instance.ballot)
        IO.puts("#{state.name} - sending all prepare #{inspect(inst)} #{inspect(ballot)}")
        EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})

        set_instmap(state, inst, fn map ->
          %{
            map
            | prepared_values: [],
              accepted: 0,
              aborted: false,
              ballot_value: nil,
              has_sent_prepare: true,
              has_sent_accept: false
          }
        end)
    end
  end

  #
  # Process the prepared responses
  #
  @doc """
  Once a majority of prepared responses has been collected (and no accept was
  sent yet), picks the value to propose and broadcasts the accept request.

  The value accepted under the highest ballot wins; when no responder had
  accepted anything, the own value is used, falling back to the relayed one.
  """
  def prepared(state, _) when state.leader != state.name, do: state

  def prepared(state, inst) do
    instance = state.instmap[inst]

    if length(instance.prepared_values) >= quorum(state) and not instance.has_sent_accept do
      {_, highest_accepted} =
        Enum.reduce(
          instance.prepared_values,
          {Ballot.init(state.name, 0), nil},
          fn {bal, val}, {acc_bal, acc_val} ->
            cond do
              val == nil -> {acc_bal, acc_val}
              Ballot.compare(acc_bal, &>/2, bal) -> {acc_bal, acc_val}
              true -> {bal, val}
            end
          end
        )

      # Explicit nil checks (not `||`) so that a `false` value is preserved.
      chosen =
        cond do
          highest_accepted != nil -> highest_accepted
          instance.value != nil -> instance.value
          true -> instance.other_value
        end

      EagerReliableBroadcast.broadcast(state.name, {:accept, inst, instance.ballot, chosen})

      set_instmap(state, inst, fn map ->
        %{map | ballot_value: chosen, has_sent_accept: true}
      end)
    else
      state
    end
  end

  #
  # Process the accepted responses
  #
  @doc """
  Once a majority has accepted, broadcasts the decision, informs the waiting
  client (if any) and moves the instance from `instmap` to `decided`.
  """
  def accepted(state, _) when state.leader != state.name, do: state

  def accepted(state, inst) do
    instance = state.instmap[inst]

    if instance.accepted >= quorum(state) do
      value = instance.ballot_value

      if instance.action == :kill_before_decision do
        IO.puts("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}")
        Process.exit(self(), :kill)
      end

      EagerReliableBroadcast.broadcast(state.name, {:decide, inst, value})
      notify_client(state, inst, {:decision, inst, value})
      record_decision(state, inst, value)
    else
      state
    end
  end

  @doc """
  Client call: asks process `pid` to propose `value` for `inst` and blocks
  until a `{:timeout}`, `{:abort}` or `{:decision, d}` answer for that
  instance arrives.
  """
  def propose(pid, inst, value, t, action \\ nil) do
    send(pid, {:propose, inst, value, t, self(), action})
    propose_loop(inst)
  end

  @doc """
  Waits for the answer to a proposal on instance `in_inst`. Unrelated
  messages are re-sent to self after 500 ms.
  """
  def propose_loop(in_inst) do
    receive do
      {:timeout, inst} ->
        check_and_apply({:timeout}, inst, in_inst, &propose_loop/1)

      {:abort, inst} ->
        check_and_apply({:abort}, inst, in_inst, &propose_loop/1)

      {:decision, inst, d} ->
        check_and_apply({:decision, d}, inst, in_inst, &propose_loop/1)

      other ->
        Process.send_after(self(), other, 500)
        propose_loop(in_inst)
    end
  end

  @doc """
  Client call: asks process `pid` for the decision of `inst`. Returns the
  decided value, or `nil` when the server answers without one.
  """
  def get_decision(pid, inst, t) do
    send(pid, {:get_value, inst, self(), t})
    get_decision_loop(inst)
  end

  @doc """
  Waits for the answer to a `get_decision/3` request on instance `in_inst`.
  Unrelated messages are re-sent to self after 500 ms.
  """
  def get_decision_loop(in_inst) do
    receive do
      {:get_value_res, inst} ->
        check_and_apply(nil, inst, in_inst, &get_decision_loop/1)

      {:get_value_res_actual, inst, v} ->
        check_and_apply(v, inst, in_inst, &get_decision_loop/1)

      other ->
        Process.send_after(self(), other, 500)
        get_decision_loop(in_inst)
    end
  end

  @doc """
  Returns `v` when `inst` is the awaited instance `in_inst`; otherwise keeps
  waiting by calling `fun.(in_inst)`.
  """
  def check_and_apply(v, inst, in_inst, fun) do
    if inst == in_inst do
      v
    else
      fun.(in_inst)
    end
  end

  # ---------------------------------------------------------------------------
  # Message handlers (one per clause of the receive in run/1)
  # ---------------------------------------------------------------------------

  # A new leader is trusted: record it and try to start every known instance.
  defp on_leader_change(state, proc) do
    IO.puts("#{state.name} - #{proc} is leader")

    state.instmap
    |> Map.keys()
    |> Enum.reduce(%{state | leader: proc}, fn inst, acc -> prepare(acc, inst) end)
  end

  # A local client proposes `value` for `inst`.
  defp on_propose(state, inst, value, t, pid_to_inform, action) do
    IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")

    cond do
      has_finished(state, inst, true) ->
        IO.puts("#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}")
        send(pid_to_inform, {:decision, inst, state.decided[inst]})
        state

      action == :increase_ballot_number ->
        state = has_or_create(state, inst)
        IO.puts("#{state.name} - Got request to increase ballot number for inst #{inspect(inst)}")
        # Inform the pid with timeout right away
        send(pid_to_inform, {:timeout, inst})
        set_instmap(state, inst, fn map -> %{map | ballot: Ballot.inc(map.ballot)} end)

      not Map.has_key?(state.instmap, inst) ->
        EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
        state = has_or_create(state, inst, value, pid_to_inform, action)
        Process.send_after(self(), {:timeout, inst}, t)
        prepare(state, inst)

      state.instmap[inst].value == nil ->
        # The instance was created by a remote message; adopt the local
        # proposal and client now.
        EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
        Process.send_after(self(), {:timeout, inst}, t)

        state
        |> set_instmap(inst, fn map ->
          %{map | value: value, pid_to_inform: pid_to_inform, action: action}
        end)
        |> prepare(inst)

      true ->
        EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
        prepare(state, inst)
    end
  end

  # Another process relayed a proposal for `inst`.
  defp on_other_propose(state, inst, value) do
    if has_finished(state, inst, true) do
      EagerReliableBroadcast.broadcast(state.name, {:decide, inst, state.decided[inst]})
      state
    else
      state
      |> has_or_create(inst)
      |> set_instmap(inst, fn map -> %{map | other_value: value} end)
      |> prepare(inst)
    end
  end

  # Acceptor side of phase 1.
  defp on_prepare(state, proc, inst, ballot) do
    IO.puts("#{state.name} - prepare from #{proc}")

    cond do
      has_finished(state, inst) ->
        state

      not Map.has_key?(state.instmap, inst) ->
        state |> has_or_create(inst) |> promise(proc, inst, ballot)

      Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
        promise(state, proc, inst, ballot)

      true ->
        Utils.unicast({:nack, inst, ballot}, proc)
        state
    end
  end

  # Answers a prepare with the last accepted ballot/value and adopts `ballot`.
  defp promise(state, proc, inst, ballot) do
    instance = state.instmap[inst]

    Utils.unicast(
      {:prepared, inst, ballot, instance.accepted_ballot, instance.accepted_value},
      proc
    )

    set_instmap(state, inst, fn map -> %{map | ballot: ballot} end)
  end

  # The proposal timer fired: tell the client unless the instance is finished.
  defp on_timeout(state, inst) do
    if not has_finished(state, inst) do
      notify_client(state, inst, {:timeout, inst})
    end

    state
  end

  # Leader side: a nack for the current ballot aborts the round.
  defp on_nack(state, inst, ballot) do
    IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")

    cond do
      has_finished(state, inst) ->
        state

      state.leader == state.name and Map.has_key?(state.instmap, inst) and
          Ballot.compare(state.instmap[inst].ballot, &==/2, ballot) ->
        notify_client(state, inst, {:abort, inst})
        EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot})

        set_instmap(state, inst, fn map ->
          %{
            map
            | has_sent_accept: false,
              has_sent_prepare: false,
              ballot: Ballot.inc(map.ballot),
              aborted: true
          }
        end)

      true ->
        state
    end
  end

  # The leader broadcast an abort: forward it to the local client, if any.
  defp on_abort(state, inst) do
    if not has_finished(state, inst) do
      IO.puts("#{state.name} - got information to send abort")
      notify_client(state, inst, {:abort, inst})
    end

    state
  end

  # Leader side of phase 1: collect a prepared response.
  defp on_prepared(state, inst, ballot, accepted_ballot, accepted_value) do
    IO.puts(
      "#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
    )

    cond do
      has_finished(state, inst) ->
        state

      # Stray response for an instance this process does not know: ignore.
      not Map.has_key?(state.instmap, inst) ->
        state

      Ballot.compare(ballot, &==/2, state.instmap[inst].ballot) ->
        state
        |> set_instmap(inst, fn map ->
          %{map | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]}
        end)
        |> prepared(inst)

      Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
        IO.puts("#{state.name} - Probably recieved this before preare came to self sending with delay")
        Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
        state

      true ->
        state
    end
  end

  # Acceptor side of phase 2.
  defp on_accept(state, proc, inst, ballot, value) do
    if has_finished(state, inst) do
      state
    else
      state = has_or_create(state, inst)

      if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do
        IO.puts("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}")
        Utils.unicast({:accepted, inst, ballot}, proc)

        set_instmap(state, inst, fn map ->
          %{map | ballot: ballot, accepted_value: value, accepted_ballot: ballot}
        end)
      else
        IO.puts("#{state.name} -> #{proc} nack")
        Utils.unicast({:nack, inst, ballot}, proc)
        state
      end
    end
  end

  # Leader side of phase 2: count an accepted response for the current ballot.
  defp on_accepted(state, inst, ballot) do
    IO.puts("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}")

    cond do
      has_finished(state, inst) ->
        state

      state.leader == state.name and Map.has_key?(state.instmap, inst) and
          Ballot.compare(state.instmap[inst].ballot, &==/2, ballot) ->
        state
        |> set_instmap(inst, fn map -> %{map | accepted: map.accepted + 1} end)
        |> accepted(inst)

      true ->
        state
    end
  end

  # Answers a get_decision/3 request, polling every 500 ms until `t` runs out.
  defp on_get_value(state, inst, pid_to_inform, t) do
    cond do
      t < 0 ->
        send(pid_to_inform, {:get_value_res, inst})

      has_finished(state, inst, true) ->
        send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]})

      true ->
        Process.send_after(self(), {:get_value, inst, pid_to_inform, t - 500}, 500)
    end

    state
  end

  # A decision was broadcast: inform the local client (if any) and record it.
  #
  # NOTE(review): has_finished/2 is called without ignore_aborted, so a
  # decision for an instance currently flagged `aborted` is dropped here.
  # Kept as in the original; confirm this is intended.
  defp on_decide(state, inst, value) do
    IO.puts("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")

    if has_finished(state, inst) do
      state
    else
      notify_client(state, inst, {:decision, inst, value})
      record_decision(state, inst, value)
    end
  end

  # ---------------------------------------------------------------------------
  # Small helpers
  # ---------------------------------------------------------------------------

  # Sends `message` to the client waiting on `inst`, when the instance exists
  # and has a client pid. Does nothing otherwise.
  defp notify_client(state, inst, message) do
    case state.instmap do
      %{^inst => %{pid_to_inform: pid}} when pid != nil -> send(pid, message)
      _ -> nil
    end

    :ok
  end

  # Stores `value` as the decision of `inst` and drops the in-flight record.
  defp record_decision(state, inst, value) do
    %{
      state
      | decided: Map.put(state.decided, inst, value),
        instmap: Map.delete(state.instmap, inst)
    }
  end

  # Size of a strict majority of the configured processes.
  defp quorum(state), do: div(length(state.processes), 2) + 1
end

defmodule Ballot do
  @moduledoc """
  Ballots are `{name, number}` tuples, ordered by number first and by name
  (term comparison) to break ties.
  """

  @doc "Builds the ballot `{name, number}`."
  def init(name, number \\ 0) do
    {name, number}
  end

  @doc """
  Returns ballot `b` with its number incremented by one. When `name` is not
  `nil` it replaces the ballot's owner name.
  """
  def inc(b, name \\ nil) do
    {old_name, number} = b
    owner = if name == nil, do: old_name, else: name
    {owner, number + 1}
  end

  @doc """
  Compares two ballots with `operator`, a 2-arity function such as `&>/2`,
  `&>=/2` or `&==/2`. The operator is applied to the signed difference of
  the two ballots and `0`.
  """
  def compare(b1, operator, b2) do
    operator.(diff(b1, b2), 0)
  end

  # Three-way comparison of two terms: 0, 1 or -1.
  defp lexicographical_compare(a, b) do
    cond do
      a == b -> 0
      a > b -> 1
      true -> -1
    end
  end

  # Signed difference of two ballots: by number, then by name on a tie.
  defp diff({name1, number1}, {name2, number2}) do
    case number1 - number2 do
      0 -> lexicographical_compare(name1, name2)
      delta -> delta
    end
  end
end