Paxos version 1
This commit is contained in:
parent
790ecf5446
commit
c97f2e9a22
@ -1,95 +0,0 @@
|
||||
defmodule EagerReliableBroadcast do
|
||||
def start(name, processes) do
|
||||
pid = spawn(EagerReliableBroadcast, :init, [name, processes])
|
||||
# :global.unregister_name(name)
|
||||
case :global.re_register_name(name, pid) do
|
||||
:yes -> pid
|
||||
:no -> :error
|
||||
end
|
||||
IO.puts "registered #{name}"
|
||||
pid
|
||||
end
|
||||
|
||||
# Init event must be the first
|
||||
# one after the component is created
|
||||
def init(name, processes) do
|
||||
state = %{
|
||||
name: name,
|
||||
processes: processes,
|
||||
delivered: %{}, # Use this data structure to remember IDs of the delivered messages
|
||||
seq_no: 0 # Use this variable to remember the last sequence number used to identify a message
|
||||
}
|
||||
run(state)
|
||||
end
|
||||
|
||||
def run(state) do
|
||||
state = receive do
|
||||
# Handle the broadcast request event
|
||||
{:broadcast, m} ->
|
||||
IO.puts("#{inspect state.name}: RB-broadcast: #{inspect m}")
|
||||
# Create a unique message identifier from state.name and state.seqno.
|
||||
# Create a new data message data_msg from the given payload m
|
||||
# the message identifier.
|
||||
# Update the state as necessary
|
||||
data_msg = {:data, state.name, state.seq_no, m}
|
||||
|
||||
# Use the provided beb_broadcast function to propagate data_msg to
|
||||
|
||||
# all process
|
||||
beb_broadcast(data_msg, state.processes)
|
||||
%{state | seq_no: state.seq_no + 1 }
|
||||
|
||||
{:data, proc, seq_no, m} ->
|
||||
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
|
||||
data_msg = {:data, proc, seq_no, m}
|
||||
beb_broadcast(data_msg, state.processes)
|
||||
unicast({:deliver, proc, m}, state.name)
|
||||
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
|
||||
else
|
||||
val = Map.get(state.delivered, {proc, seq_no, m})
|
||||
if val < Enum.count(state.processes) do
|
||||
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)}
|
||||
else
|
||||
%{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})}
|
||||
end
|
||||
end
|
||||
|
||||
# If <proc, seqno> was already delivered, do nothing.
|
||||
# Otherwise, update delivered, generate a deliver event for the
|
||||
# upper layer, and re-broadcast (echo) the received message.
|
||||
# In both cases, do not forget to return the state.
|
||||
|
||||
{:deliver, proc, m} ->
|
||||
# Simulate the deliver indication event
|
||||
IO.puts("#{inspect state.name}: RB-deliver: #{inspect m} from #{inspect proc}")
|
||||
state
|
||||
end
|
||||
run(state)
|
||||
end
|
||||
|
||||
|
||||
defp unicast(m, p) do
|
||||
case :global.whereis_name(p) do
|
||||
pid when is_pid(pid) -> send(pid, m)
|
||||
:undefined -> :ok
|
||||
end
|
||||
end
|
||||
|
||||
defp beb_broadcast(m, dest), do: for p <- dest, do: unicast(m, p)
|
||||
|
||||
# You can use this function to simulate a process failure.
|
||||
# name: the name of this process
|
||||
# proc_to_fail: the name of the failed process
|
||||
# fail_send_to: list of processes proc_to_fail will not be broadcasting messages to
|
||||
# Note that this list must include proc_to_fail.
|
||||
# m and dest are the same as the respective arguments of the normal
|
||||
# beb_broadcast.
|
||||
defp beb_broadcast_with_failures(name, proc_to_fail, fail_send_to, m, dest) do
|
||||
if name == proc_to_fail do
|
||||
for p <- dest, p not in fail_send_to, do: unicast(m, p)
|
||||
else
|
||||
for p <- dest, p != proc_to_fail, do: unicast(m, p)
|
||||
end
|
||||
end
|
||||
|
||||
end
|
12
lib/distributed_system_coursework.ex
Normal file
12
lib/distributed_system_coursework.ex
Normal file
@ -0,0 +1,12 @@
|
||||
defmodule Runner do
|
||||
use Application
|
||||
|
||||
def runner_leader_elector_test() do
|
||||
IO.puts("Run leader elector test")
|
||||
end
|
||||
|
||||
def start(_type, _args) do
|
||||
IO.puts("Starting Runner")
|
||||
Mix.Task.run(Runner.runner_leader_elector_test())
|
||||
end
|
||||
end
|
63
lib/eager_reliable_broadcast.ex
Normal file
63
lib/eager_reliable_broadcast.ex
Normal file
@ -0,0 +1,63 @@
|
||||
defmodule EagerReliableBroadcast do
|
||||
def get_rb_name(name) do
|
||||
String.to_atom(Atom.to_string(name) <> "_rb")
|
||||
end
|
||||
|
||||
def get_non_rb_name(name) do
|
||||
String.to_atom(String.replace(Atom.to_string(name), "_rb", ""))
|
||||
end
|
||||
|
||||
def start(name, processes) do
|
||||
pid = spawn(EagerReliableBroadcast, :init, [name, processes])
|
||||
Utils.register_name(get_rb_name(name), pid)
|
||||
end
|
||||
|
||||
# Init event must be the first
|
||||
# one after the component is created
|
||||
def init(parent, processes) do
|
||||
state = %{
|
||||
name: get_rb_name(parent),
|
||||
parent: parent,
|
||||
processes: Enum.map(processes, fn name -> get_rb_name(name) end),
|
||||
# Use this data structure to remember IDs of the delivered messages
|
||||
delivered: %{},
|
||||
# Use this variable to remember the last sequence number used to identify a message
|
||||
seq_no: 0
|
||||
}
|
||||
|
||||
run(state)
|
||||
end
|
||||
|
||||
def run(state) do
|
||||
run(
|
||||
receive do
|
||||
# Handle the broadcast request event
|
||||
{:broadcast, m} ->
|
||||
data_msg = {:data, state.name, state.seq_no, m}
|
||||
Utils.beb_broadcast(data_msg, state.processes)
|
||||
%{state | seq_no: state.seq_no + 1}
|
||||
|
||||
{:data, proc, seq_no, m} ->
|
||||
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
|
||||
data_msg = {:data, proc, seq_no, m}
|
||||
Utils.beb_broadcast(data_msg, state.processes)
|
||||
|
||||
Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent)
|
||||
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
|
||||
else
|
||||
val = Map.get(state.delivered, {proc, seq_no, m})
|
||||
|
||||
if val < Enum.count(state.processes) do
|
||||
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)}
|
||||
else
|
||||
%{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})}
|
||||
end
|
||||
end
|
||||
end
|
||||
)
|
||||
end
|
||||
|
||||
def broadcast(name, m) do
|
||||
Utils.unicast({:broadcast, m}, get_rb_name(name))
|
||||
end
|
||||
end
|
86
lib/leader_elector.ex
Normal file
86
lib/leader_elector.ex
Normal file
@ -0,0 +1,86 @@
|
||||
#
|
||||
# Emits {:ele_trust, proc }
|
||||
#
|
||||
|
||||
defmodule EventualLeaderElector do
|
||||
def getEleName(name) do
|
||||
String.to_atom(Atom.to_string(name) <> "_ele")
|
||||
end
|
||||
|
||||
def getOriginalName(name) do
|
||||
String.to_atom(String.replace(Atom.to_string(name), "_ele", ""))
|
||||
end
|
||||
|
||||
def start(name, processes) do
|
||||
new_name = getEleName(name)
|
||||
pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])
|
||||
|
||||
Utils.register_name(new_name, pid)
|
||||
end
|
||||
|
||||
# Init event must be the first
|
||||
# one after the component is created
|
||||
def init(name, parent, processes) do
|
||||
processes = Enum.map(processes, fn name -> getEleName(name) end)
|
||||
|
||||
state = %{
|
||||
name: name,
|
||||
parent: parent,
|
||||
processes: processes,
|
||||
mp_processes: MapSet.new(processes),
|
||||
timeout: 1000,
|
||||
heard_back: MapSet.new(),
|
||||
seq: 0,
|
||||
last_trust: nil
|
||||
}
|
||||
|
||||
run(request_heartbeats(state))
|
||||
end
|
||||
|
||||
def request_heartbeats(state) do
|
||||
state = %{state | heard_back: MapSet.new(), seq: state.seq + 1}
|
||||
Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes)
|
||||
|
||||
Process.send_after(self(), {:timeout}, state.timeout)
|
||||
state
|
||||
end
|
||||
|
||||
def run(state) do
|
||||
run(
|
||||
receive do
|
||||
{:heartbeat_request, name, seq} ->
|
||||
Utils.unicast({:heartbeat, state.name, seq}, name)
|
||||
state
|
||||
|
||||
{:heartbeat, name, seq} ->
|
||||
if seq == state.seq do
|
||||
%{state | heard_back: MapSet.put(state.heard_back, name)}
|
||||
else
|
||||
state
|
||||
end
|
||||
|
||||
{:timeout} ->
|
||||
active = MapSet.intersection(state.mp_processes, state.heard_back)
|
||||
|
||||
state = %{state | heard_back: MapSet.new()}
|
||||
|
||||
state =
|
||||
if MapSet.size(active) == 0 do
|
||||
state
|
||||
else
|
||||
to_trust = Enum.at(MapSet.to_list(active), 0)
|
||||
to_trust = getOriginalName(to_trust)
|
||||
|
||||
if state.last_trust != to_trust do
|
||||
Utils.unicast({:ele_trust, to_trust}, state.parent)
|
||||
%{state | last_trust: to_trust}
|
||||
else
|
||||
state
|
||||
end
|
||||
end
|
||||
|
||||
request_heartbeats(state)
|
||||
end
|
||||
)
|
||||
end
|
||||
end
|
54
lib/mix/tasks/test_leader_elector.ex
Normal file
54
lib/mix/tasks/test_leader_elector.ex
Normal file
@ -0,0 +1,54 @@
|
||||
defmodule Mix.Tasks.TestLeaderElector do
|
||||
use Mix.Task
|
||||
|
||||
def run(_) do
|
||||
IO.puts("Testing Leader Elector")
|
||||
|
||||
procs = Enum.map(1..20, fn i -> String.to_atom("p#{i}") end)
|
||||
pids = Enum.map(procs, fn p -> start(p, procs) end)
|
||||
|
||||
IO.puts("spawned")
|
||||
|
||||
Process.send_after(self(), {:end}, 50000)
|
||||
|
||||
receive do
|
||||
{:end} ->
|
||||
IO.puts("test ended")
|
||||
end
|
||||
end
|
||||
|
||||
def start(name, procs) do
|
||||
pid = spawn(Mix.Tasks.TestLeaderElector, :init, [name, procs])
|
||||
|
||||
case :global.re_register_name(name, pid) do
|
||||
:yes ->
|
||||
# Note this is running on the parent so we are linking the parent to the rb
|
||||
# so that when we close the parent the rb also dies
|
||||
pid
|
||||
|
||||
:no ->
|
||||
:error
|
||||
end
|
||||
end
|
||||
|
||||
def init(name, procs) do
|
||||
IO.puts("#{name}: init")
|
||||
EventualLeaderElector.start(name, procs)
|
||||
|
||||
run_test(name)
|
||||
end
|
||||
|
||||
def run_test(name) do
|
||||
receive do
|
||||
{:ele_trust, proc} ->
|
||||
IO.puts("#{name}: trust process #{Atom.to_string(proc)}")
|
||||
|
||||
if proc == name do
|
||||
IO.puts("#{name}: Trust myself will stop\n\n\n")
|
||||
Process.exit(self(), :exit)
|
||||
end
|
||||
end
|
||||
|
||||
run_test(name)
|
||||
end
|
||||
end
|
47
lib/mix/tasks/test_paxos.ex
Normal file
47
lib/mix/tasks/test_paxos.ex
Normal file
@ -0,0 +1,47 @@
|
||||
defmodule Mix.Tasks.TestPaxos do
|
||||
use Mix.Task
|
||||
|
||||
def run(_) do
|
||||
IO.puts("Testing Paxos")
|
||||
|
||||
procs = Enum.map(0..3, fn i -> String.to_atom("p#{i}") end)
|
||||
_pids = Enum.map(procs, fn p -> start(p, procs) end)
|
||||
|
||||
IO.puts("spawned")
|
||||
|
||||
Process.send_after(self(), {:end}, 50000)
|
||||
|
||||
receive do
|
||||
{:end} ->
|
||||
IO.puts("test ended")
|
||||
end
|
||||
end
|
||||
|
||||
def start(name, procs) do
|
||||
pid = spawn(Mix.Tasks.TestPaxos, :init, [name, procs])
|
||||
|
||||
Utils.register_name(name, pid)
|
||||
end
|
||||
|
||||
def init(name, procs) do
|
||||
IO.puts("#{name}: init")
|
||||
Paxos.start(name, procs)
|
||||
|
||||
Process.sleep(200)
|
||||
|
||||
if name == :p0 do
|
||||
Paxos.propose(name, "Hi I was decided")
|
||||
end
|
||||
|
||||
run_test(name)
|
||||
end
|
||||
|
||||
def run_test(name) do
|
||||
receive do
|
||||
{:decide, value} ->
|
||||
IO.puts("#{name}: decide #{value}")
|
||||
end
|
||||
|
||||
run_test(name)
|
||||
end
|
||||
end
|
178
lib/paxos.ex
Normal file
178
lib/paxos.ex
Normal file
@ -0,0 +1,178 @@
|
||||
defmodule Paxos do
|
||||
def getPaxosName(name) do
|
||||
String.to_atom(Atom.to_string(name) <> "_paxos")
|
||||
end
|
||||
|
||||
def getOriginalName(name) do
|
||||
String.to_atom(String.replace(Atom.to_string(name), "_paxos", ""))
|
||||
end
|
||||
|
||||
def start(name, processes) do
|
||||
new_name = getPaxosName(name)
|
||||
pid = spawn(Paxos, :init, [new_name, name, processes])
|
||||
|
||||
Utils.register_name(new_name, pid)
|
||||
end
|
||||
|
||||
# Init event must be the first
|
||||
# one after the component is created
|
||||
def init(name, parent, processes) do
|
||||
processes = Enum.map(processes, fn name -> getPaxosName(name) end)
|
||||
EventualLeaderElector.start(name, processes)
|
||||
EagerReliableBroadcast.start(name, processes)
|
||||
|
||||
state = %{
|
||||
name: name,
|
||||
parent: parent,
|
||||
processes: processes,
|
||||
leader: nil,
|
||||
value: nil,
|
||||
other_value: nil,
|
||||
ballot: 0,
|
||||
ballot_value: nil,
|
||||
prepared_values: [],
|
||||
accepted: 0,
|
||||
running_ballot: 0,
|
||||
accepted_ballot: nil,
|
||||
accepted_value: nil,
|
||||
decided: false,
|
||||
pid_to_inform: nil
|
||||
}
|
||||
|
||||
run(state)
|
||||
end
|
||||
|
||||
def run(state) do
|
||||
run(
|
||||
receive do
|
||||
{:ele_trust, proc} ->
|
||||
prepare(%{state | leader: proc})
|
||||
|
||||
{:propose, value, pid_to_inform} ->
|
||||
if state.value == nil do
|
||||
EagerReliableBroadcast.broadcast(state.name, {:other_propose, value})
|
||||
prepare(%{state | value: value, pid_to_inform: pid_to_inform})
|
||||
else
|
||||
%{state | pid_to_inform: pid_to_inform}
|
||||
end
|
||||
|
||||
{:rb_deliver, _proc, {:other_propose, value}} ->
|
||||
%{state | other_value: value}
|
||||
|
||||
{:rb_deliver, _proc, {:prepare, proc, ballot}} ->
|
||||
if ballot > state.running_ballot do
|
||||
Utils.unicast({:prepared, ballot, state.accepted_ballot, state.accepted_value}, proc)
|
||||
%{state | running_ballot: ballot}
|
||||
else
|
||||
Utils.unicast({:nack, ballot}, proc)
|
||||
state
|
||||
end
|
||||
|
||||
{:nack, ballot} ->
|
||||
if state.leader == state.name and state.ballot == ballot do
|
||||
prepare(state)
|
||||
else
|
||||
state
|
||||
end
|
||||
|
||||
{:prepared, ballot, accepted_ballot, accepted_value} ->
|
||||
if ballot == state.ballot do
|
||||
prepared(%{
|
||||
state
|
||||
| prepared_values: state.prepared_values ++ [{accepted_ballot, accepted_value}]
|
||||
})
|
||||
else
|
||||
state
|
||||
end
|
||||
|
||||
{:rb_deliver, proc, {:accept, ballot, value}} ->
|
||||
if ballot >= state.running_ballot do
|
||||
Utils.unicast({:accepted, ballot}, proc)
|
||||
%{state | running_ballot: ballot, accepted_value: value, accepted_ballot: ballot}
|
||||
else
|
||||
Utils.unicast({:nack, ballot}, proc)
|
||||
state
|
||||
end
|
||||
|
||||
{:accepted, ballot} ->
|
||||
if state.leader == state.name and state.ballot == ballot do
|
||||
accepted(%{state | accepted: state.accepted + 1})
|
||||
else
|
||||
state
|
||||
end
|
||||
|
||||
{:rb_deliver, _, {:decide, value}} ->
|
||||
if state.decided == true do
|
||||
state
|
||||
else
|
||||
if state.pid_to_inform != nil do
|
||||
send(state.pid_to_inform, {:decide, value})
|
||||
end
|
||||
|
||||
%{state | decided: true}
|
||||
end
|
||||
end
|
||||
)
|
||||
end
|
||||
|
||||
#
|
||||
# Puts process in the preapre state
|
||||
#
|
||||
|
||||
def prepare(state) when state.leader != state.name, do: state
|
||||
def prepare(state) when state.value == nil and state.other_value == nil, do: state
|
||||
|
||||
def prepare(state) do
|
||||
ballot = state.ballot + 1
|
||||
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, ballot})
|
||||
%{state | ballot: ballot, prepared_values: [], accepted: 0, ballot_value: nil}
|
||||
end
|
||||
|
||||
#
|
||||
# Process the prepared responses
|
||||
#
|
||||
def prepared(state) when state.leader != state.name, do: state
|
||||
|
||||
def prepared(state) when length(state.prepared_values) > length(state.processes) / 2 + 1 do
|
||||
{_, a_val} =
|
||||
Enum.reduce(state.prepared_values, {0, nil}, fn {bal, val}, {a_bal, a_val} ->
|
||||
if a_bal > bal do
|
||||
{a_bal, a_val}
|
||||
else
|
||||
{bal, val}
|
||||
end
|
||||
end)
|
||||
|
||||
a_val =
|
||||
if a_val == nil do
|
||||
if state.value == nil do
|
||||
state.other_value
|
||||
else
|
||||
state.value
|
||||
end
|
||||
else
|
||||
a_val
|
||||
end
|
||||
|
||||
EagerReliableBroadcast.broadcast(state.name, {:accept, state.ballot, a_val})
|
||||
%{state | ballot_value: a_val}
|
||||
end
|
||||
|
||||
def prepared(state), do: state
|
||||
|
||||
#
|
||||
# Process the accepted responses
|
||||
#
|
||||
def accepted(state) when state.leader != state.name, do: state
|
||||
|
||||
def accepted(state) when state.accepted > length(state.processes) / 2 + 1 do
|
||||
EagerReliableBroadcast.broadcast(state.name, {:decide, state.ballot_value})
|
||||
state
|
||||
end
|
||||
|
||||
def accepted(state), do: state
|
||||
|
||||
def propose(name, value) do
|
||||
Utils.unicast({:propose, value}, getPaxosName(name))
|
||||
end
|
||||
end
|
24
lib/utils.ex
Normal file
24
lib/utils.ex
Normal file
@ -0,0 +1,24 @@
|
||||
defmodule Utils do
|
||||
def unicast(m, p) do
|
||||
case :global.whereis_name(p) do
|
||||
pid when is_pid(pid) -> send(pid, m)
|
||||
:undefined -> :ok
|
||||
end
|
||||
end
|
||||
|
||||
def beb_broadcast(m, dest), do: for(p <- dest, do: unicast(m, p))
|
||||
|
||||
def register_name(name, pid) do
|
||||
case :global.re_register_name(name, pid) do
|
||||
:yes ->
|
||||
# Note this is running on the parent so we are linking the parent to the rb
|
||||
# so that when we close the parent the rb also dies
|
||||
Process.link(pid)
|
||||
pid
|
||||
|
||||
:no ->
|
||||
Process.exit(pid, :kill)
|
||||
:error
|
||||
end
|
||||
end
|
||||
end
|
Reference in New Issue
Block a user