Andre Henriques
249a6e1ae3
All checks were successful
continuous-integration/drone/push Build is passing
492 lines
14 KiB
Elixir
492 lines
14 KiB
Elixir
defmodule Paxos do
|
|
def start(name, processes) do
|
|
IO.puts("Starting paxos for #{name}")
|
|
|
|
pid = spawn(Paxos, :init, [name, name, processes])
|
|
|
|
Utils.register_name(name, pid, false)
|
|
end
|
|
|
|
# Init event must be the first
|
|
# one after the component is created
|
|
def init(name, parent, processes) do
|
|
EventualLeaderElector.start(name, processes)
|
|
EagerReliableBroadcast.start(name, processes)
|
|
|
|
state = %{
|
|
name: name,
|
|
parent: parent,
|
|
processes: processes,
|
|
leader: nil,
|
|
instmap: %{},
|
|
other_values: %{},
|
|
decided: %{}
|
|
}
|
|
|
|
run(state)
|
|
end
|
|
|
|
def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
|
|
if Map.has_key?(state.instmap, inst) do
|
|
state
|
|
else
|
|
instmap =
|
|
Map.put(state.instmap, inst, %{
|
|
value: value,
|
|
ballot: 0,
|
|
ballot_value: nil,
|
|
prepared_values: [],
|
|
accepted: 0,
|
|
accepted_ballot: nil,
|
|
accepted_value: nil,
|
|
pid_to_inform: pid_to_inform,
|
|
has_sent_accept: false,
|
|
action: action,
|
|
has_sent_prepare: false,
|
|
})
|
|
|
|
%{state | instmap: instmap}
|
|
end
|
|
end
|
|
|
|
def has_finished(state, inst) do
|
|
Map.has_key?(state.decided, inst)
|
|
end
|
|
|
|
def run(state) do
|
|
run(
|
|
receive do
|
|
{:ele_trust, proc} ->
|
|
IO.puts("#{state.name} - #{proc} is leader")
|
|
|
|
Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
|
|
# IO.puts("#{state.name} - looping after leader: #{inst}")
|
|
prepare(st, inst)
|
|
end)
|
|
|
|
{:propose, inst, value, t, pid_to_inform, action} ->
|
|
IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")
|
|
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
IO.puts("#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}")
|
|
send(pid_to_inform, {:decision, inst, state.decided[inst]})
|
|
state
|
|
|
|
not Map.has_key?(state.instmap, inst) ->
|
|
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
|
|
state = has_or_create(state, inst, value, pid_to_inform, action)
|
|
Process.send_after(self(), {:timeout, inst}, t)
|
|
prepare(state, inst)
|
|
|
|
state.instmap[inst].value == nil ->
|
|
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
|
|
Process.send_after(self(), {:timeout, inst}, t)
|
|
|
|
prepare(
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| value: value,
|
|
pid_to_inform: pid_to_inform,
|
|
action: action,
|
|
} end),
|
|
inst
|
|
)
|
|
|
|
true ->
|
|
prepare(state, inst)
|
|
end
|
|
|
|
{:rb_deliver, proc, {:other_propose, inst, value}} ->
|
|
state = %{state | other_values: Map.put(state.other_values, inst, value)}
|
|
|
|
cond do
|
|
Map.has_key?(state.decided, inst) ->
|
|
EagerReliableBroadcast.broadcast(
|
|
state.name,
|
|
{:decide, inst, state.decided[inst]}
|
|
)
|
|
state
|
|
|
|
true ->
|
|
state = has_or_create(state, inst)
|
|
prepare(state, inst)
|
|
end
|
|
|
|
{:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
|
|
IO.puts("#{state.name} - prepare from #{proc}")
|
|
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
state
|
|
|
|
not Map.has_key?(state.instmap, inst) ->
|
|
state = has_or_create(state, inst)
|
|
|
|
Utils.unicast(
|
|
{:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
|
|
state.instmap[inst].accepted_value},
|
|
proc
|
|
)
|
|
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| ballot: ballot
|
|
} end)
|
|
|
|
ballot > state.instmap[inst].ballot ->
|
|
Utils.unicast(
|
|
{:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
|
|
state.instmap[inst].accepted_value},
|
|
proc
|
|
)
|
|
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| ballot: ballot
|
|
} end)
|
|
|
|
true ->
|
|
Utils.unicast({:nack, inst, ballot, state.instmap[inst].ballot}, proc)
|
|
state
|
|
end
|
|
|
|
{:timeout, inst} ->
|
|
if not has_finished(state, inst) do
|
|
send(state.instmap[inst].pid_to_inform, {:timeout, inst})
|
|
end
|
|
|
|
state
|
|
|
|
{:nack, inst, ballot, new_ballot} ->
|
|
IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)} #{inspect(new_ballot)}")
|
|
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
state
|
|
|
|
state.leader == state.name and state.instmap[inst].ballot == ballot ->
|
|
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
|
|
send(state.instmap[inst].pid_to_inform, {:abort, inst})
|
|
end
|
|
|
|
EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot, new_ballot})
|
|
|
|
set_instmap(state, inst, fn map -> %{
|
|
map | has_sent_accept: false,
|
|
ballot: new_ballot + 1,
|
|
has_sent_prepare: false,
|
|
} end)
|
|
|
|
true ->
|
|
state
|
|
end
|
|
|
|
{:rb_deliver, _proc, {:abort, inst, ballot}} ->
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
state
|
|
|
|
state.instmap[inst].ballot == ballot ->
|
|
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
|
|
send(state.instmap[inst].pid_to_inform, {:abort, inst})
|
|
end
|
|
|
|
state
|
|
|
|
true ->
|
|
state
|
|
|
|
end
|
|
|
|
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
|
|
IO.puts(
|
|
"#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
|
|
)
|
|
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
state
|
|
|
|
ballot == state.instmap[inst].ballot ->
|
|
state =
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
|
|
} end)
|
|
|
|
prepared(state, inst)
|
|
|
|
ballot > state.instmap[inst].ballot ->
|
|
IO.puts("Probably recieved this before preare came to self sending with delay")
|
|
Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
|
|
state
|
|
|
|
true ->
|
|
state
|
|
end
|
|
|
|
{:rb_deliver, proc, {:accept, inst, ballot, value}} ->
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
state
|
|
|
|
true ->
|
|
state = has_or_create(state, inst)
|
|
|
|
if ballot >= state.instmap[inst].ballot do
|
|
IO.puts("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}")
|
|
|
|
Utils.unicast({:accepted, inst, ballot}, proc)
|
|
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| ballot: ballot,
|
|
accepted_value: value,
|
|
accepted_ballot: ballot
|
|
} end)
|
|
else
|
|
IO.puts("#{state.name} -> #{proc} nack")
|
|
Utils.unicast({:nack, inst, ballot, state.instmap[inst].ballot}, proc)
|
|
state
|
|
end
|
|
end
|
|
|
|
{:accepted, inst, ballot} ->
|
|
IO.puts("#{state.name} accepted #{inspect(inst)} #{inspect(ballot)}")
|
|
|
|
cond do
|
|
has_finished(state, inst) ->
|
|
state
|
|
|
|
state.leader == state.name and state.instmap[inst].ballot == ballot ->
|
|
accepted(
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| accepted: map.accepted + 1
|
|
} end),
|
|
inst
|
|
)
|
|
|
|
true ->
|
|
state
|
|
end
|
|
|
|
{:get_value, inst, pid_to_inform, t} ->
|
|
# IO.puts("#{state.name} get_value")
|
|
|
|
cond do
|
|
t < 0 ->
|
|
send(pid_to_inform, {:get_value_res, inst})
|
|
|
|
has_finished(state, inst) ->
|
|
send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]})
|
|
|
|
true ->
|
|
Process.send_after(self(), {:get_value, inst, pid_to_inform, t - 500}, 500)
|
|
end
|
|
|
|
state
|
|
|
|
{:rb_deliver, _, {:decide, inst, value}} ->
|
|
IO.puts("#{state.name} decided #{inspect(inst)} #{inspect(value)}")
|
|
|
|
if has_finished(state, inst) do
|
|
state
|
|
else
|
|
if Map.has_key?(state.instmap, inst) != nil and
|
|
state.instmap[inst].pid_to_inform != nil do
|
|
send(state.instmap[inst].pid_to_inform, {:decision, inst, value})
|
|
end
|
|
|
|
%{
|
|
state
|
|
| decided: Map.put(state.decided, inst, value),
|
|
instmap: Map.delete(state.instmap, inst)
|
|
}
|
|
end
|
|
end
|
|
)
|
|
end
|
|
|
|
def set_instmap(state, inst, set_instmap) do
|
|
new_instmap = Map.put(state.instmap, inst, set_instmap.(state.instmap[inst]))
|
|
%{state | instmap: new_instmap}
|
|
end
|
|
|
|
#
|
|
# Puts process in the preapre state
|
|
#
|
|
|
|
def prepare(state, _) when state.leader != state.name, do: state
|
|
|
|
def prepare(state, inst) do
|
|
cond do
|
|
Map.get(state.instmap, inst) == nil and Map.get(state.other_values, inst) == nil ->
|
|
state
|
|
|
|
Map.get(state.instmap, inst) != nil and state.instmap[inst].has_sent_prepare ->
|
|
state
|
|
|
|
Map.get(state.instmap, inst) != nil and state.instmap[inst].has_sent_accept ->
|
|
state
|
|
|
|
true ->
|
|
ballot = state.instmap[inst].ballot + 1
|
|
IO.puts("#{state.name} sending all prepare #{inst} #{ballot}")
|
|
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})
|
|
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| prepared_values: [],
|
|
accepted: 0,
|
|
ballot_value: nil,
|
|
has_sent_prepare: true,
|
|
has_sent_accept: false
|
|
} end)
|
|
end
|
|
end
|
|
|
|
#
|
|
# Process the prepared responses
|
|
#
|
|
def prepared(state, _) when state.leader != state.name, do: state
|
|
|
|
def prepared(state, inst) do
|
|
if length(state.instmap[inst].prepared_values) >= floor(length(state.processes) / 2) + 1 and
|
|
not state.instmap[inst].has_sent_accept do
|
|
{_, a_val} =
|
|
Enum.reduce(state.instmap[inst].prepared_values, {0, nil}, fn {bal, val},
|
|
{acc_bal, acc_val} ->
|
|
cond do
|
|
val == nil ->
|
|
{acc_bal, acc_val}
|
|
acc_bal > bal ->
|
|
{acc_bal, acc_val}
|
|
true ->
|
|
{bal, val}
|
|
end
|
|
end)
|
|
|
|
a_val =
|
|
if a_val == nil do
|
|
if state.instmap[inst].value == nil do
|
|
state.other_values[inst]
|
|
else
|
|
state.instmap[inst].value
|
|
end
|
|
else
|
|
a_val
|
|
end
|
|
|
|
EagerReliableBroadcast.broadcast(
|
|
state.name,
|
|
{:accept, inst, state.instmap[inst].ballot, a_val}
|
|
)
|
|
|
|
set_instmap(state, inst, fn map -> %{
|
|
map
|
|
| ballot_value: a_val,
|
|
has_sent_accept: true
|
|
} end)
|
|
else
|
|
state
|
|
end
|
|
end
|
|
|
|
#
|
|
# Process the accepted responses
|
|
#
|
|
def accepted(state, _) when state.leader != state.name, do: state
|
|
|
|
def accepted(state, inst) do
|
|
if state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do
|
|
value = state.instmap[inst].ballot_value
|
|
|
|
if state.instmap[inst].action == :kill_before_decision do
|
|
IO.puts("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}")
|
|
Process.exit(self(), :kill)
|
|
end
|
|
|
|
EagerReliableBroadcast.broadcast(
|
|
state.name,
|
|
{:decide, inst, value}
|
|
)
|
|
|
|
if state.instmap[inst].pid_to_inform != nil do
|
|
send(state.instmap[inst].pid_to_inform, {:decision, inst, value})
|
|
end
|
|
|
|
%{
|
|
state
|
|
| decided: Map.put(state.decided, inst, value),
|
|
instmap: Map.delete(state.instmap, inst)
|
|
}
|
|
else
|
|
state
|
|
end
|
|
end
|
|
|
|
def propose(pid, inst, value, t, action \\ nil) do
|
|
send(pid, {:propose, inst, value, t, self(), action})
|
|
|
|
propose_loop(inst)
|
|
end
|
|
|
|
def propose_loop(inInst) do
|
|
receive do
|
|
{:timeout, inst} ->
|
|
if inInst == inst do
|
|
{:timeout}
|
|
else
|
|
propose_loop(inInst)
|
|
end
|
|
|
|
{:abort, inst} ->
|
|
if inInst == inst do
|
|
{:abort}
|
|
else
|
|
propose_loop(inInst)
|
|
end
|
|
|
|
{:decision, inst, d} ->
|
|
if inInst == inst do
|
|
{:decision, d}
|
|
else
|
|
propose_loop(inInst)
|
|
end
|
|
|
|
x ->
|
|
Process.send_after(self(), x, 500)
|
|
propose_loop(inInst)
|
|
end
|
|
end
|
|
|
|
def get_decision(pid, inst, t) do
|
|
send(pid, {:get_value, inst, self(), t})
|
|
get_decision_loop(inst)
|
|
end
|
|
|
|
def get_decision_loop(inInst) do
|
|
receive do
|
|
{:get_value_res, inst} ->
|
|
if inst == inInst do
|
|
nil
|
|
else
|
|
get_decision_loop(inInst)
|
|
end
|
|
|
|
{:get_value_res_actual, inst, v} ->
|
|
if inst == inInst do
|
|
v
|
|
else
|
|
get_decision_loop(inInst)
|
|
end
|
|
|
|
x ->
|
|
Process.send_after(self(), x, 500)
|
|
get_decision_loop(inInst)
|
|
end
|
|
end
|
|
end
|