This repository has been archived on 2024-01-29. You can view files and clone it, but cannot push or open issues or pull requests.
distributed_system_coursework/lib/paxos.ex

462 lines
13 KiB
Elixir
Raw Normal View History

2023-11-28 21:42:11 +00:00
defmodule Paxos do
# Spawns a Paxos process for `name` and registers it under that name.
#
# `name` is passed to `init/3` both as the process name and as its parent.
# `processes` is the list of participants handed to `init/3`.
# Returns whatever `Utils.register_name/3` returns.
def start(name, processes) do
  IO.puts("Starting paxos for #{name}")

  init_args = [name, name, processes]
  paxos_pid = spawn(Paxos, :init, init_args)

  Utils.register_name(name, paxos_pid, false)
end
# Entry point of the spawned Paxos process; it must run before any other
# event is handled by the component.
#
# Starts the leader elector and the reliable broadcast for `name`, then
# enters the `run/1` loop with an empty state: no leader, no open
# instances, no values learned from other proposers and no decisions.
def init(name, parent, processes) do
  EventualLeaderElector.start(name, processes)
  EagerReliableBroadcast.start(name, processes)

  run(%{
    name: name,
    parent: parent,
    processes: processes,
    leader: nil,
    instmap: %{},
    other_values: %{},
    decided: %{}
  })
end
2024-01-07 15:02:11 +00:00
# Makes sure `state.instmap` holds an entry for `inst`.
#
# An existing entry is left untouched. Otherwise a fresh entry is stored
# with ballot 0, nothing accepted, no collected promises, and the given
# `value`, `pid_to_inform` and `action`.
# Returns the (possibly updated) state.
def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
  fresh_instance = %{
    value: value,
    ballot: 0,
    ballot_value: nil,
    prepared_values: [],
    accepted: 0,
    accepted_ballot: nil,
    accepted_value: nil,
    pid_to_inform: pid_to_inform,
    has_sent_accept: false,
    action: action
  }

  # put_new/3 only inserts when the key is absent, so known instances survive.
  %{state | instmap: Map.put_new(state.instmap, inst, fresh_instance)}
end
2024-01-02 21:39:00 +00:00
# Returns true when a decision for `inst` is recorded in `state.decided`.
def has_finished(state, inst) do
  is_map_key(state.decided, inst)
end
2023-11-28 21:42:11 +00:00
# Main event loop of the Paxos process.
#
# Blocks on `receive`, handles exactly one message, and recurses with the
# state returned by the handler. Messages handled:
#
#   {:ele_trust, proc}                        - new leader announced
#   {:propose, inst, value, t, pid, action}   - local client proposal
#   {:rb_deliver, _, {:other_propose, ...}}   - value proposed by another process
#   {:rb_deliver, proc, {:prepare, ...}}      - prepare request from `proc`
#   {:timeout, inst}                          - proposal timer fired
#   {:nack, inst, ballot}                     - a prepare/accept was rejected
#   {:prepared, inst, ballot, a_bal, a_val}   - promise reply to our prepare
#   {:rb_deliver, proc, {:accept, ...}}       - accept request from `proc`
#   {:accepted, inst, ballot}                 - accept acknowledgement
#   {:get_value, inst, pid, t}                - client polling for a decision
#   {:rb_deliver, _, {:decide, inst, value}}  - decision announced
#
# Never returns.
def run(state) do
  run(
    receive do
      {:ele_trust, proc} ->
        IO.puts("#{state.name} - #{proc} is leader")

        # A new leader (possibly us) restarts the prepare phase for every
        # open instance; prepare/2 is a no-op when we are not the leader.
        Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
          prepare(st, inst)
        end)

      {:propose, inst, value, t, pid_to_inform, action} ->
        IO.puts("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")

        cond do
          has_finished(state, inst) ->
            IO.puts(
              "#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}"
            )

            send(pid_to_inform, {:decision, inst, state.decided[inst]})
            state

          not Map.has_key?(state.instmap, inst) ->
            EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
            state = has_or_create(state, inst, value, pid_to_inform, action)
            Process.send_after(self(), {:timeout, inst}, t)
            prepare(state, inst)

          state.instmap[inst].value == nil ->
            # The instance was created by a remote message; adopt the local
            # proposal and remember who to notify.
            EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
            Process.send_after(self(), {:timeout, inst}, t)

            prepare(
              set_instmap(state, inst, fn map ->
                %{map | value: value, pid_to_inform: pid_to_inform, action: action}
              end),
              inst
            )

          true ->
            prepare(state, inst)
        end

      {:rb_deliver, _proc, {:other_propose, inst, value}} ->
        state = %{state | other_values: Map.put(state.other_values, inst, value)}

        if Map.has_key?(state.decided, inst) do
          # Already decided: re-announce so the late proposer learns it.
          EagerReliableBroadcast.broadcast(state.name, {:decide, inst, state.decided[inst]})
          state
        else
          state = has_or_create(state, inst)
          prepare(state, inst)
        end

      # `proc` is bound twice on purpose: the broadcast sender must equal
      # the proposer named inside the payload for this clause to match.
      {:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
        IO.puts("#{state.name} - prepare from #{proc}")

        cond do
          has_finished(state, inst) ->
            state

          not Map.has_key?(state.instmap, inst) ->
            state = has_or_create(state, inst)

            Utils.unicast(
              {:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
               state.instmap[inst].accepted_value},
              proc
            )

            set_instmap(state, inst, fn map -> %{map | ballot: ballot} end)

          ballot > state.instmap[inst].ballot ->
            Utils.unicast(
              {:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
               state.instmap[inst].accepted_value},
              proc
            )

            set_instmap(state, inst, fn map -> %{map | ballot: ballot} end)

          true ->
            Utils.unicast({:nack, inst, ballot}, proc)
            state
        end

      {:timeout, inst} ->
        if not has_finished(state, inst) do
          # Only notify when the instance is known and has a registered
          # client; send/2 raises on a nil destination.
          case state.instmap[inst] do
            %{pid_to_inform: pid} when pid != nil -> send(pid, {:timeout, inst})
            _ -> :ok
          end
        end

        state

      {:nack, inst, ballot} ->
        IO.puts("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")
        instance = state.instmap[inst]

        cond do
          # Check that the instance exists before reading its ballot.
          has_finished(state, inst) or instance == nil ->
            state

          state.leader == state.name and instance.ballot == ballot ->
            if instance.pid_to_inform != nil do
              send(instance.pid_to_inform, {:abort, inst})
            end

            set_instmap(state, inst, fn map -> %{map | has_sent_accept: false} end)

          true ->
            state
        end

      {:prepared, inst, ballot, accepted_ballot, accepted_value} ->
        IO.puts(
          "#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
        )

        cond do
          has_finished(state, inst) ->
            state

          ballot == state.instmap[inst].ballot ->
            state =
              set_instmap(state, inst, fn map ->
                %{
                  map
                  | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
                }
              end)

            prepared(state, inst)

          ballot > state.instmap[inst].ballot ->
            # Our own prepare broadcast has not been delivered to us yet, so
            # the local ballot is stale; retry this promise a bit later.
            IO.puts("Probably recieved this before preare came to self sending with delay")

            Process.send_after(
              self(),
              {:prepared, inst, ballot, accepted_ballot, accepted_value},
              100
            )

            state

          true ->
            state
        end

      {:rb_deliver, proc, {:accept, inst, ballot, value}} ->
        if has_finished(state, inst) do
          state
        else
          state = has_or_create(state, inst)

          if ballot >= state.instmap[inst].ballot do
            IO.puts(
              "#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}"
            )

            Utils.unicast({:accepted, inst, ballot}, proc)

            set_instmap(state, inst, fn map ->
              %{map | ballot: ballot, accepted_value: value, accepted_ballot: ballot}
            end)
          else
            IO.puts("#{state.name} -> #{proc} nack")
            Utils.unicast({:nack, inst, ballot}, proc)
            state
          end
        end

      {:accepted, inst, ballot} ->
        IO.puts("#{state.name} accepted #{inspect(inst)} #{inspect(ballot)}")

        cond do
          has_finished(state, inst) ->
            state

          state.leader == state.name and state.instmap[inst].ballot == ballot ->
            accepted(
              set_instmap(state, inst, fn map -> %{map | accepted: map.accepted + 1} end),
              inst
            )

          true ->
            state
        end

      {:get_value, inst, pid_to_inform, t} ->
        cond do
          # Check for a decision first so that a value decided during the
          # last polling interval is still reported instead of "no value".
          has_finished(state, inst) ->
            send(pid_to_inform, {:get_value_res_actual, inst, state.decided[inst]})

          t < 0 ->
            send(pid_to_inform, {:get_value_res, inst})

          true ->
            # Poll again in 500 ms with the remaining time budget reduced.
            Process.send_after(self(), {:get_value, inst, pid_to_inform, t - 500}, 500)
        end

        state

      {:rb_deliver, _, {:decide, inst, value}} ->
        IO.puts("#{state.name} decided #{inspect(inst)} #{inspect(value)}")

        if has_finished(state, inst) do
          state
        else
          # The instance may be unknown locally (decision learned without
          # taking part); notify only a client that is actually registered.
          case state.instmap[inst] do
            %{pid_to_inform: pid} when pid != nil -> send(pid, {:decision, inst, value})
            _ -> :ok
          end

          %{
            state
            | decided: Map.put(state.decided, inst, value),
              instmap: Map.delete(state.instmap, inst)
          }
        end
    end
  )
end
2024-01-07 15:02:11 +00:00
# Replaces the entry for `inst` in `state.instmap` with the result of
# applying `set_instmap` (a one-argument function) to the current entry.
#
# The function receives `nil` when no entry exists for `inst`.
# Returns the updated state.
def set_instmap(state, inst, set_instmap) do
  instances = state.instmap
  updated_entry = set_instmap.(instances[inst])

  %{state | instmap: Map.put(instances, inst, updated_entry)}
end
2023-11-28 21:42:11 +00:00
#
# Puts the process in the prepare state for `inst`.
#
# Does nothing unless this process is the current leader. For the leader it
# also does nothing when the instance is unknown locally or an accept has
# already been sent for it. Otherwise it broadcasts a prepare with the
# instance's ballot + 1 and resets the collected promises, the accept
# counter and the accept flag. The stored ballot itself is not changed here.
#
# Returns the (possibly updated) state.
#
def prepare(state, _) when state.leader != state.name, do: state

def prepare(state, inst) do
  case Map.get(state.instmap, inst) do
    # No local entry: there is no ballot to build on, so skip instead of
    # crashing on a nil instance.
    nil ->
      state

    %{has_sent_accept: true} ->
      state

    instance ->
      ballot = instance.ballot + 1

      # inspect/1 keeps the log line safe for instance ids that do not
      # implement String.Chars (tuples, for example).
      IO.puts("#{state.name} sending all prepare #{inspect(inst)} #{ballot}")
      EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})

      set_instmap(state, inst, fn map ->
        %{map | prepared_values: [], accepted: 0, ballot_value: nil, has_sent_accept: false}
      end)
  end
end
#
# Processes the promises ("prepared" replies) collected for `inst`.
#
# Does nothing unless this process is the current leader. Once promises from
# a majority of `state.processes` are in and no accept has been sent yet,
# it picks the value to propose and broadcasts an accept for the instance's
# current ballot. The value is chosen in this order:
#   1. the value accepted under the highest ballot among the promises,
#   2. this process's own proposed value,
#   3. a value learned from another proposer (`state.other_values`).
#
# Returns the (possibly updated) state.
#
def prepared(state, _) when state.leader != state.name, do: state

def prepared(state, inst) do
  instance = state.instmap[inst]
  quorum = div(length(state.processes), 2) + 1

  if length(instance.prepared_values) >= quorum and not instance.has_sent_accept do
    a_val =
      case highest_accepted_value(instance.prepared_values) do
        nil ->
          if instance.value == nil, do: state.other_values[inst], else: instance.value

        accepted_value ->
          accepted_value
      end

    EagerReliableBroadcast.broadcast(
      state.name,
      {:accept, inst, instance.ballot, a_val}
    )

    set_instmap(state, inst, fn map ->
      %{map | ballot_value: a_val, has_sent_accept: true}
    end)
  else
    state
  end
end

# Returns the value accepted under the highest ballot in a list of
# `{accepted_ballot, accepted_value}` promises, or nil when nothing was
# accepted.
#
# Promises with a nil ballot are dropped before comparing. In Erlang term
# order every atom (nil included) is greater than every number, so a nil
# ballot left in the fold would beat any real ballot and hide a value that
# was already accepted.
defp highest_accepted_value(prepared_values) do
  prepared_values
  |> Enum.reject(fn {bal, _val} -> is_nil(bal) end)
  |> Enum.reduce({0, nil}, fn {bal, val}, {best_bal, best_val} ->
    if best_bal > bal, do: {best_bal, best_val}, else: {bal, val}
  end)
  |> elem(1)
end
2023-11-28 21:42:11 +00:00
#
# Processes the accept acknowledgements counted for `inst`.
#
# Does nothing unless this process is the current leader. When a majority
# of `state.processes` has acknowledged, the instance's `ballot_value` is
# broadcast as the decision, the registered client (if any) is notified,
# the decision is recorded and the instance is dropped from `instmap`.
# An instance whose action is :kill_before_decision kills this process
# before anything is announced.
#
# Returns the (possibly updated) state.
#
def accepted(state, _) when state.leader != state.name, do: state

def accepted(state, inst) do
  instance = state.instmap[inst]
  majority = div(length(state.processes), 2) + 1

  if instance.accepted < majority do
    state
  else
    value = instance.ballot_value

    if instance.action == :kill_before_decision do
      IO.puts("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}")
      Process.exit(self(), :kill)
    end

    EagerReliableBroadcast.broadcast(state.name, {:decide, inst, value})

    client = instance.pid_to_inform
    if client != nil, do: send(client, {:decision, inst, value})

    %{
      state
      | decided: Map.put(state.decided, inst, value),
        instmap: Map.delete(state.instmap, inst)
    }
  end
end
2024-01-07 15:02:11 +00:00
# Client API: asks the Paxos process `pid` to propose `value` for `inst`.
#
# `t` is forwarded as the proposal timeout and `action` as an optional
# action tag. The calling process is registered as the one to notify.
# Blocks in `propose_loop/1` and returns its result:
# `{:decision, d}`, `{:abort}` or `{:timeout}`.
def propose(pid, inst, value, t, action \\ nil) do
  request = {:propose, inst, value, t, self(), action}
  send(pid, request)

  propose_loop(inst)
end
2024-01-02 21:39:00 +00:00
# Waits in the caller's mailbox for the outcome of instance `inInst`.
#
# Returns `{:timeout}`, `{:abort}` or `{:decision, d}` when the matching
# message for `inInst` arrives. Outcome messages for other instances are
# discarded. Any other message is re-sent to self after 500 ms so it is
# not lost.
def propose_loop(inInst) do
  receive do
    {:timeout, inst} when inst == inInst ->
      {:timeout}

    {:abort, inst} when inst == inInst ->
      {:abort}

    {:decision, inst, d} when inst == inInst ->
      {:decision, d}

    # Outcome of a different instance: drop it and keep waiting.
    {:timeout, _other} ->
      propose_loop(inInst)

    {:abort, _other} ->
      propose_loop(inInst)

    {:decision, _other, _d} ->
      propose_loop(inInst)

    unrelated ->
      Process.send_after(self(), unrelated, 500)
      propose_loop(inInst)
  end
end
# Client API: asks the Paxos process `pid` for the decision of `inst`.
#
# `t` is forwarded as the time budget for the lookup. Blocks in
# `get_decision_loop/1` and returns the decided value, or nil when the
# process reports that no value is available.
def get_decision(pid, inst, t) do
  query = {:get_value, inst, self(), t}
  send(pid, query)

  get_decision_loop(inst)
end
# Waits in the caller's mailbox for the answer to a decision query on
# `inInst`.
#
# Returns nil on `{:get_value_res, inInst}` and `v` on
# `{:get_value_res_actual, inInst, v}`. Answers for other instances are
# discarded. Any other message is re-sent to self after 500 ms so it is
# not lost.
def get_decision_loop(inInst) do
  receive do
    {:get_value_res, inst} when inst == inInst ->
      nil

    {:get_value_res_actual, inst, v} when inst == inInst ->
      v

    # Answer for a different instance: drop it and keep waiting.
    {:get_value_res, _other} ->
      get_decision_loop(inInst)

    {:get_value_res_actual, _other, _v} ->
      get_decision_loop(inInst)

    unrelated ->
      Process.send_after(self(), unrelated, 500)
      get_decision_loop(inInst)
  end
end
end