improved code
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Andre Henriques 2024-01-16 15:05:14 +00:00
parent 6bdb56755d
commit 0dc9215672
3 changed files with 64 additions and 115 deletions

View File

@ -8,14 +8,6 @@ defmodule Utils do
end end
end end
@deprecated
def unicast(m, p) do
case :global.whereis_name(p) do
pid when is_pid(pid) -> send(pid, m)
:undefined -> :ok
end
end
def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m)) def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m))
def register_name(name, pid, link \\ true) do def register_name(name, pid, link \\ true) do
@ -26,7 +18,6 @@ defmodule Utils do
if link do if link do
Process.link(pid) Process.link(pid)
end end
pid pid
:no -> :no ->
@ -35,20 +26,14 @@ defmodule Utils do
end end
end end
defmacro checkinst(val, do: expr) do defmacro or_state(val, do: expr) do
quote do quote do
case var!(state).instmap[var!(inst)] != nil do case unquote(val) do
unquote(val) -> unquote(expr) true -> unquote(expr)
_ -> var!(state) _ -> var!(state)
end end
end end
end end
defmacro checkinst(do: expr) do
quote do
checkinst(true, expr)
end
end
end end
# #
@ -76,7 +61,6 @@ defmodule Paxos do
IO.puts("Starting paxos for #{name}") IO.puts("Starting paxos for #{name}")
pid = spawn(Paxos, :init, [name, name, processes]) pid = spawn(Paxos, :init, [name, name, processes])
register_name(name, pid, false) register_name(name, pid, false)
end end
@ -100,7 +84,7 @@ defmodule Paxos do
# Guarantees that a specific state exists # Guarantees that a specific state exists
def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
checkinst false do or_state state.instmap[inst] == nil do
instmap = instmap =
Map.put(state.instmap, inst, %{ Map.put(state.instmap, inst, %{
value: value, value: value,
@ -138,11 +122,10 @@ defmodule Paxos do
IO.puts("#{state.name} - #{proc} is leader") IO.puts("#{state.name} - #{proc} is leader")
Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st -> Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
# IO.puts("#{state.name} - looping after leader: #{inst}")
prepare(st, inst) prepare(st, inst)
end) end)
{:propose, inst, value, t, pid_to_inform, action} -> {:propose, inst, value, pid_to_inform, action} ->
IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}") IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")
cond do cond do
@ -152,12 +135,8 @@ defmodule Paxos do
state state
action == :increase_ballot_number -> action == :increase_ballot_number ->
state = has_or_create(state, inst)
IO.puts("#{state.name} - Got request to increase ballot number for inst #{inst}") IO.puts("#{state.name} - Got request to increase ballot number for inst #{inst}")
state = has_or_create(state, inst)
# Inform the pid with timeout right way
send(pid_to_inform, {:timeout, inst});
set_instmap do set_instmap do
%{map| ballot: Ballot.inc(map.ballot)} %{map| ballot: Ballot.inc(map.ballot)}
@ -166,12 +145,10 @@ defmodule Paxos do
not Map.has_key?(state.instmap, inst) -> not Map.has_key?(state.instmap, inst) ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
state = has_or_create(state, inst, value, pid_to_inform, action) state = has_or_create(state, inst, value, pid_to_inform, action)
Process.send_after(self(), {:timeout, inst}, t)
prepare(state, inst) prepare(state, inst)
state.instmap[inst].value == nil -> state.instmap[inst].value == nil ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
Process.send_after(self(), {:timeout, inst}, t)
set_instmap do set_instmap do
%{ map | %{ map |
@ -240,13 +217,6 @@ defmodule Paxos do
state state
end end
{:timeout, inst} ->
if not has_finished(state, inst) do
send(state.instmap[inst].pid_to_inform, {:timeout, inst})
end
state
{:nack, inst, ballot} -> {:nack, inst, ballot} ->
IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}") IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")
@ -286,7 +256,6 @@ defmodule Paxos do
end end
state state
end end
{:prepared, inst, ballot, accepted_ballot, accepted_value} -> {:prepared, inst, ballot, accepted_ballot, accepted_value} ->
@ -305,7 +274,6 @@ defmodule Paxos do
| prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}] | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
} }
end end
prepared(state, inst) prepared(state, inst)
Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) -> Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
@ -364,36 +332,24 @@ defmodule Paxos do
state state
end end
{:get_value, inst, pid_to_inform, t} -> {:get_value, inst, pid_to_inform} ->
# IO.puts("#{state.name} get_value") # IO.puts("#{state.name} get_value")
if has_finished(state, inst, true) do
cond do
t < 0 ->
send(pid_to_inform, {:get_value_res, inst})
has_finished(state, inst, true) ->
send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]}) send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]})
true ->
Process.send_after(self(), {:get_value, inst, pid_to_inform, t - 500}, 500)
end end
state state
{:rb_deliver, _, {:decide, inst, value}} -> {:rb_deliver, _, {:decide, inst, value}} ->
IO.puts("#{state.name} - decided #{inspect(inst)} #{inspect(value)}") IO.puts("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")
if has_finished(state, inst) do or_state not has_finished(state, inst) do
state
else
if Map.has_key?(state.instmap, inst) != nil and if Map.has_key?(state.instmap, inst) != nil and
state.instmap[inst].pid_to_inform != nil do state.instmap[inst].pid_to_inform != nil do
send(state.instmap[inst].pid_to_inform, {:decision, inst, value}) send(state.instmap[inst].pid_to_inform, {:decision, inst, value})
end end
%{ %{ state |
state decided: Map.put(state.decided, inst, value),
| decided: Map.put(state.decided, inst, value),
instmap: Map.delete(state.instmap, inst) instmap: Map.delete(state.instmap, inst)
} }
end end
@ -427,9 +383,8 @@ defmodule Paxos do
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot}) EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})
set_instmap do set_instmap do
%{ %{ map |
map prepared_values: [],
| prepared_values: [],
accepted: 0, accepted: 0,
aborted: false, aborted: false,
ballot_value: nil, ballot_value: nil,
@ -522,77 +477,75 @@ defmodule Paxos do
end end
def propose(pid, inst, value, t, action \\ nil) do def propose(pid, inst, value, t, action \\ nil) do
send(pid, {:propose, inst, value, t, self(), action}) send(pid, {:propose, inst, value, self(), action})
propose_loop(inst) propose_loop({inst, t})
end end
def propose_loop(inInst) do def propose_loop(input) do
{_, t} = input
receive do receive do
{:timeout, inst} -> {:timeout, inst} ->
check_and_apply({:timeout}, inst, inInst, &propose_loop/1) check_and_apply({:timeout}, inst, input, &propose_loop/1)
{:abort, inst} -> {:abort, inst} ->
check_and_apply({:abort}, inst, inInst, &propose_loop/1) check_and_apply({:abort}, inst, input, &propose_loop/1)
{:decision, inst, d} -> {:decision, inst, d} ->
check_and_apply({:decision, d}, inst, inInst, &propose_loop/1) check_and_apply({:decision, d}, inst, input, &propose_loop/1)
x -> x ->
Process.send_after(self(), x, 500) Process.send_after(self(), x, 500)
propose_loop(inInst) propose_loop(input)
after
t -> {:timeout}
end end
end end
def get_decision(pid, inst, t) do def get_decision(pid, inst, t) do
send(pid, {:get_value, inst, self(), t}) send(pid, {:get_value, inst, self()})
get_decision_loop(inst) get_decision_loop({inst, t})
end end
def get_decision_loop(inInst) do def get_decision_loop(input) do
{_, t} = input
receive do receive do
{:get_value_res, inst} -> {:get_value_res, inst} ->
check_and_apply(nil, inst, inInst, &get_decision_loop/1) check_and_apply(nil, inst, input, &get_decision_loop/1)
{:get_value_res_actual, inst, v} -> {:get_value_res_actual, inst, v} ->
check_and_apply(v, inst, inInst, &get_decision_loop/1) check_and_apply(v, inst, input, &get_decision_loop/1)
x -> x ->
Process.send_after(self(), x, 500) Process.send_after(self(), x, 500)
get_decision_loop(inInst) get_decision_loop(input)
after
t -> nil
end end
end end
def check_and_apply(v, inst, inInst, fun) do def check_and_apply(v, inst, input, fun) do
{inInst, _} = input
if inst == inInst do if inst == inInst do
v v
else else
fun.(inInst) fun.(input)
end end
end end
end end
defmodule Ballot do defmodule Ballot do
def init(name, number \\ 0), do: {name, number}
def init(name, number \\ 0) do
{name, number}
end
def inc(b, name \\ nil) do def inc(b, name \\ nil) do
{old_name, number} = b {old_name, number} = b
{ {
if name == nil do if name == nil do old_name else name end,
old_name
else
name
end,
number + 1 number + 1
} }
end end
defp lexicographical_compare(a, b) do defp lexicographical_compare(a, b) do
cond do cond do
a == b -> 0 a == b -> 0
@ -611,13 +564,13 @@ defmodule Ballot do
end end
end end
def compare(b1, operator, b2) do def compare(b1, operator, b2), do: operator.(diff(b1, b2), 0)
operator.(diff(b1, b2), 0)
end
end end
defmodule EagerReliableBroadcast do defmodule EagerReliableBroadcast do
require Utils
import Utils
def get_rb_name(name) do def get_rb_name(name) do
String.to_atom(Atom.to_string(name) <> "_rb") String.to_atom(Atom.to_string(name) <> "_rb")
end end
@ -628,7 +581,7 @@ defmodule EagerReliableBroadcast do
def start(name, processes) do def start(name, processes) do
pid = spawn(EagerReliableBroadcast, :init, [name, processes]) pid = spawn(EagerReliableBroadcast, :init, [name, processes])
Utils.register_name(get_rb_name(name), pid) register_name(get_rb_name(name), pid)
end end
# Init event must be the first # Init event must be the first
@ -653,15 +606,15 @@ defmodule EagerReliableBroadcast do
# Handle the broadcast request event # Handle the broadcast request event
{:broadcast, m} -> {:broadcast, m} ->
data_msg = {:data, state.name, state.seq_no, m} data_msg = {:data, state.name, state.seq_no, m}
Utils.beb_broadcast(data_msg, state.processes) beb_broadcast(data_msg, state.processes)
%{state | seq_no: state.seq_no + 1} %{state | seq_no: state.seq_no + 1}
{:data, proc, seq_no, m} -> {:data, proc, seq_no, m} ->
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
data_msg = {:data, proc, seq_no, m} data_msg = {:data, proc, seq_no, m}
Utils.beb_broadcast(data_msg, state.processes) beb_broadcast(data_msg, state.processes)
Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent) safecast(state.parent, {:rb_deliver, get_non_rb_name(proc), m})
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
else else
val = Map.get(state.delivered, {proc, seq_no, m}) val = Map.get(state.delivered, {proc, seq_no, m})
@ -676,9 +629,10 @@ defmodule EagerReliableBroadcast do
) )
end end
def broadcast(name, m) do #############
Utils.unicast({:broadcast, m}, get_rb_name(name)) # Interface #
end #############
def broadcast(name, m), do: safecast(get_rb_name(name), {:broadcast, m})
end end
# #
@ -686,6 +640,9 @@ end
# #
defmodule EventualLeaderElector do defmodule EventualLeaderElector do
require Utils
import Utils
def getEleName(name) do def getEleName(name) do
String.to_atom(Atom.to_string(name) <> "_ele") String.to_atom(Atom.to_string(name) <> "_ele")
end end
@ -694,7 +651,7 @@ defmodule EventualLeaderElector do
new_name = getEleName(name) new_name = getEleName(name)
pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])
Utils.register_name(new_name, pid) register_name(new_name, pid)
end end
# Init event must be the first # Init event must be the first
@ -717,7 +674,7 @@ defmodule EventualLeaderElector do
def request_heartbeats(state) do def request_heartbeats(state) do
state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} state = %{state | heard_back: MapSet.new(), seq: state.seq + 1}
Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes)
Process.send_after(self(), {:timeout}, state.timeout) Process.send_after(self(), {:timeout}, state.timeout)
state state
@ -727,28 +684,21 @@ defmodule EventualLeaderElector do
run( run(
receive do receive do
{:heartbeat_request, name, seq} -> {:heartbeat_request, name, seq} ->
Utils.unicast({:heartbeat, state.parent, seq}, name) safecast(name, {:heartbeat, state.parent, seq})
state state
{:heartbeat, name, seq} -> {:heartbeat, name, seq} ->
if seq == state.seq do or_state seq == state.seq do
%{state | heard_back: MapSet.put(state.heard_back, name)} %{state | heard_back: MapSet.put(state.heard_back, name)}
else
state
end end
{:timeout} -> {:timeout} ->
state = state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do
if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do
state
else
to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0)
if state.last_trust != to_trust do or_state state.last_trust != to_trust do
Utils.unicast({:ele_trust, to_trust}, state.parent) safecast(state.parent, {:ele_trust, to_trust})
%{state | last_trust: to_trust} %{state | last_trust: to_trust}
else
state
end end
end end

View File

@ -125,7 +125,7 @@ defmodule PaxosTestAditional do
IO.puts("#{inspect(name)}: started") IO.puts("#{inspect(name)}: started")
[leader | spare] = Enum.sort(participants) [leader | spare] = Enum.sort(participants)
increase_ballot = Enum.take(spare, floor(length(participants) / 2) + 1) increase_ballot = Enum.take(spare, floor(length(participants) / 2))
if name == leader do if name == leader do
# Propose when passed with :kill_before_decision will die right before a decision is selected # Propose when passed with :kill_before_decision will die right before a decision is selected
@ -199,7 +199,7 @@ defmodule PaxosTestAditional do
participants = Enum.sort(participants) participants = Enum.sort(participants)
[leader | spare ] = participants [leader | spare ] = participants
increase_ballot = Enum.take(spare, floor(length(participants) / 2) + 1) increase_ballot = Enum.take(spare, floor(length(participants) / 2))
[non_leader | _] = Enum.reverse(spare) [non_leader | _] = Enum.reverse(spare)
if name == non_leader do if name == non_leader do

View File

@ -76,7 +76,6 @@ test_suite = [
{&PaxosTestAditional.run_non_leader_send_propose_after_leader_elected/3, TestUtil.get_local_config(5), 10, {&PaxosTestAditional.run_non_leader_send_propose_after_leader_elected/3, TestUtil.get_local_config(5), 10,
"Non-Leader proposes after leader is elected, 5 local procs"}, "Non-Leader proposes after leader is elected, 5 local procs"},
{&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_dist_config(host, 5), 10, {&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_dist_config(host, 5), 10,
"Leader should nack before decision and then come to decision, no concurrent ballots, 5 nodes"}, "Leader should nack before decision and then come to decision, no concurrent ballots, 5 nodes"},
{&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_local_config(5), 10, {&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_local_config(5), 10,