From 0dc9215672423b08c692807e9f3ccb314c479d58 Mon Sep 17 00:00:00 2001 From: Andre Henriques Date: Tue, 16 Jan 2024 15:05:14 +0000 Subject: [PATCH] improved code --- lib/paxos.ex | 174 +++++++++++++----------------------- lib/paxos_test_aditional.ex | 4 +- lib/test_script.exs | 1 - 3 files changed, 64 insertions(+), 115 deletions(-) diff --git a/lib/paxos.ex b/lib/paxos.ex index e0a8bbc..ee02c74 100644 --- a/lib/paxos.ex +++ b/lib/paxos.ex @@ -8,14 +8,6 @@ defmodule Utils do end end - @deprecated - def unicast(m, p) do - case :global.whereis_name(p) do - pid when is_pid(pid) -> send(pid, m) - :undefined -> :ok - end - end - def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m)) def register_name(name, pid, link \\ true) do @@ -26,7 +18,6 @@ defmodule Utils do if link do Process.link(pid) end - pid :no -> @@ -35,20 +26,14 @@ defmodule Utils do end end - defmacro checkinst(val, do: expr) do + defmacro or_state(val, do: expr) do quote do - case var!(state).instmap[var!(inst)] != nil do - unquote(val) -> unquote(expr) + case unquote(val) do + true -> unquote(expr) _ -> var!(state) end end end - - defmacro checkinst(do: expr) do - quote do - checkinst(true, expr) - end - end end # @@ -76,7 +61,6 @@ defmodule Paxos do IO.puts("Starting paxos for #{name}") pid = spawn(Paxos, :init, [name, name, processes]) - register_name(name, pid, false) end @@ -100,7 +84,7 @@ defmodule Paxos do # Guarantees that a specific state exists def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do - checkinst false do + or_state state.instmap[inst] == nil do instmap = Map.put(state.instmap, inst, %{ value: value, @@ -138,11 +122,10 @@ defmodule Paxos do IO.puts("#{state.name} - #{proc} is leader") Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st -> - # IO.puts("#{state.name} - looping after leader: #{inst}") prepare(st, inst) end) - {:propose, inst, value, t, pid_to_inform, action} -> + {:propose, inst, value, pid_to_inform, action} -> IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}") cond do @@ -152,12 +135,8 @@ defmodule Paxos do state action == :increase_ballot_number -> - state = has_or_create(state, inst) - IO.puts("#{state.name} - Got request to increase ballot number for inst #{inst}") - - # Inform the pid with timeout right way - send(pid_to_inform, {:timeout, inst}); + state = has_or_create(state, inst) set_instmap do %{map| ballot: Ballot.inc(map.ballot)} @@ -166,12 +145,10 @@ defmodule Paxos do not Map.has_key?(state.instmap, inst) -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) state = has_or_create(state, inst, value, pid_to_inform, action) - Process.send_after(self(), {:timeout, inst}, t) prepare(state, inst) state.instmap[inst].value == nil -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) - Process.send_after(self(), {:timeout, inst}, t) set_instmap do %{ map | @@ -240,13 +217,6 @@ defmodule Paxos do state end - {:timeout, inst} -> - if not has_finished(state, inst) do - send(state.instmap[inst].pid_to_inform, {:timeout, inst}) - end - - state - {:nack, inst, ballot} -> IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}") @@ -286,7 +256,6 @@ defmodule Paxos do end state - end {:prepared, inst, ballot, accepted_ballot, accepted_value} -> @@ -305,7 +274,6 @@ defmodule Paxos do | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}] } end - prepared(state, inst) Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) -> @@ -364,36 +332,24 @@ defmodule Paxos do state end - {:get_value, inst, pid_to_inform, t} -> + {:get_value, inst, pid_to_inform} -> # IO.puts("#{state.name} get_value") - - cond do - t < 0 -> - send(pid_to_inform, {:get_value_res, inst}) - - has_finished(state, inst, true) -> + if has_finished(state, inst, true) do send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]}) - - true -> - Process.send_after(self(), {:get_value, inst, pid_to_inform, t - 500}, 500) end - state {:rb_deliver, _, {:decide, inst, value}} -> IO.puts("#{state.name} - decided #{inspect(inst)} #{inspect(value)}") - if has_finished(state, inst) do - state - else + or_state not has_finished(state, inst) do if Map.has_key?(state.instmap, inst) != nil and state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:decision, inst, value}) end - %{ - state - | decided: Map.put(state.decided, inst, value), + %{ state | + decided: Map.put(state.decided, inst, value), instmap: Map.delete(state.instmap, inst) } end @@ -427,9 +383,8 @@ defmodule Paxos do EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot}) set_instmap do - %{ - map - | prepared_values: [], + %{ map | + prepared_values: [], accepted: 0, aborted: false, ballot_value: nil, @@ -522,77 +477,75 @@ defmodule Paxos do end def propose(pid, inst, value, t, action \\ nil) do - send(pid, {:propose, inst, value, t, self(), action}) + send(pid, {:propose, inst, value, self(), action}) - propose_loop(inst) + propose_loop({inst, t}) end - def propose_loop(inInst) do + def propose_loop(input) do + {_, t} = input receive do {:timeout, inst} -> - check_and_apply({:timeout}, inst, inInst, &propose_loop/1) + check_and_apply({:timeout}, inst, input, &propose_loop/1) {:abort, inst} -> - check_and_apply({:abort}, inst, inInst, &propose_loop/1) + check_and_apply({:abort}, inst, input, &propose_loop/1) {:decision, inst, d} -> - check_and_apply({:decision, d}, inst, inInst, &propose_loop/1) + check_and_apply({:decision, d}, inst, input, &propose_loop/1) x -> Process.send_after(self(), x, 500) - propose_loop(inInst) + propose_loop(input) + after + t -> {:timeout} end end def get_decision(pid, inst, t) do - send(pid, {:get_value, inst, self(), t}) - get_decision_loop(inst) + send(pid, {:get_value, inst, self()}) + get_decision_loop({inst, t}) end - def get_decision_loop(inInst) do + def get_decision_loop(input) do + {_, t} = input receive do {:get_value_res, inst} -> - check_and_apply(nil, inst, inInst, &get_decision_loop/1) + check_and_apply(nil, inst, input, &get_decision_loop/1) {:get_value_res_actual, inst, v} -> - check_and_apply(v, inst, inInst, &get_decision_loop/1) + check_and_apply(v, inst, input, &get_decision_loop/1) x -> Process.send_after(self(), x, 500) - get_decision_loop(inInst) + get_decision_loop(input) + after + t -> nil end end - def check_and_apply(v, inst, inInst, fun) do + def check_and_apply(v, inst, input, fun) do + {inInst, _} = input if inst == inInst do v else - fun.(inInst) + fun.(input) end end end defmodule Ballot do - - def init(name, number \\ 0) do - {name, number} - end + def init(name, number \\ 0), do: {name, number} def inc(b, name \\ nil) do {old_name, number} = b { - if name == nil do - old_name - else - name - end, - + if name == nil do old_name else name end, number + 1 } end - defp lexicographical_compare(a, b) do cond do a == b -> 0 @@ -611,13 +564,13 @@ defmodule Ballot do end end - def compare(b1, operator, b2) do - operator.(diff(b1, b2), 0) - end - + def compare(b1, operator, b2), do: operator.(diff(b1, b2), 0) end defmodule EagerReliableBroadcast do + require Utils + import Utils + def get_rb_name(name) do String.to_atom(Atom.to_string(name) <> "_rb") end @@ -628,7 +581,7 @@ defmodule EagerReliableBroadcast do def start(name, processes) do pid = spawn(EagerReliableBroadcast, :init, [name, processes]) - Utils.register_name(get_rb_name(name), pid) + register_name(get_rb_name(name), pid) end # Init event must be the first @@ -653,15 +606,15 @@ defmodule EagerReliableBroadcast do # Handle the broadcast request event {:broadcast, m} -> data_msg = {:data, state.name, state.seq_no, m} - Utils.beb_broadcast(data_msg, state.processes) + beb_broadcast(data_msg, state.processes) %{state | seq_no: state.seq_no + 1} {:data, proc, seq_no, m} -> if not Map.has_key?(state.delivered, {proc, seq_no, m}) do data_msg = {:data, proc, seq_no, m} - Utils.beb_broadcast(data_msg, state.processes) + beb_broadcast(data_msg, state.processes) - Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent) + safecast(state.parent, {:rb_deliver, get_non_rb_name(proc), m}) %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} else val = Map.get(state.delivered, {proc, seq_no, m}) @@ -676,9 +629,10 @@ defmodule EagerReliableBroadcast do ) end - def broadcast(name, m) do - Utils.unicast({:broadcast, m}, get_rb_name(name)) - end + ############# + # Interface # + ############# + def broadcast(name, m), do: safecast(get_rb_name(name), {:broadcast, m}) end # @@ -686,6 +640,9 @@ end # defmodule EventualLeaderElector do + require Utils + import Utils + def getEleName(name) do String.to_atom(Atom.to_string(name) <> "_ele") end @@ -694,7 +651,7 @@ defmodule EventualLeaderElector do new_name = getEleName(name) pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) - Utils.register_name(new_name, pid) + register_name(new_name, pid) end # Init event must be the first @@ -717,7 +674,7 @@ defmodule EventualLeaderElector do def request_heartbeats(state) do state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} - Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) + beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) Process.send_after(self(), {:timeout}, state.timeout) state @@ -727,30 +684,23 @@ defmodule EventualLeaderElector do run( receive do {:heartbeat_request, name, seq} -> - Utils.unicast({:heartbeat, state.parent, seq}, name) + safecast(name, {:heartbeat, state.parent, seq}) state {:heartbeat, name, seq} -> - if seq == state.seq do + or_state seq == state.seq do %{state | heard_back: MapSet.put(state.heard_back, name)} - else - state end {:timeout} -> - state = - if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do - state - else - to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) + state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do + to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) - if state.last_trust != to_trust do - Utils.unicast({:ele_trust, to_trust}, state.parent) - %{state | last_trust: to_trust} - else - state - end + or_state state.last_trust != to_trust do + safecast(state.parent, {:ele_trust, to_trust}) + %{state | last_trust: to_trust} end + end request_heartbeats(state) end diff --git a/lib/paxos_test_aditional.ex b/lib/paxos_test_aditional.ex index 86c93e7..c41eea4 100644 --- a/lib/paxos_test_aditional.ex +++ b/lib/paxos_test_aditional.ex @@ -125,7 +125,7 @@ defmodule PaxosTestAditional do IO.puts("#{inspect(name)}: started") [leader | spare] = Enum.sort(participants) - increase_ballot = Enum.take(spare, floor(length(participants) / 2) + 1) + increase_ballot = Enum.take(spare, floor(length(participants) / 2)) if name == leader do # Propose when passed with :kill_before_decision will die right before a decision is selected @@ -199,7 +199,7 @@ defmodule PaxosTestAditional do participants = Enum.sort(participants) [leader | spare ] = participants - increase_ballot = Enum.take(spare, floor(length(participants) / 2) + 1) + increase_ballot = Enum.take(spare, floor(length(participants) / 2)) [non_leader | _] = Enum.reverse(spare) if name == non_leader do diff --git a/lib/test_script.exs b/lib/test_script.exs index df6add3..47207cc 100755 --- a/lib/test_script.exs +++ b/lib/test_script.exs @@ -75,7 +75,6 @@ test_suite = [ "Non-Leader proposes after leader is elected, 5 nodes"}, {&PaxosTestAditional.run_non_leader_send_propose_after_leader_elected/3, TestUtil.get_local_config(5), 10, "Non-Leader proposes after leader is elected, 5 local procs"}, - {&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_dist_config(host, 5), 10, "Leader should nack before decision and then come to decision, no concurrent ballots, 5 nodes"},