merge files and made set_instmap to func
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Andre Henriques 2024-01-16 12:34:34 +00:00
parent 0e1e13950d
commit 6bdb56755d
6 changed files with 299 additions and 245 deletions

View File

@ -1,63 +0,0 @@
defmodule EagerReliableBroadcast do
def get_rb_name(name) do
String.to_atom(Atom.to_string(name) <> "_rb")
end
def get_non_rb_name(name) do
String.to_atom(String.replace(Atom.to_string(name), "_rb", ""))
end
def start(name, processes) do
pid = spawn(EagerReliableBroadcast, :init, [name, processes])
Utils.register_name(get_rb_name(name), pid)
end
# Init event must be the first
# one after the component is created
def init(parent, processes) do
state = %{
name: get_rb_name(parent),
parent: parent,
processes: Enum.map(processes, fn name -> get_rb_name(name) end),
# Use this data structure to remember IDs of the delivered messages
delivered: %{},
# Use this variable to remember the last sequence number used to identify a message
seq_no: 0
}
run(state)
end
def run(state) do
run(
receive do
# Handle the broadcast request event
{:broadcast, m} ->
data_msg = {:data, state.name, state.seq_no, m}
Utils.beb_broadcast(data_msg, state.processes)
%{state | seq_no: state.seq_no + 1}
{:data, proc, seq_no, m} ->
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
data_msg = {:data, proc, seq_no, m}
Utils.beb_broadcast(data_msg, state.processes)
Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent)
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
else
val = Map.get(state.delivered, {proc, seq_no, m})
if val < Enum.count(state.processes) do
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)}
else
%{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})}
end
end
end
)
end
def broadcast(name, m) do
Utils.unicast({:broadcast, m}, get_rb_name(name))
end
end

View File

@ -1,76 +0,0 @@
#
# Emits {:ele_trust, proc }
#
defmodule EventualLeaderElector do
def getEleName(name) do
String.to_atom(Atom.to_string(name) <> "_ele")
end
def start(name, processes) do
new_name = getEleName(name)
pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])
Utils.register_name(new_name, pid)
end
# Init event must be the first
# one after the component is created
def init(name, parent, processes) do
processes = Enum.map(processes, fn name -> getEleName(name) end)
state = %{
name: name,
parent: parent,
processes: processes,
timeout: 1000,
heard_back: MapSet.new(),
seq: 0,
last_trust: nil
}
run(request_heartbeats(state))
end
def request_heartbeats(state) do
state = %{state | heard_back: MapSet.new(), seq: state.seq + 1}
Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes)
Process.send_after(self(), {:timeout}, state.timeout)
state
end
def run(state) do
run(
receive do
{:heartbeat_request, name, seq} ->
Utils.unicast({:heartbeat, state.parent, seq}, name)
state
{:heartbeat, name, seq} ->
if seq == state.seq do
%{state | heard_back: MapSet.put(state.heard_back, name)}
else
state
end
{:timeout} ->
state =
if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do
state
else
to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0)
if state.last_trust != to_trust do
Utils.unicast({:ele_trust, to_trust}, state.parent)
%{state | last_trust: to_trust}
else
state
end
end
request_heartbeats(state)
end
)
end
end

View File

@ -1,3 +1,56 @@
defmodule Utils do
def safecast(p, m) when p == nil, do: IO.puts('Trying to safecast #{m} with p as nil')
def safecast(p, m) when is_pid(p), do: send(p, m)
def safecast(p, m) do
case :global.whereis_name(p) do
pid when is_pid(pid) -> send(pid, m)
:undefined -> :ok
end
end
@deprecated
def unicast(m, p) do
case :global.whereis_name(p) do
pid when is_pid(pid) -> send(pid, m)
:undefined -> :ok
end
end
def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m))
def register_name(name, pid, link \\ true) do
case :global.re_register_name(name, pid) do
:yes ->
# Note this is running on the parent so we are linking the parent to the rb
# so that when we close the parent the rb also dies
if link do
Process.link(pid)
end
pid
:no ->
Process.exit(pid, :kill)
:error
end
end
defmacro checkinst(val, do: expr) do
quote do
case var!(state).instmap[var!(inst)] != nil do
unquote(val) -> unquote(expr)
_ -> var!(state)
end
end
end
defmacro checkinst(do: expr) do
quote do
checkinst(true, expr)
end
end
end
#
#
# Possible actions
@ -5,15 +58,26 @@
# :increase_ballot_number - this makes it so that it does not propose but jump simply increases the number of the current ballot
# this is usefull when forcing a nack
#
defmodule Paxos do
require Utils
import Utils
defmacro set_instmap(do: expr) do
quote do
var!(map) = var!(state).instmap[var!(inst)]
new_instmap = Map.put(var!(state).instmap, var!(inst), unquote(expr))
var!(state) = %{var!(state) | instmap: new_instmap }
end
end
# Starts the Paxos replica with a specific name and some processes
def start(name, processes) do
IO.puts("Starting paxos for #{name}")
pid = spawn(Paxos, :init, [name, name, processes])
Utils.register_name(name, pid, false)
register_name(name, pid, false)
end
# Init event must be the first
@ -34,10 +98,9 @@ defmodule Paxos do
run(state)
end
# Guarantees that a specific state exists
def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
if Map.has_key?(state.instmap, inst) do
state
else
checkinst false do
instmap =
Map.put(state.instmap, inst, %{
value: value,
@ -96,7 +159,9 @@ defmodule Paxos do
# Inform the pid with timeout right way
send(pid_to_inform, {:timeout, inst});
set_instmap(state, inst, fn map -> %{map| ballot: Ballot.inc(map.ballot)} end)
set_instmap do
%{map| ballot: Ballot.inc(map.ballot)}
end
not Map.has_key?(state.instmap, inst) ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
@ -108,15 +173,15 @@ defmodule Paxos do
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
Process.send_after(self(), {:timeout, inst}, t)
prepare(
set_instmap(state, inst, fn map -> %{
map
| value: value,
set_instmap do
%{ map |
value: value,
pid_to_inform: pid_to_inform,
action: action,
} end),
inst
)
}
end
prepare(state, inst)
true ->
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
@ -134,7 +199,9 @@ defmodule Paxos do
true ->
state = has_or_create(state, inst)
state = set_instmap(state, inst, fn map -> %{map | other_value: value} end)
set_instmap do
%{map | other_value: value}
end
prepare(state, inst)
end
@ -148,31 +215,28 @@ defmodule Paxos do
not Map.has_key?(state.instmap, inst) ->
state = has_or_create(state, inst)
Utils.unicast(
{:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
state.instmap[inst].accepted_value},
proc
)
safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value});
set_instmap(state, inst, fn map -> %{
map
set_instmap do
%{ map
| ballot: ballot
} end)
}
end
Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
Utils.unicast(
safecast(proc,
{:prepared, inst, ballot, state.instmap[inst].accepted_ballot,
state.instmap[inst].accepted_value},
proc
state.instmap[inst].accepted_value}
)
set_instmap(state, inst, fn map -> %{
map
set_instmap do
%{ map
| ballot: ballot
} end)
}
end
true ->
Utils.unicast({:nack, inst, ballot}, proc)
safecast(proc, {:nack, inst, ballot})
state
end
@ -197,12 +261,13 @@ defmodule Paxos do
EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot})
set_instmap(state, inst, fn map -> %{
map | has_sent_accept: false,
set_instmap do
%{ map | has_sent_accept: false,
has_sent_prepare: false,
ballot: Ballot.inc(map.ballot),
aborted: true,
} end)
}
end
true ->
state
@ -234,11 +299,12 @@ defmodule Paxos do
state
Ballot.compare(ballot, &==/2, state.instmap[inst].ballot) ->
state =
set_instmap(state, inst, fn map -> %{
set_instmap do
%{
map
| prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
} end)
}
end
prepared(state, inst)
@ -262,17 +328,18 @@ defmodule Paxos do
if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do
IO.puts("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}")
Utils.unicast({:accepted, inst, ballot}, proc)
safecast(proc, {:accepted, inst, ballot})
set_instmap(state, inst, fn map -> %{
map
set_instmap do
%{ map
| ballot: ballot,
accepted_value: value,
accepted_ballot: ballot
} end)
}
end
else
IO.puts("#{state.name} -> #{proc} nack")
Utils.unicast({:nack, inst, ballot}, proc)
safecast(proc, {:nack, inst, ballot})
state
end
end
@ -284,14 +351,14 @@ defmodule Paxos do
has_finished(state, inst) ->
state
state.leader == state.name and Ballot.compare(state.instmap[inst].ballot, &==/2, ballot) ->
accepted(
set_instmap(state, inst, fn map -> %{
map
| accepted: map.accepted + 1
} end),
inst
)
state.leader == state.name and state.instmap[inst].ballot == ballot ->
set_instmap do
%{ map |
accepted: map.accepted + 1
}
end
accepted( state, inst)
true ->
state
@ -334,11 +401,6 @@ defmodule Paxos do
)
end
def set_instmap(state, inst, set_instmap) do
new_instmap = Map.put(state.instmap, inst, set_instmap.(state.instmap[inst]))
%{state | instmap: new_instmap}
end
#
# Puts process in the preapre state
#
@ -364,7 +426,8 @@ defmodule Paxos do
IO.puts("#{state.name} - sending all prepare #{inst} #{inspect(ballot)}")
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})
set_instmap(state, inst, fn map -> %{
set_instmap do
%{
map
| prepared_values: [],
accepted: 0,
@ -372,7 +435,8 @@ defmodule Paxos do
ballot_value: nil,
has_sent_prepare: true,
has_sent_accept: false
} end)
}
end
end
end
@ -413,11 +477,12 @@ defmodule Paxos do
{:accept, inst, state.instmap[inst].ballot, a_val}
)
set_instmap(state, inst, fn map -> %{
map
| ballot_value: a_val,
set_instmap do
%{ map |
ballot_value: a_val,
has_sent_accept: true
} end)
}
end
else
state
end
@ -551,3 +616,144 @@ defmodule Ballot do
end
end
defmodule EagerReliableBroadcast do
def get_rb_name(name) do
String.to_atom(Atom.to_string(name) <> "_rb")
end
def get_non_rb_name(name) do
String.to_atom(String.replace(Atom.to_string(name), "_rb", ""))
end
def start(name, processes) do
pid = spawn(EagerReliableBroadcast, :init, [name, processes])
Utils.register_name(get_rb_name(name), pid)
end
# Init event must be the first
# one after the component is created
def init(parent, processes) do
state = %{
name: get_rb_name(parent),
parent: parent,
processes: Enum.map(processes, fn name -> get_rb_name(name) end),
# Use this data structure to remember IDs of the delivered messages
delivered: %{},
# Use this variable to remember the last sequence number used to identify a message
seq_no: 0
}
run(state)
end
def run(state) do
run(
receive do
# Handle the broadcast request event
{:broadcast, m} ->
data_msg = {:data, state.name, state.seq_no, m}
Utils.beb_broadcast(data_msg, state.processes)
%{state | seq_no: state.seq_no + 1}
{:data, proc, seq_no, m} ->
if not Map.has_key?(state.delivered, {proc, seq_no, m}) do
data_msg = {:data, proc, seq_no, m}
Utils.beb_broadcast(data_msg, state.processes)
Utils.unicast({:rb_deliver, get_non_rb_name(proc), m}, state.parent)
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)}
else
val = Map.get(state.delivered, {proc, seq_no, m})
if val < Enum.count(state.processes) do
%{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)}
else
%{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})}
end
end
end
)
end
def broadcast(name, m) do
Utils.unicast({:broadcast, m}, get_rb_name(name))
end
end
#
# Emits {:ele_trust, proc }
#
defmodule EventualLeaderElector do
def getEleName(name) do
String.to_atom(Atom.to_string(name) <> "_ele")
end
def start(name, processes) do
new_name = getEleName(name)
pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])
Utils.register_name(new_name, pid)
end
# Init event must be the first
# one after the component is created
def init(name, parent, processes) do
processes = Enum.map(processes, fn name -> getEleName(name) end)
state = %{
name: name,
parent: parent,
processes: processes,
timeout: 1000,
heard_back: MapSet.new(),
seq: 0,
last_trust: nil
}
run(request_heartbeats(state))
end
def request_heartbeats(state) do
state = %{state | heard_back: MapSet.new(), seq: state.seq + 1}
Utils.beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes)
Process.send_after(self(), {:timeout}, state.timeout)
state
end
def run(state) do
run(
receive do
{:heartbeat_request, name, seq} ->
Utils.unicast({:heartbeat, state.parent, seq}, name)
state
{:heartbeat, name, seq} ->
if seq == state.seq do
%{state | heard_back: MapSet.put(state.heard_back, name)}
else
state
end
{:timeout} ->
state =
if MapSet.size(state.heard_back) < floor(length(state.processes)/2) + 1 do
state
else
to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0)
if state.last_trust != to_trust do
Utils.unicast({:ele_trust, to_trust}, state.parent)
%{state | last_trust: to_trust}
else
state
end
end
request_heartbeats(state)
end
)
end
end

17
lib/test.ex Normal file
View File

@ -0,0 +1,17 @@
defmodule Test do
defmacro createfuncBase(name) do
quote do
def unquote(name)(true), do: true
end
end
end
defmodule Test2 do
require Test
def test(), do: false
Test.createfuncBase(:test)
end

View File

@ -1,7 +1,4 @@
# Replace with your own implementation source files
IEx.Helpers.c("utils.ex", ".")
IEx.Helpers.c("eager_reliable_broadcast.ex", ".")
IEx.Helpers.c("leader_elector.ex", ".")
IEx.Helpers.c("paxos.ex", ".")
# Do not modify the following ##########

View File

@ -1,27 +0,0 @@
defmodule Utils do
def unicast(m, p) do
case :global.whereis_name(p) do
pid when is_pid(pid) -> send(pid, m)
:undefined -> :ok
end
end
def beb_broadcast(m, dest), do: for(p <- dest, do: unicast(m, p))
def register_name(name, pid, link \\ true) do
case :global.re_register_name(name, pid) do
:yes ->
# Note this is running on the parent so we are linking the parent to the rb
# so that when we close the parent the rb also dies
if link do
Process.link(pid)
end
pid
:no ->
Process.exit(pid, :kill)
:error
end
end
end