From c97f2e9a225c3ecf32b0db554a3b2b6a171fda7c Mon Sep 17 00:00:00 2001 From: Andre Henriques Date: Tue, 28 Nov 2023 21:42:11 +0000 Subject: [PATCH] Paxos version 1 --- eager_reliable_broadcast.ex | 95 -------------- lib/distributed_system_coursework.ex | 12 ++ lib/eager_reliable_broadcast.ex | 63 ++++++++++ lib/leader_elector.ex | 86 +++++++++++++ lib/mix/tasks/test_leader_elector.ex | 54 ++++++++ lib/mix/tasks/test_paxos.ex | 47 +++++++ lib/paxos.ex | 178 +++++++++++++++++++++++++++ lib/utils.ex | 24 ++++ mix.exs | 1 + 9 files changed, 465 insertions(+), 95 deletions(-) delete mode 100644 eager_reliable_broadcast.ex create mode 100644 lib/distributed_system_coursework.ex create mode 100644 lib/eager_reliable_broadcast.ex create mode 100644 lib/leader_elector.ex create mode 100644 lib/mix/tasks/test_leader_elector.ex create mode 100644 lib/mix/tasks/test_paxos.ex create mode 100644 lib/paxos.ex create mode 100644 lib/utils.ex diff --git a/eager_reliable_broadcast.ex b/eager_reliable_broadcast.ex deleted file mode 100644 index 75c69e4..0000000 --- a/eager_reliable_broadcast.ex +++ /dev/null @@ -1,95 +0,0 @@ -defmodule EagerReliableBroadcast do - def start(name, processes) do - pid = spawn(EagerReliableBroadcast, :init, [name, processes]) - # :global.unregister_name(name) - case :global.re_register_name(name, pid) do - :yes -> pid - :no -> :error - end - IO.puts "registered #{name}" - pid - end - - # Init event must be the first - # one after the component is created - def init(name, processes) do - state = %{ - name: name, - processes: processes, - delivered: %{}, # Use this data structure to remember IDs of the delivered messages - seq_no: 0 # Use this variable to remember the last sequence number used to identify a message - } - run(state) - end - - def run(state) do - state = receive do - # Handle the broadcast request event - {:broadcast, m} -> - IO.puts("#{inspect state.name}: RB-broadcast: #{inspect m}") - # Create a unique message identifier from state.name and state.seqno. - # Create a new data message data_msg from the given payload m - # the message identifier. - # Update the state as necessary - data_msg = {:data, state.name, state.seq_no, m} - - # Use the provided beb_broadcast function to propagate data_msg to - - # all process - beb_broadcast(data_msg, state.processes) - %{state | seq_no: state.seq_no + 1 } - - {:data, proc, seq_no, m} -> - if not Map.has_key?(state.delivered, {proc, seq_no, m}) do - data_msg = {:data, proc, seq_no, m} - beb_broadcast(data_msg, state.processes) - unicast({:deliver, proc, m}, state.name) - %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} - else - val = Map.get(state.delivered, {proc, seq_no, m}) - if val < Enum.count(state.processes) do - %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)} - else - %{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})} - end - end - - # If was already delivered, do nothing. - # Otherwise, update delivered, generate a deliver event for the - # upper layer, and re-broadcast (echo) the received message. - # In both cases, do not forget to return the state. - - {:deliver, proc, m} -> - # Simulate the deliver indication event - IO.puts("#{inspect state.name}: RB-deliver: #{inspect m} from #{inspect proc}") - state - end - run(state) - end - - - defp unicast(m, p) do - case :global.whereis_name(p) do - pid when is_pid(pid) -> send(pid, m) - :undefined -> :ok - end - end - - defp beb_broadcast(m, dest), do: for p <- dest, do: unicast(m, p) - - # You can use this function to simulate a process failure. - # name: the name of this process - # proc_to_fail: the name of the failed process - # fail_send_to: list of processes proc_to_fail will not be broadcasting messages to - # Note that this list must include proc_to_fail. - # m and dest are the same as the respective arguments of the normal - # beb_broadcast. - defp beb_broadcast_with_failures(name, proc_to_fail, fail_send_to, m, dest) do - if name == proc_to_fail do - for p <- dest, p not in fail_send_to, do: unicast(m, p) - else - for p <- dest, p != proc_to_fail, do: unicast(m, p) - end - end - -end diff --git a/lib/distributed_system_coursework.ex b/lib/distributed_system_coursework.ex new file mode 100644 index 0000000..5fc02a8 --- /dev/null +++ b/lib/distributed_system_coursework.ex @@ -0,0 +1,12 @@ +defmodule Runner do + use Application + + def runner_leader_elector_test() do + IO.puts("Run leader elector test") + end + + def start(_type, _args) do + IO.puts("Starting Runner") + Mix.Task.run(Runner.runner_leader_elector_test()) + end +end diff --git a/lib/eager_reliable_broadcast.ex b/lib/eager_reliable_broadcast.ex new file mode 100644 index 0000000..8933051 --- /dev/null +++ b/lib/eager_reliable_broadcast.ex @@ -0,0 +1,63 @@ +defmodule EagerReliableBroadcast do + def get_rb_name(name) do + String.to_atom(Atom.to_string(name) <> "_rb") + end + + def get_non_rb_name(name) do + String.to_atom(String.replace(Atom.to_string(name), "_rb", "")) + end + + def start(name, processes) do + pid = spawn(EagerReliableBroadcast, :init, [name, processes]) + Utils.register_name(get_rb_name(name), pid) + end + + # Init event must be the first + # one after the component is created + def init(parent, processes) do + state = %{ + name: get_rb_name(parent), + parent: parent, + processes: Enum.map(processes, fn name -> get_rb_name(name) end), + # Use this data structure to remember IDs of the delivered messages + delivered: %{}, + # Use this variable to remember the last sequence number used to identify a message + seq_no: 0 + } + + run(state) + end + + def run(state) do + run( + receive do + # Handle the broadcast request event + {:broadcast, m} -> + data_msg = {:data, state.name, state.seq_no, m} + Utils.beb_broadcast(data_msg, state.processes) + %{state | seq_no: state.seq_no + 1} + + {:data, proc, seq_no, m} -> + if not Map.has_key?(state.delivered, {proc, seq_no, m}) do + data_msg = {:data, proc, seq_no, m} + Utils.beb_broadcast(data_msg, state.processes) + + Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent) + %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} + else + val = Map.get(state.delivered, {proc, seq_no, m}) + + if val < Enum.count(state.processes) do + %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)} + else + %{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})} + end + end + end + ) + end + + def broadcast(name, m) do + Utils.unicast({:broadcast, m}, get_rb_name(name)) + end +end diff --git a/lib/leader_elector.ex b/lib/leader_elector.ex new file mode 100644 index 0000000..49437fa --- /dev/null +++ b/lib/leader_elector.ex @@ -0,0 +1,86 @@ +# +# Emits {:ele_trust, proc } +# + +defmodule EventualLeaderElector do + def getEleName(name) do + String.to_atom(Atom.to_string(name) <> "_ele") + end + + def getOriginalName(name) do + String.to_atom(String.replace(Atom.to_string(name), "_ele", "")) + end + + def start(name, processes) do + new_name = getEleName(name) + pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) + + Utils.register_name(new_name, pid) + end + + # Init event must be the first + # one after the component is created + def init(name, parent, processes) do + processes = Enum.map(processes, fn name -> getEleName(name) end) + + state = %{ + name: name, + parent: parent, + processes: processes, + mp_processes: MapSet.new(processes), + timeout: 1000, + heard_back: MapSet.new(), + seq: 0, + last_trust: nil + } + + run(request_heartbeats(state)) + end + + def request_heartbeats(state) do + state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} + Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) + + Process.send_after(self(), {:timeout}, state.timeout) + state + end + + def run(state) do + run( + receive do + {:heartbeat_request, name, seq} -> + Utils.unicast({:heartbeat, state.name, seq}, name) + state + + {:heartbeat, name, seq} -> + if seq == state.seq do + %{state | heard_back: MapSet.put(state.heard_back, name)} + else + state + end + + {:timeout} -> + active = MapSet.intersection(state.mp_processes, state.heard_back) + + state = %{state | heard_back: MapSet.new()} + + state = + if MapSet.size(active) == 0 do + state + else + to_trust = Enum.at(MapSet.to_list(active), 0) + to_trust = getOriginalName(to_trust) + + if state.last_trust != to_trust do + Utils.unicast({:ele_trust, to_trust}, state.parent) + %{state | last_trust: to_trust} + else + state + end + end + + request_heartbeats(state) + end + ) + end +end diff --git a/lib/mix/tasks/test_leader_elector.ex b/lib/mix/tasks/test_leader_elector.ex new file mode 100644 index 0000000..c05857a --- /dev/null +++ b/lib/mix/tasks/test_leader_elector.ex @@ -0,0 +1,54 @@ +defmodule Mix.Tasks.TestLeaderElector do + use Mix.Task + + def run(_) do + IO.puts("Testing Leader Elector") + + procs = Enum.map(1..20, fn i -> String.to_atom("p#{i}") end) + pids = Enum.map(procs, fn p -> start(p, procs) end) + + IO.puts("spawned") + + Process.send_after(self(), {:end}, 50000) + + receive do + {:end} -> + IO.puts("test ended") + end + end + + def start(name, procs) do + pid = spawn(Mix.Tasks.TestLeaderElector, :init, [name, procs]) + + case :global.re_register_name(name, pid) do + :yes -> + # Note this is running on the parent so we are linking the parent to the rb + # so that when we close the parent the rb also dies + pid + + :no -> + :error + end + end + + def init(name, procs) do + IO.puts("#{name}: init") + EventualLeaderElector.start(name, procs) + + run_test(name) + end + + def run_test(name) do + receive do + {:ele_trust, proc} -> + IO.puts("#{name}: trust process #{Atom.to_string(proc)}") + + if proc == name do + IO.puts("#{name}: Trust myself will stop\n\n\n") + Process.exit(self(), :exit) + end + end + + run_test(name) + end +end diff --git a/lib/mix/tasks/test_paxos.ex b/lib/mix/tasks/test_paxos.ex new file mode 100644 index 0000000..7c0bd6f --- /dev/null +++ b/lib/mix/tasks/test_paxos.ex @@ -0,0 +1,47 @@ +defmodule Mix.Tasks.TestPaxos do + use Mix.Task + + def run(_) do + IO.puts("Testing Paxos") + + procs = Enum.map(0..3, fn i -> String.to_atom("p#{i}") end) + _pids = Enum.map(procs, fn p -> start(p, procs) end) + + IO.puts("spawned") + + Process.send_after(self(), {:end}, 50000) + + receive do + {:end} -> + IO.puts("test ended") + end + end + + def start(name, procs) do + pid = spawn(Mix.Tasks.TestPaxos, :init, [name, procs]) + + Utils.register_name(name, pid) + end + + def init(name, procs) do + IO.puts("#{name}: init") + Paxos.start(name, procs) + + Process.sleep(200) + + if name == :p0 do + Paxos.propose(name, "Hi I was decided") + end + + run_test(name) + end + + def run_test(name) do + receive do + {:decide, value} -> + IO.puts("#{name}: decide #{value}") + end + + run_test(name) + end +end diff --git a/lib/paxos.ex b/lib/paxos.ex new file mode 100644 index 0000000..b116b32 --- /dev/null +++ b/lib/paxos.ex @@ -0,0 +1,178 @@ +defmodule Paxos do + def getPaxosName(name) do + String.to_atom(Atom.to_string(name) <> "_paxos") + end + + def getOriginalName(name) do + String.to_atom(String.replace(Atom.to_string(name), "_paxos", "")) + end + + def start(name, processes) do + new_name = getPaxosName(name) + pid = spawn(Paxos, :init, [new_name, name, processes]) + + Utils.register_name(new_name, pid) + end + + # Init event must be the first + # one after the component is created + def init(name, parent, processes) do + processes = Enum.map(processes, fn name -> getPaxosName(name) end) + EventualLeaderElector.start(name, processes) + EagerReliableBroadcast.start(name, processes) + + state = %{ + name: name, + parent: parent, + processes: processes, + leader: nil, + value: nil, + other_value: nil, + ballot: 0, + ballot_value: nil, + prepared_values: [], + accepted: 0, + running_ballot: 0, + accepted_ballot: nil, + accepted_value: nil, + decided: false, + pid_to_inform: nil + } + + run(state) + end + + def run(state) do + run( + receive do + {:ele_trust, proc} -> + prepare(%{state | leader: proc}) + + {:propose, value, pid_to_inform} -> + if state.value == nil do + EagerReliableBroadcast.broadcast(state.name, {:other_propose, value}) + prepare(%{state | value: value, pid_to_inform: pid_to_inform}) + else + %{state | pid_to_inform: pid_to_inform} + end + + {:rb_deliver, _proc, {:other_propose, value}} -> + %{state | other_value: value} + + {:rb_deliver, _proc, {:prepare, proc, ballot}} -> + if ballot > state.running_ballot do + Utils.unicast({:prepared, ballot, state.accepted_ballot, state.accepted_value}, proc) + %{state | running_ballot: ballot} + else + Utils.unicast({:nack, ballot}, proc) + state + end + + {:nack, ballot} -> + if state.leader == state.name and state.ballot == ballot do + prepare(state) + else + state + end + + {:prepared, ballot, accepted_ballot, accepted_value} -> + if ballot == state.ballot do + prepared(%{ + state + | prepared_values: state.prepared_values ++ [{accepted_ballot, accepted_value}] + }) + else + state + end + + {:rb_deliver, proc, {:accept, ballot, value}} -> + if ballot >= state.running_ballot do + Utils.unicast({:accepted, ballot}, proc) + %{state | running_ballot: ballot, accepted_value: value, accepted_ballot: ballot} + else + Utils.unicast({:nack, ballot}, proc) + state + end + + {:accepted, ballot} -> + if state.leader == state.name and state.ballot == ballot do + accepted(%{state | accepted: state.accepted + 1}) + else + state + end + + {:rb_deliver, _, {:decide, value}} -> + if state.decided == true do + state + else + if state.pid_to_inform != nil do + send(state.pid_to_inform, {:decide, value}) + end + + %{state | decided: true} + end + end + ) + end + + # + # Puts process in the preapre state + # + + def prepare(state) when state.leader != state.name, do: state + def prepare(state) when state.value == nil and state.other_value == nil, do: state + + def prepare(state) do + ballot = state.ballot + 1 + EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, ballot}) + %{state | ballot: ballot, prepared_values: [], accepted: 0, ballot_value: nil} + end + + # + # Process the prepared responses + # + def prepared(state) when state.leader != state.name, do: state + + def prepared(state) when length(state.prepared_values) > length(state.processes) / 2 + 1 do + {_, a_val} = + Enum.reduce(state.prepared_values, {0, nil}, fn {bal, val}, {a_bal, a_val} -> + if a_bal > bal do + {a_bal, a_val} + else + {bal, val} + end + end) + + a_val = + if a_val == nil do + if state.value == nil do + state.other_value + else + state.value + end + else + a_val + end + + EagerReliableBroadcast.broadcast(state.name, {:accept, state.ballot, a_val}) + %{state | ballot_value: a_val} + end + + def prepared(state), do: state + + # + # Process the accepted responses + # + def accepted(state) when state.leader != state.name, do: state + + def accepted(state) when state.accepted > length(state.processes) / 2 + 1 do + EagerReliableBroadcast.broadcast(state.name, {:decide, state.ballot_value}) + state + end + + def accepted(state), do: state + + def propose(name, value) do + Utils.unicast({:propose, value}, getPaxosName(name)) + end +end diff --git a/lib/utils.ex b/lib/utils.ex new file mode 100644 index 0000000..4635a3e --- /dev/null +++ b/lib/utils.ex @@ -0,0 +1,24 @@ +defmodule Utils do + def unicast(m, p) do + case :global.whereis_name(p) do + pid when is_pid(pid) -> send(pid, m) + :undefined -> :ok + end + end + + def beb_broadcast(m, dest), do: for(p <- dest, do: unicast(m, p)) + + def register_name(name, pid) do + case :global.re_register_name(name, pid) do + :yes -> + # Note this is running on the parent so we are linking the parent to the rb + # so that when we close the parent the rb also dies + Process.link(pid) + pid + + :no -> + Process.exit(pid, :kill) + :error + end + end +end diff --git a/mix.exs b/mix.exs index e1db2db..8c225ae 100644 --- a/mix.exs +++ b/mix.exs @@ -14,6 +14,7 @@ defmodule DistributedSystemCoursework.MixProject do # Run "mix help compile.app" to learn about applications. def application do [ + mod: {Runner, []}, extra_applications: [:logger] ] end