Lot of changes
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-01-16 19:59:01 +00:00
parent bf9d0cd34a
commit 33281da43f
3 changed files with 496 additions and 306 deletions

View File

@@ -1,4 +1,8 @@
defmodule Utils do
@min_print_level 0
def safecast(p, m) when p == nil, do: IO.puts('Trying to safecast #{m} with p as nil')
def safecast(p, m) when is_pid(p), do: send(p, m)
def safecast(p, m) do
@@ -8,6 +12,10 @@ defmodule Utils do
end
end
def alter_name(name, part) do
String.to_atom(Atom.to_string(name) <> part)
end
def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m))
def register_name(name, pid, link \\ true) do
@@ -26,6 +34,21 @@ defmodule Utils do
end
end
defmacro create_log(level) do
quote do
def log(msg) do
Utils._log(msg, unquote(level))
end
end
end
def _log(msg, level) do
if (@min_print_level < level) do
IO.puts(msg)
end
end
defmacro or_state(val, do: expr) do
quote do
case unquote(val) do
@@ -34,6 +57,17 @@ defmodule Utils do
end
end
end
defmacro runfn(do: expr) do
quote do
def run(s) do
var!(state) = s
run(receive do
unquote(expr)
end)
end
end
end
end
#
@@ -47,6 +81,8 @@ defmodule Paxos do
require Utils
import Utils
create_log 0
defmacro set_instmap(do: expr) do
quote do
var!(map) = var!(state).instmap[var!(inst)]
@@ -57,22 +93,21 @@ defmodule Paxos do
# Starts the Paxos replica with a specific name and some processes
def start(name, processes) do
IO.puts("Starting paxos for #{name}")
def start(name, processes, link \\ false) do
log("Starting paxos for #{name}")
pid = spawn(Paxos, :init, [name, name, processes])
register_name(name, pid, false)
pid = spawn(Paxos, :init, [name, processes])
register_name(name, pid, link)
end
# Init event must be the first
# one after the component is created
def init(name, parent, processes) do
def init(name, processes) do
EventualLeaderElector.start(name, processes)
EagerReliableBroadcast.start(name, processes)
state = %{
name: name,
parent: parent,
processes: processes,
leader: nil,
instmap: %{},
@@ -115,246 +150,242 @@ defmodule Paxos do
end
end
def run(state) do
run(
receive do
{:ele_trust, proc} ->
IO.puts("#{state.name} - #{proc} is leader")
runfn do
{:ele_trust, proc} ->
log("#{state.name} - #{proc} is leader")
Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
prepare(st, inst)
end)
Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
prepare(st, inst)
end)
{:propose, inst, value, pid_to_inform, action} ->
IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")
{:propose, inst, value, pid_to_inform, action} ->
log("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")
cond do
has_finished(state, inst, true) ->
IO.puts("#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}")
send(pid_to_inform, {:decision, inst, state.decided[inst]})
state
action == :increase_ballot_number ->
IO.puts("#{state.name} - Got request to increase ballot number for inst #{inst}")
state = has_or_create(state, inst)
set_instmap do
%{map| ballot: Ballot.inc(map.ballot)}
end
not Map.has_key?(state.instmap, inst) ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
state = has_or_create(state, inst, value, pid_to_inform, action)
prepare(state, inst)
state.instmap[inst].value == nil ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
set_instmap do
%{ map |
value: value,
pid_to_inform: pid_to_inform,
action: action,
}
end
prepare(state, inst)
true ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
prepare(state, inst)
end
{:rb_deliver, proc, {:other_propose, inst, value}} ->
cond do
has_finished(state, inst, true) ->
EagerReliableBroadcast.broadcast(
state.name,
{:decide, inst, state.decided[inst]}
)
state
true ->
state = has_or_create(state, inst)
set_instmap do
%{map | other_value: value}
end
prepare(state, inst)
end
{:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
IO.puts("#{state.name} - prepare from #{proc}")
cond do
has_finished(state, inst) ->
state
not Map.has_key?(state.instmap, inst) ->
state = has_or_create(state, inst)
safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value});
set_instmap do
%{ map
| ballot: ballot
}
end
Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
safecast(proc,
{:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
state.instmap[inst].accepted_value}
)
set_instmap do
%{ map
| ballot: ballot
}
end
true ->
safecast(proc, {:nack, inst, ballot})
state
end
{:nack, inst, ballot} ->
IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")
cond do
has_finished(state, inst) ->
state
state.leader == state.name and Ballot.compare(state.instmap[inst].ballot, &==/2, ballot) ->
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
send(state.instmap[inst].pid_to_inform, {:abort, inst})
end
EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot})
set_instmap do
%{ map | has_sent_accept: false,
has_sent_prepare: false,
ballot: Ballot.inc(map.ballot),
aborted: true,
}
end
true ->
state
end
{:rb_deliver, _proc, {:abort, inst, ballot}} ->
cond do
has_finished(state, inst) ->
state
true ->
IO.puts("#{state.name} - got information to send abort")
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
send(state.instmap[inst].pid_to_inform, {:abort, inst})
end
state
end
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
IO.puts(
"#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
)
cond do
has_finished(state, inst) ->
state
Ballot.compare(ballot, &==/2, state.instmap[inst].ballot) ->
set_instmap do
%{
map
| prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
}
end
prepared(state, inst)
Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
IO.puts("#{state.name} - Probably recieved this before preare came to self sending with delay")
Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
state
true ->
state
end
{:rb_deliver, proc, {:accept, inst, ballot, value}} ->
cond do
has_finished(state, inst) ->
state
true ->
state = has_or_create(state, inst)
if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do
IO.puts("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}")
safecast(proc, {:accepted, inst, ballot})
set_instmap do
%{ map
| ballot: ballot,
accepted_value: value,
accepted_ballot: ballot
}
end
else
IO.puts("#{state.name} -> #{proc} nack")
safecast(proc, {:nack, inst, ballot})
state
end
end
{:accepted, inst, ballot} ->
IO.puts("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}")
cond do
has_finished(state, inst) ->
state
state.leader == state.name and state.instmap[inst].ballot == ballot ->
set_instmap do
%{ map |
accepted: map.accepted + 1
}
end
accepted( state, inst)
true ->
state
end
{:get_value, inst, pid_to_inform} ->
# IO.puts("#{state.name} get_value")
if has_finished(state, inst, true) do
send(pid_to_inform, {:get_value_res, inst, state.decided[inst]})
end
cond do
has_finished(state, inst, true) ->
log("#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}")
send(pid_to_inform, {:decision, inst, state.decided[inst]})
state
{:rb_deliver, _, {:decide, inst, value}} ->
IO.puts("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")
action == :increase_ballot_number ->
log("#{state.name} - Got request to increase ballot number for inst #{inst}")
state = has_or_create(state, inst)
or_state not has_finished(state, inst) do
if Map.has_key?(state.instmap, inst) != nil and
state.instmap[inst].pid_to_inform != nil do
send(state.instmap[inst].pid_to_inform, {:decision, inst, value})
end
set_instmap do
%{map| ballot: Ballot.inc(map.ballot)}
end
%{ state |
decided: Map.put(state.decided, inst, value),
instmap: Map.delete(state.instmap, inst)
not Map.has_key?(state.instmap, inst) ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
state = has_or_create(state, inst, value, pid_to_inform, action)
prepare(state, inst)
state.instmap[inst].value == nil ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
set_instmap do
%{ map |
value: value,
pid_to_inform: pid_to_inform,
action: action,
}
end
prepare(state, inst)
true ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
prepare(state, inst)
end
{:rb_deliver, proc, {:other_propose, inst, value}} ->
cond do
has_finished(state, inst, true) ->
EagerReliableBroadcast.broadcast(
state.name,
{:decide, inst, state.decided[inst]}
)
state
true ->
state = has_or_create(state, inst)
set_instmap do
%{map | other_value: value}
end
prepare(state, inst)
end
{:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
log("#{state.name} - prepare from #{proc}")
cond do
has_finished(state, inst) ->
state
not Map.has_key?(state.instmap, inst) ->
state = has_or_create(state, inst)
safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value});
set_instmap do
%{ map
| ballot: ballot
}
end
Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
safecast(proc,
{:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
state.instmap[inst].accepted_value}
)
set_instmap do
%{ map
| ballot: ballot
}
end
true ->
safecast(proc, {:nack, inst, ballot})
state
end
{:nack, inst, ballot} ->
log("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")
cond do
has_finished(state, inst) ->
state
state.leader == state.name and Ballot.compare(state.instmap[inst].ballot, &==/2, ballot) ->
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
send(state.instmap[inst].pid_to_inform, {:abort, inst})
end
EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot})
set_instmap do
%{ map | has_sent_accept: false,
has_sent_prepare: false,
ballot: Ballot.inc(map.ballot),
aborted: true,
}
end
true ->
state
end
{:rb_deliver, _proc, {:abort, inst, ballot}} ->
cond do
has_finished(state, inst) ->
state
true ->
log("#{state.name} - got information to send abort")
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
send(state.instmap[inst].pid_to_inform, {:abort, inst})
end
state
end
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
log(
"#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
)
cond do
has_finished(state, inst) ->
state
Ballot.compare(ballot, &==/2, state.instmap[inst].ballot) ->
set_instmap do
%{
map
| prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
}
end
prepared(state, inst)
Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
log("#{state.name} - Probably recieved this before preare came to self sending with delay")
Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
state
true ->
state
end
{:rb_deliver, proc, {:accept, inst, ballot, value}} ->
cond do
has_finished(state, inst) ->
state
true ->
state = has_or_create(state, inst)
if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do
log("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}")
safecast(proc, {:accepted, inst, ballot})
set_instmap do
%{ map
| ballot: ballot,
accepted_value: value,
accepted_ballot: ballot
}
end
else
log("#{state.name} -> #{proc} nack")
safecast(proc, {:nack, inst, ballot})
state
end
end
{:accepted, inst, ballot} ->
log("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}")
cond do
has_finished(state, inst) ->
state
state.leader == state.name and state.instmap[inst].ballot == ballot ->
set_instmap do
%{ map |
accepted: map.accepted + 1
}
end
accepted( state, inst)
true ->
state
end
{:get_value, inst, pid_to_inform} ->
# log("#{state.name} get_value")
if has_finished(state, inst, true) do
safecast(pid_to_inform, {:get_value_res, inst, state.decided[inst]})
end
state
{:rb_deliver, _, {:decide, inst, value}} ->
log("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")
or_state not has_finished(state, inst) do
if Map.has_key?(state.instmap, inst) != nil and
state.instmap[inst].pid_to_inform != nil do
safecast(state.instmap[inst].pid_to_inform, {:decision, inst, value})
end
%{ state |
decided: Map.put(state.decided, inst, value),
instmap: Map.delete(state.instmap, inst)
}
end
)
end
#
@@ -379,7 +410,7 @@ defmodule Paxos do
true ->
ballot = Ballot.inc(state.instmap[inst].ballot)
IO.puts("#{state.name} - sending all prepare #{inst} #{inspect(ballot)}")
log("#{state.name} - sending all prepare #{inst} #{inspect(ballot)}")
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})
set_instmap do
@@ -453,7 +484,7 @@ defmodule Paxos do
value = state.instmap[inst].ballot_value
if state.instmap[inst].action == :kill_before_decision do
IO.puts("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}")
log("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}")
Process.exit(self(), :kill)
end
@@ -510,8 +541,8 @@ defmodule Paxos do
def get_decision_loop(input) do
{_, t} = input
receive do
{:get_value_res, inst} ->
check_and_apply(inst, inst, input, &get_decision_loop/1)
{:get_value_res, inst, v} ->
check_and_apply(v, inst, input, &get_decision_loop/1)
x ->
Process.send_after(self(), x, 500)
@@ -568,26 +599,22 @@ defmodule EagerReliableBroadcast do
require Utils
import Utils
def get_rb_name(name) do
String.to_atom(Atom.to_string(name) <> "_rb")
end
def get_non_rb_name(name) do
String.to_atom(String.replace(Atom.to_string(name), "_rb", ""))
end
def start(name, processes) do
pid = spawn(EagerReliableBroadcast, :init, [name, processes])
register_name(get_rb_name(name), pid)
register_name(alter_name(name, "_rb"), pid)
end
# Init event must be the first
# one after the component is created
def init(parent, processes) do
state = %{
name: get_rb_name(parent),
name: alter_name(parent, "_rb"),
parent: parent,
processes: Enum.map(processes, fn name -> get_rb_name(name) end),
processes: Enum.map(processes, fn name -> alter_name(name, "_rb") end),
# Use this data structure to remember IDs of the delivered messages
delivered: %{},
# Use this variable to remember the last sequence number used to identify a message
@@ -597,39 +624,35 @@ defmodule EagerReliableBroadcast do
run(state)
end
def run(state) do
run(
receive do
# Handle the broadcast request event
{:broadcast, m} ->
data_msg = {:data, state.name, state.seq_no, m}
beb_broadcast(data_msg, state.processes)
%{state | seq_no: state.seq_no + 1}
runfn do
# Handle the broadcast request event
{:broadcast, m} ->
data_msg = {:data, state.name, state.seq_no, m}
beb_broadcast(data_msg, state.processes)
%{state | seq_no: state.seq_no + 1}
{:data, proc, seq_no, m} ->
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
data_msg = {:data, proc, seq_no, m}
beb_broadcast(data_msg, state.processes)
{:data, proc, seq_no, m} ->
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
data_msg = {:data, proc, seq_no, m}
beb_broadcast(data_msg, state.processes)
safecast(state.parent, {:rb_deliver, get_non_rb_name(proc), m})
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
else
val = Map.get(state.delivered, {proc, seq_no, m})
safecast(state.parent, {:rb_deliver, get_non_rb_name(proc), m})
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
else
val = Map.get(state.delivered, {proc, seq_no, m})
if val < Enum.count(state.processes) do
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)}
else
%{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})}
end
end
if val < Enum.count(state.processes) do
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)}
else
%{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})}
end
end
)
end
#############
# Interface #
#############
def broadcast(name, m), do: safecast(get_rb_name(name), {:broadcast, m})
def broadcast(name, m), do: safecast(alter_name(name, "_rb"), {:broadcast, m})
end
#
@@ -640,12 +663,8 @@ defmodule EventualLeaderElector do
require Utils
import Utils
def getEleName(name) do
String.to_atom(Atom.to_string(name) <> "_ele")
end
def start(name, processes) do
new_name = getEleName(name)
new_name = alter_name(name, "_ele")
pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])
register_name(new_name, pid)
@@ -654,7 +673,7 @@ defmodule EventualLeaderElector do
# Init event must be the first
# one after the component is created
def init(name, parent, processes) do
processes = Enum.map(processes, fn name -> getEleName(name) end)
processes = Enum.map(processes, fn name -> alter_name(name, "_ele") end)
state = %{
name: name,
@@ -677,30 +696,26 @@ defmodule EventualLeaderElector do
state
end
def run(state) do
run(
receive do
{:heartbeat_request, name, seq} ->
safecast(name, {:heartbeat, state.parent, seq})
state
runfn do
{:heartbeat_request, name, seq} ->
safecast(name, {:heartbeat, state.parent, seq})
state
{:heartbeat, name, seq} ->
or_state seq == state.seq do
%{state | heard_back: MapSet.put(state.heard_back, name)}
end
{:timeout} ->
state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do
to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0)
or_state state.last_trust != to_trust do
safecast(state.parent, {:ele_trust, to_trust})
%{state | last_trust: to_trust}
end
end
request_heartbeats(state)
{:heartbeat, name, seq} ->
or_state seq == state.seq do
%{state | heard_back: MapSet.put(state.heard_back, name)}
end
)
{:timeout} ->
state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do
to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0)
or_state state.last_trust != to_trust do
safecast(state.parent, {:ele_trust, to_trust})
%{state | last_trust: to_trust}
end
end
request_heartbeats(state)
end
end