diff --git a/lib/eager_reliable_broadcast.ex b/lib/eager_reliable_broadcast.ex deleted file mode 100644 index 8933051..0000000 --- a/lib/eager_reliable_broadcast.ex +++ /dev/null @@ -1,63 +0,0 @@ -defmodule EagerReliableBroadcast do - def get_rb_name(name) do - String.to_atom(Atom.to_string(name) <> "_rb") - end - - def get_non_rb_name(name) do - String.to_atom(String.replace(Atom.to_string(name), "_rb", "")) - end - - def start(name, processes) do - pid = spawn(EagerReliableBroadcast, :init, [name, processes]) - Utils.register_name(get_rb_name(name), pid) - end - - # Init event must be the first - # one after the component is created - def init(parent, processes) do - state = %{ - name: get_rb_name(parent), - parent: parent, - processes: Enum.map(processes, fn name -> get_rb_name(name) end), - # Use this data structure to remember IDs of the delivered messages - delivered: %{}, - # Use this variable to remember the last sequence number used to identify a message - seq_no: 0 - } - - run(state) - end - - def run(state) do - run( - receive do - # Handle the broadcast request event - {:broadcast, m} -> - data_msg = {:data, state.name, state.seq_no, m} - Utils.beb_broadcast(data_msg, state.processes) - %{state | seq_no: state.seq_no + 1} - - {:data, proc, seq_no, m} -> - if not Map.has_key?(state.delivered, {proc, seq_no, m}) do - data_msg = {:data, proc, seq_no, m} - Utils.beb_broadcast(data_msg, state.processes) - - Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent) - %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} - else - val = Map.get(state.delivered, {proc, seq_no, m}) - - if val < Enum.count(state.processes) do - %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)} - else - %{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})} - end - end - end - ) - end - - def broadcast(name, m) do - Utils.unicast({:broadcast, m}, get_rb_name(name)) - end -end diff --git a/lib/leader_elector.ex b/lib/leader_elector.ex deleted file mode 100644 index 3393854..0000000 --- a/lib/leader_elector.ex +++ /dev/null @@ -1,76 +0,0 @@ -# -# Emits {:ele_trust, proc } -# - -defmodule EventualLeaderElector do - def getEleName(name) do - String.to_atom(Atom.to_string(name) <> "_ele") - end - - def start(name, processes) do - new_name = getEleName(name) - pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) - - Utils.register_name(new_name, pid) - end - - # Init event must be the first - # one after the component is created - def init(name, parent, processes) do - processes = Enum.map(processes, fn name -> getEleName(name) end) - - state = %{ - name: name, - parent: parent, - processes: processes, - timeout: 1000, - heard_back: MapSet.new(), - seq: 0, - last_trust: nil - } - - run(request_heartbeats(state)) - end - - def request_heartbeats(state) do - state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} - Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) - - Process.send_after(self(), {:timeout}, state.timeout) - state - end - - def run(state) do - run( - receive do - {:heartbeat_request, name, seq} -> - Utils.unicast({:heartbeat, state.parent, seq}, name) - state - - {:heartbeat, name, seq} -> - if seq == state.seq do - %{state | heard_back: MapSet.put(state.heard_back, name)} - else - state - end - - {:timeout} -> - state = - if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do - state - else - to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) - - if state.last_trust != to_trust do - Utils.unicast({:ele_trust, to_trust}, state.parent) - %{state | last_trust: to_trust} - else - state - end - end - - request_heartbeats(state) - end - ) - end -end diff --git a/lib/paxos.ex b/lib/paxos.ex index 165f9f7..e0a8bbc 100644 --- a/lib/paxos.ex +++ b/lib/paxos.ex @@ -1,3 +1,56 @@ +defmodule Utils do + def safecast(p, m) when p == nil, do: IO.puts('Trying to safecast #{m} with p as nil') + def safecast(p, m) when is_pid(p), do: send(p, m) + def safecast(p, m) do + case :global.whereis_name(p) do + pid when is_pid(pid) -> send(pid, m) + :undefined -> :ok + end + end + + @deprecated + def unicast(m, p) do + case :global.whereis_name(p) do + pid when is_pid(pid) -> send(pid, m) + :undefined -> :ok + end + end + + def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m)) + + def register_name(name, pid, link \\ true) do + case :global.re_register_name(name, pid) do + :yes -> + # Note this is running on the parent so we are linking the parent to the rb + # so that when we close the parent the rb also dies + if link do + Process.link(pid) + end + + pid + + :no -> + Process.exit(pid, :kill) + :error + end + end + + defmacro checkinst(val, do: expr) do + quote do + case var!(state).instmap[var!(inst)] != nil do + unquote(val) -> unquote(expr) + _ -> var!(state) + end + end + end + + defmacro checkinst(do: expr) do + quote do + checkinst(true, expr) + end + end +end + # # # Possible actions @@ -5,15 +58,26 @@ # :increase_ballot_number - this makes it so that it does not propose but jump simply increases the number of the current ballot # this is usefull when forcing a nack # - - defmodule Paxos do + require Utils + import Utils + + defmacro set_instmap(do: expr) do + quote do + var!(map) = var!(state).instmap[var!(inst)] + new_instmap = Map.put(var!(state).instmap, var!(inst), unquote(expr)) + var!(state) = %{var!(state) | instmap: new_instmap } + end + end + + + # Starts the Paxos replica with a specific name and some processes def start(name, processes) do IO.puts("Starting paxos for #{name}") pid = spawn(Paxos, :init, [name, name, processes]) - Utils.register_name(name, pid, false) + register_name(name, pid, false) end # Init event must be the first @@ -34,10 +98,9 @@ defmodule Paxos do run(state) end + # Guarantees that a specific state exists def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do - if Map.has_key?(state.instmap, inst) do - state - else + checkinst false do instmap = Map.put(state.instmap, inst, %{ value: value, @@ -96,7 +159,9 @@ defmodule Paxos do # Inform the pid with timeout right way send(pid_to_inform, {:timeout, inst}); - set_instmap(state, inst, fn map -> %{map| ballot: Ballot.inc(map.ballot)} end) + set_instmap do + %{map| ballot: Ballot.inc(map.ballot)} + end not Map.has_key?(state.instmap, inst) -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) @@ -108,15 +173,15 @@ defmodule Paxos do EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) Process.send_after(self(), {:timeout, inst}, t) - prepare( - set_instmap(state, inst, fn map -> %{ - map - | value: value, + set_instmap do + %{ map | + value: value, pid_to_inform: pid_to_inform, action: action, - } end), - inst - ) + } + end + + prepare(state, inst) true -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) @@ -134,7 +199,9 @@ defmodule Paxos do true -> state = has_or_create(state, inst) - state = set_instmap(state, inst, fn map -> %{map | other_value: value} end) + set_instmap do + %{map | other_value: value} + end prepare(state, inst) end @@ -148,31 +215,28 @@ defmodule Paxos do not Map.has_key?(state.instmap, inst) -> state = has_or_create(state, inst) - Utils.unicast( - {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, - state.instmap[inst].accepted_value}, - proc - ) + safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value}); - set_instmap(state, inst, fn map -> %{ - map - | ballot: ballot - } end) + set_instmap do + %{ map + | ballot: ballot + } + end Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) -> - Utils.unicast( + safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, - state.instmap[inst].accepted_value}, - proc + state.instmap[inst].accepted_value} ) - set_instmap(state, inst, fn map -> %{ - map - | ballot: ballot - } end) + set_instmap do + %{ map + | ballot: ballot + } + end true -> - Utils.unicast({:nack, inst, ballot}, proc) + safecast(proc, {:nack, inst, ballot}) state end @@ -197,12 +261,13 @@ defmodule Paxos do EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot}) - set_instmap(state, inst, fn map -> %{ - map | has_sent_accept: false, - has_sent_prepare: false, - ballot: Ballot.inc(map.ballot), - aborted: true, - } end) + set_instmap do + %{ map | has_sent_accept: false, + has_sent_prepare: false, + ballot: Ballot.inc(map.ballot), + aborted: true, + } + end true -> state @@ -234,11 +299,12 @@ defmodule Paxos do state Ballot.compare(ballot, &==/2, state.instmap[inst].ballot) -> - state = - set_instmap(state, inst, fn map -> %{ + set_instmap do + %{ map | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}] - } end) + } + end prepared(state, inst) @@ -262,17 +328,18 @@ defmodule Paxos do if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do IO.puts("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}") - Utils.unicast({:accepted, inst, ballot}, proc) + safecast(proc, {:accepted, inst, ballot}) - set_instmap(state, inst, fn map -> %{ - map - | ballot: ballot, - accepted_value: value, - accepted_ballot: ballot - } end) + set_instmap do + %{ map + | ballot: ballot, + accepted_value: value, + accepted_ballot: ballot + } + end else IO.puts("#{state.name} -> #{proc} nack") - Utils.unicast({:nack, inst, ballot}, proc) + safecast(proc, {:nack, inst, ballot}) state end end @@ -284,14 +351,14 @@ defmodule Paxos do has_finished(state, inst) -> state - state.leader == state.name and Ballot.compare(state.instmap[inst].ballot, &==/2, ballot) -> - accepted( - set_instmap(state, inst, fn map -> %{ - map - | accepted: map.accepted + 1 - } end), - inst - ) + state.leader == state.name and state.instmap[inst].ballot == ballot -> + set_instmap do + %{ map | + accepted: map.accepted + 1 + } + end + + accepted( state, inst) true -> state @@ -334,11 +401,6 @@ defmodule Paxos do ) end - def set_instmap(state, inst, set_instmap) do - new_instmap = Map.put(state.instmap, inst, set_instmap.(state.instmap[inst])) - %{state | instmap: new_instmap} - end - # # Puts process in the preapre state # @@ -364,15 +426,17 @@ defmodule Paxos do IO.puts("#{state.name} - sending all prepare #{inst} #{inspect(ballot)}") EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot}) - set_instmap(state, inst, fn map -> %{ - map - | prepared_values: [], - accepted: 0, - aborted: false, - ballot_value: nil, - has_sent_prepare: true, - has_sent_accept: false - } end) + set_instmap do + %{ + map + | prepared_values: [], + accepted: 0, + aborted: false, + ballot_value: nil, + has_sent_prepare: true, + has_sent_accept: false + } + end end end @@ -413,11 +477,12 @@ defmodule Paxos do {:accept, inst, state.instmap[inst].ballot, a_val} ) - set_instmap(state, inst, fn map -> %{ - map - | ballot_value: a_val, - has_sent_accept: true - } end) + set_instmap do + %{ map | + ballot_value: a_val, + has_sent_accept: true + } + end else state end @@ -551,3 +616,144 @@ defmodule Ballot do end end + +defmodule EagerReliableBroadcast do + def get_rb_name(name) do + String.to_atom(Atom.to_string(name) <> "_rb") + end + + def get_non_rb_name(name) do + String.to_atom(String.replace(Atom.to_string(name), "_rb", "")) + end + + def start(name, processes) do + pid = spawn(EagerReliableBroadcast, :init, [name, processes]) + Utils.register_name(get_rb_name(name), pid) + end + + # Init event must be the first + # one after the component is created + def init(parent, processes) do + state = %{ + name: get_rb_name(parent), + parent: parent, + processes: Enum.map(processes, fn name -> get_rb_name(name) end), + # Use this data structure to remember IDs of the delivered messages + delivered: %{}, + # Use this variable to remember the last sequence number used to identify a message + seq_no: 0 + } + + run(state) + end + + def run(state) do + run( + receive do + # Handle the broadcast request event + {:broadcast, m} -> + data_msg = {:data, state.name, state.seq_no, m} + Utils.beb_broadcast(data_msg, state.processes) + %{state | seq_no: state.seq_no + 1} + + {:data, proc, seq_no, m} -> + if not Map.has_key?(state.delivered, {proc, seq_no, m}) do + data_msg = {:data, proc, seq_no, m} + Utils.beb_broadcast(data_msg, state.processes) + + Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent) + %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} + else + val = Map.get(state.delivered, {proc, seq_no, m}) + + if val < Enum.count(state.processes) do + %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)} + else + %{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})} + end + end + end + ) + end + + def broadcast(name, m) do + Utils.unicast({:broadcast, m}, get_rb_name(name)) + end +end + +# +# Emits {:ele_trust, proc } +# + +defmodule EventualLeaderElector do + def getEleName(name) do + String.to_atom(Atom.to_string(name) <> "_ele") + end + + def start(name, processes) do + new_name = getEleName(name) + pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) + + Utils.register_name(new_name, pid) + end + + # Init event must be the first + # one after the component is created + def init(name, parent, processes) do + processes = Enum.map(processes, fn name -> getEleName(name) end) + + state = %{ + name: name, + parent: parent, + processes: processes, + timeout: 1000, + heard_back: MapSet.new(), + seq: 0, + last_trust: nil + } + + run(request_heartbeats(state)) + end + + def request_heartbeats(state) do + state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} + Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) + + Process.send_after(self(), {:timeout}, state.timeout) + state + end + + def run(state) do + run( + receive do + {:heartbeat_request, name, seq} -> + Utils.unicast({:heartbeat, state.parent, seq}, name) + state + + {:heartbeat, name, seq} -> + if seq == state.seq do + %{state | heard_back: MapSet.put(state.heard_back, name)} + else + state + end + + {:timeout} -> + state = + if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do + state + else + to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) + + if state.last_trust != to_trust do + Utils.unicast({:ele_trust, to_trust}, state.parent) + %{state | last_trust: to_trust} + else + state + end + end + + request_heartbeats(state) + end + ) + end +end diff --git a/lib/test.ex b/lib/test.ex new file mode 100644 index 0000000..ba9678a --- /dev/null +++ b/lib/test.ex @@ -0,0 +1,17 @@ +defmodule Test do + + defmacro createfuncBase(name) do + quote do + def unquote(name)(true), do: true + end + end + +end + +defmodule Test2 do + require Test + + def test(), do: false + + Test.createfuncBase(:test) +end diff --git a/lib/test_script.exs b/lib/test_script.exs index 6ae32ae..df6add3 100755 --- a/lib/test_script.exs +++ b/lib/test_script.exs @@ -1,7 +1,4 @@ # Replace with your own implementation source files -IEx.Helpers.c("utils.ex", ".") -IEx.Helpers.c("eager_reliable_broadcast.ex", ".") -IEx.Helpers.c("leader_elector.ex", ".") IEx.Helpers.c("paxos.ex", ".") # Do not modify the following ########## diff --git a/lib/utils.ex b/lib/utils.ex deleted file mode 100644 index bb15cbd..0000000 --- a/lib/utils.ex +++ /dev/null @@ -1,27 +0,0 @@ -defmodule Utils do - def unicast(m, p) do - case :global.whereis_name(p) do - pid when is_pid(pid) -> send(pid, m) - :undefined -> :ok - end - end - - def beb_broadcast(m, dest), do: for(p <- dest, do: unicast(m, p)) - - def register_name(name, pid, link \\ true) do - case :global.re_register_name(name, pid) do - :yes -> - # Note this is running on the parent so we are linking the parent to the rb - # so that when we close the parent the rb also dies - if link do - Process.link(pid) - end - - pid - - :no -> - Process.exit(pid, :kill) - :error - end - end -end