Andre Henriques
8c674626f8
All checks were successful
continuous-integration/drone/push Build is passing
845 lines
23 KiB
Elixir
845 lines
23 KiB
Elixir
#
# This file contains the modules necessary for Paxos to work
#
# Utils: General utilities
# Paxos: Paxos logic
# Ballot: Contains the logic to handle the ballot numbers
# EventualLeaderElector: Leader elector code
# EagerReliableBroadcast: Handles eager reliable broadcast
#
|
|
|
|
defmodule Utils do
  @moduledoc """
  Helper functions and macros that are used throughout all modules in this file:
  sending messages by pid or globally registered name, registering names,
  levelled logging, and the `or_state/2` and `runfn/1` macros used to write the
  receive loops.
  """

  # Minimum level a message needs in order to be printed by `_log/2`
  # (a message is printed only when `@min_print_level <= level`).
  #   0 - show all
  #   1 - ignore paxos
  #   2 - ignore server
  # With the value 3 every message logged at level 0, 1 or 2 is suppressed.
  @min_print_level 3

  @doc """
  Sends the message `m` to `p`.

  `p` may be a pid or a name registered in `:global`.

    * `nil` - nothing is sent; a diagnostic line is printed and `:ok` is returned
    * pid - the message is sent directly and `m` is returned
    * any other term - looked up with `:global.whereis_name/1`; the message is
      sent when a pid is found, otherwise `:ok` is returned and the message is dropped
  """
  # `inspect/1` is required here: messages are tuples, which do not implement
  # `String.Chars`, so plain interpolation of `m` would raise.
  def safecast(nil, m), do: IO.puts("Trying to safecast #{inspect(m)} with p as nil")
  def safecast(p, m) when is_pid(p), do: send(p, m)

  def safecast(p, m) do
    case :global.whereis_name(p) do
      pid when is_pid(pid) -> send(pid, m)
      :undefined -> :ok
    end
  end

  @doc """
  Builds a new process name by appending `part` to `name` and returns it as an atom.
  """
  def alter_name(name, part), do: :"#{name}#{part}"

  @doc """
  Sends the message `m` to every process in the list `dest` using `safecast/2`.

  Returns the list of the individual `safecast/2` results.
  """
  def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m))

  @doc """
  Registers `pid` under `name` in the global name space.

  Uses `:global.re_register_name/2`, so an existing registration of `name` is replaced.

  Returns `pid` on success. When the registration is refused, `pid` is killed
  and `:error` is returned.

  When `link` is true (the default) the calling process is linked to `pid`.
  """
  def register_name(name, pid, link \\ true) do
    case :global.re_register_name(name, pid) do
      :yes ->
        # Note this is running on the parent so we are linking the parent to the
        # new process, so that when we close the parent the new process also dies
        if link do
          Process.link(pid)
        end

        pid

      :no ->
        Process.exit(pid, :kill)
        :error
    end
  end

  @doc """
  Defines a `log/1` function in the calling module that logs at the given `level`.
  """
  defmacro create_log(level) do
    quote do
      def log(msg) do
        Utils._log(msg, unquote(level))
      end
    end
  end

  @doc """
  This function is used by the `create_log/1` macro.

  Prints `msg` only when `level` is at least the minimum print level configured
  in this module; otherwise returns `nil`.
  """
  def _log(msg, level) do
    if @min_print_level <= level do
      IO.puts(msg)
    end
  end

  @doc """
  Evaluates `expr` when `val` is exactly `true`, otherwise evaluates to the
  variable `state` of the calling scope. It acts like this:

      if val do
        expr
      else
        state
      end

  The caller must have a variable named `state` in scope.
  """
  defmacro or_state(val, do: expr) do
    quote do
      case unquote(val) do
        true -> unquote(expr)
        _ -> var!(state)
      end
    end
  end

  @doc """
  Simplifies the process of creating run functions.

  Defines `run/1` in the calling module. `run/1` binds its argument to the
  variable `state`, waits for a message matching one of the given `receive`
  clauses, and calls itself again with the value returned by the matching
  clause as the new state.
  """
  defmacro runfn(do: expr) do
    quote do
      def run(s) do
        var!(state) = s

        run(
          receive do
            unquote(expr)
          end
        )
      end
    end
  end
end
|
|
|
|
defmodule Paxos do
  @moduledoc """
  This module contains all the logic for Paxos.

  To start a Paxos instance run `Paxos.start/3`.

  To propose a value to Paxos run `Paxos.propose/5`.

  To get a previous decision run `Paxos.get_decision/3`.
  """
  require Utils
  import Utils

  create_log(0)

  # Allows the entry of the current instance in state.instmap to be updated easily.
  #
  # Expects the variables `state` and `inst` to be in scope at the call site.
  # Binds `map` to the current entry of `inst`, evaluates the given block (which
  # may use `map`) to obtain the new entry, stores it, rebinds `state` and
  # evaluates to the new state.
  defmacrop set_instmap(do: expr) do
    quote do
      var!(map) = var!(state).instmap[var!(inst)]
      new_instmap = Map.put(var!(state).instmap, var!(inst), unquote(expr))
      var!(state) = %{var!(state) | instmap: new_instmap}
    end
  end

  @doc """
  Starts the Paxos replica with a specific name and some processes.

  The replica is registered globally under `name`. When `link` is true the
  caller is linked to the replica. Returns the replica pid, or `:error` when
  the name could not be registered.
  """
  def start(name, processes, link \\ false) do
    log("Starting paxos for #{name}")

    pid = spawn(Paxos, :init, [name, processes])
    register_name(name, pid, link)
  end

  @doc """
  Starts the eventual leader elector and the eager reliable broadcast,
  initializes the state and enters the receive loop.
  """
  def init(name, processes) do
    EventualLeaderElector.start(name, processes)
    EagerReliableBroadcast.start(name, processes)

    state = %{
      name: name,
      processes: processes,
      leader: nil,
      instmap: %{},
      decided: %{}
    }

    run(state)
  end

  # Guarantees that a state entry exists for a specific instance.
  # An existing entry is left untouched.
  defp has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
    or_state state.instmap[inst] == nil do
      instmap =
        Map.put(state.instmap, inst, %{
          value: value,
          other_value: nil,
          ballot: Ballot.init(state.name, 0),
          aborted: false,
          ballot_value: nil,
          prepared_values: [],
          accepted: 0,
          accepted_ballot: nil,
          accepted_value: nil,
          pid_to_inform: pid_to_inform,
          has_sent_accept: false,
          action: action,
          has_sent_prepare: false
        })

      %{state | instmap: instmap}
    end
  end

  # Checks if an instance has finished (was decided) or if it was aborted.
  # When ignore_aborted is true only checks if the instance was decided.
  defp has_finished(state, inst, ignore_aborted \\ false) do
    cond do
      Map.has_key?(state.decided, inst) -> true
      ignore_aborted -> false
      Map.has_key?(state.instmap, inst) -> state.instmap[inst].aborted
      true -> false
    end
  end

  # Number of processes that make up a majority of state.processes
  defp quorum(state), do: div(length(state.processes), 2) + 1

  @doc """
  This is the run/receive function.

  All the messages that are handled by this function are:
      {:ele_trust, proc} ->
      {:propose, inst, value, pid_to_inform, action} ->
      {:rb_deliver, proc, {:other_propose, inst, value}} ->
      {:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
      {:nack, inst, ballot} ->
      {:rb_deliver, _proc, {:abort, inst, ballot}} ->
      {:prepared, inst, ballot, accepted_ballot, accepted_value} ->
      {:rb_deliver, proc, {:accept, inst, ballot, value}} ->
      {:accepted, inst, ballot} ->
      {:get_value, inst, pid_to_inform} ->
      {:rb_deliver, _, {:decide, inst, value}} ->
  """
  runfn do
    # Handles leader elector
    {:ele_trust, proc} ->
      log("#{state.name} - #{proc} is leader")

      # Try to (re)start every known instance under the new leader
      Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
        prepare(st, inst)
      end)

    # Handles a proposal from the parent process
    {:propose, inst, value, pid_to_inform, action} ->
      log("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")

      cond do
        has_finished(state, inst, true) ->
          log(
            "#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}"
          )

          send(pid_to_inform, {:decision, inst, state.decided[inst]})
          state

        action == :increase_ballot_number ->
          log("#{state.name} - Got request to increase ballot number for inst #{inspect(inst)}")
          state = has_or_create(state, inst)

          set_instmap do
            %{map | ballot: Ballot.inc(map.ballot)}
          end

        not Map.has_key?(state.instmap, inst) ->
          EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
          state = has_or_create(state, inst, value, pid_to_inform, action)
          prepare(state, inst)

        state.instmap[inst].value == nil ->
          EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})

          set_instmap do
            %{map | value: value, pid_to_inform: pid_to_inform, action: action}
          end

          prepare(state, inst)

        true ->
          EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
          prepare(state, inst)
      end

    # Handles the sharing of a proposal value to other processes
    {:rb_deliver, _proc, {:other_propose, inst, value}} ->
      cond do
        has_finished(state, inst, true) ->
          # Already decided: share the decision again instead of starting over
          EagerReliableBroadcast.broadcast(
            state.name,
            {:decide, inst, state.decided[inst]}
          )

          state

        true ->
          state = has_or_create(state, inst)

          set_instmap do
            %{map | other_value: value}
          end

          prepare(state, inst)
      end

    # Handles a prepare request from the leader
    {:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
      log("#{state.name} - prepare from #{proc}")

      cond do
        has_finished(state, inst) ->
          state

        not Map.has_key?(state.instmap, inst) ->
          state
          |> has_or_create(inst)
          |> promise(proc, inst, ballot)

        Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
          promise(state, proc, inst, ballot)

        true ->
          safecast(proc, {:nack, inst, ballot})
          state
      end

    # Handles a nack
    {:nack, inst, ballot} ->
      log("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")

      cond do
        has_finished(state, inst) ->
          state

        # The has_key? check must come before the ballot is read: a nack for an
        # instance this replica holds no entry for would otherwise crash it
        state.leader == state.name and Map.has_key?(state.instmap, inst) and
            state.instmap[inst].ballot == ballot ->
          if state.instmap[inst].pid_to_inform != nil do
            send(state.instmap[inst].pid_to_inform, {:abort, inst})
          end

          EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot})

          set_instmap do
            %{
              map
              | has_sent_accept: false,
                has_sent_prepare: false,
                ballot: Ballot.inc(map.ballot),
                aborted: true
            }
          end

        true ->
          state
      end

    # Handles an abort
    {:rb_deliver, _proc, {:abort, inst, _ballot}} ->
      cond do
        has_finished(state, inst) ->
          state

        true ->
          log("#{state.name} - got information to send abort")

          if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
            send(state.instmap[inst].pid_to_inform, {:abort, inst})
          end

          state
      end

    # Handles a prepared
    # Sends out accept when a quorum is met
    {:prepared, inst, ballot, accepted_ballot, accepted_value} ->
      log(
        "#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
      )

      cond do
        has_finished(state, inst) ->
          state

        # No entry for this instance: nothing to compare the ballot against
        not Map.has_key?(state.instmap, inst) ->
          state

        ballot == state.instmap[inst].ballot ->
          set_instmap do
            %{
              map
              | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}]
            }
          end

          prepared(state, inst)

        Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) ->
          log("#{state.name} - Probably recieved this before preare came to self sending with delay")
          Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
          state

        true ->
          state
      end

    # Handles an accept
    {:rb_deliver, proc, {:accept, inst, ballot, value}} ->
      cond do
        has_finished(state, inst) ->
          state

        true ->
          state = has_or_create(state, inst)

          if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do
            log("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}")

            safecast(proc, {:accepted, inst, ballot})

            set_instmap do
              %{map | ballot: ballot, accepted_value: value, accepted_ballot: ballot}
            end
          else
            log("#{state.name} -> #{proc} nack")
            safecast(proc, {:nack, inst, ballot})
            state
          end
      end

    # Handles an accepted
    # Sends out the decide when a quorum is met
    {:accepted, inst, ballot} ->
      log("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}")

      cond do
        has_finished(state, inst) ->
          state

        # has_key? first, so an unknown instance is ignored instead of crashing
        state.leader == state.name and Map.has_key?(state.instmap, inst) and
            state.instmap[inst].ballot == ballot ->
          set_instmap do
            %{map | accepted: map.accepted + 1}
          end

          accepted(state, inst)

        true ->
          state
      end

    # Handles a parent request to get a value
    # No answer is sent when the instance has not been decided yet
    {:get_value, inst, pid_to_inform} ->
      # log("#{state.name} get_value")
      if has_finished(state, inst, true) do
        safecast(pid_to_inform, {:get_value_res, inst, state.decided[inst]})
      end

      state

    # Adds a decision made by a leader to the list of decisions
    # and informs the parent of the decision
    {:rb_deliver, _, {:decide, inst, value}} ->
      log("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")

      # NOTE(review): has_finished/2 is also true for an instance that is marked
      # as aborted locally, so such a replica drops the decision - confirm this
      # is intended.
      or_state not has_finished(state, inst) do
        # The instance may be unknown to this replica, so check before reading it
        if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
          safecast(state.instmap[inst].pid_to_inform, {:decision, inst, value})
        end

        %{
          state
          | decided: Map.put(state.decided, inst, value),
            instmap: Map.delete(state.instmap, inst)
        }
      end
  end

  # Answers a prepare from `proc` with what this replica has accepted so far
  # for `inst` and adopts `ballot` as the current ballot of the instance.
  # The instance must already have an entry in state.instmap.
  defp promise(state, proc, inst, ballot) do
    entry = state.instmap[inst]
    safecast(proc, {:prepared, inst, ballot, entry.accepted_ballot, entry.accepted_value})

    set_instmap do
      %{map | ballot: ballot}
    end
  end

  # Does the logic to decide when to send the prepare messages.
  # Also sets the state necessary to run the proposal.
  # Only the current leader sends prepares; everyone else keeps the state as is.
  defp prepare(state, _) when state.leader != state.name, do: state

  defp prepare(state, inst) do
    cond do
      state.instmap[inst] == nil ->
        state

      # Nothing to propose yet
      state.instmap[inst].value == nil and state.instmap[inst].other_value == nil ->
        state

      # A round is already in progress
      state.instmap[inst].has_sent_prepare ->
        state

      state.instmap[inst].has_sent_accept ->
        state

      true ->
        ballot = Ballot.inc(state.instmap[inst].ballot)
        log("#{state.name} - sending all prepare #{inspect(inst)} #{inspect(ballot)}")
        EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})

        set_instmap do
          %{
            map
            | prepared_values: [],
              accepted: 0,
              aborted: false,
              ballot_value: nil,
              has_sent_prepare: true,
              has_sent_accept: false
          }
        end
    end
  end

  # Processes the prepared messages and when a quorum is met the accept
  # messages are sent.
  defp prepared(state, _) when state.leader != state.name, do: state

  defp prepared(state, inst) do
    entry = state.instmap[inst]

    if length(entry.prepared_values) >= quorum(state) and not entry.has_sent_accept do
      # Pick the value that was accepted with the highest ballot, if there is one
      {_, highest_accepted} =
        Enum.reduce(
          entry.prepared_values,
          {Ballot.init(state.name, 0), nil},
          fn {bal, val}, {acc_bal, acc_val} ->
            cond do
              val == nil -> {acc_bal, acc_val}
              Ballot.compare(acc_bal, &>/2, bal) -> {acc_bal, acc_val}
              true -> {bal, val}
            end
          end
        )

      # Otherwise fall back to the own proposal, then to a proposal heard
      # from another process
      a_val =
        cond do
          highest_accepted != nil -> highest_accepted
          entry.value != nil -> entry.value
          true -> entry.other_value
        end

      EagerReliableBroadcast.broadcast(
        state.name,
        {:accept, inst, entry.ballot, a_val}
      )

      set_instmap do
        %{map | ballot_value: a_val, has_sent_accept: true}
      end
    else
      state
    end
  end

  # Processes the accepted messages and when a quorum is met decides on the value.
  defp accepted(state, _) when state.leader != state.name, do: state

  defp accepted(state, inst) do
    or_state state.instmap[inst].accepted >= quorum(state) do
      value = state.instmap[inst].ballot_value

      # Hook that lets the caller ask the leader to die right before deciding
      if state.instmap[inst].action == :kill_before_decision do
        log("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}")
        Process.exit(self(), :kill)
      end

      EagerReliableBroadcast.broadcast(
        state.name,
        {:decide, inst, value}
      )

      if state.instmap[inst].pid_to_inform != nil do
        send(state.instmap[inst].pid_to_inform, {:decision, inst, value})
      end

      %{
        state
        | decided: Map.put(state.decided, inst, value),
          instmap: Map.delete(state.instmap, inst)
      }
    end
  end

  #######################
  #      Interface      #
  #######################

  @doc """
  Sends the propose message to the Paxos replica `pid` and waits for a response
  for the instance `inst`.

  Returns `{:decision, value}`, `{:abort}`, or `{:timeout}` when no answer
  arrives within `t` milliseconds.
  """
  def propose(pid, inst, value, t, action \\ nil) do
    send(pid, {:propose, inst, value, self(), action})

    propose_loop(inst, t)
  end

  # Waits for the right message from the Paxos replica.
  #
  # Only messages for `inst` are taken out of the mailbox; everything else is
  # left in place, in order, for the caller. Re-sending unrelated messages to
  # self would restart the `after` timer on every bounce and keep the call from
  # ever timing out.
  defp propose_loop(inst, t) do
    receive do
      {:timeout, ^inst} -> {:timeout}
      {:abort, ^inst} -> {:abort}
      {:decision, ^inst, d} -> {:decision, d}
    after
      t -> {:timeout}
    end
  end

  @doc """
  Sends the :get_value message to the Paxos replica `pid` and waits for the answer.

  Returns the decided value of `inst`, or `nil` when no answer arrives within
  `t` milliseconds.
  """
  def get_decision(pid, inst, t) do
    send(pid, {:get_value, inst, self()})
    get_decision_loop(inst, t)
  end

  # Waits for the right message from the Paxos replica.
  # Unrelated messages stay in the mailbox so that `t` bounds the whole wait.
  defp get_decision_loop(inst, t) do
    receive do
      {:get_value_res, ^inst, v} -> v
    after
      t -> nil
    end
  end
end
|
|
|
|
defmodule Ballot do
  @moduledoc """
  A set of helper functions to manage ballots.

  A ballot is a `{name, number}` tuple. Ballots are ordered by number first;
  ballots with the same number are ordered by comparing their names.
  """

  @doc """
  Creates a new ballot for `name`, starting at `number` (0 by default).
  """
  def init(name, number \\ 0), do: {name, number}

  @doc """
  Increases the ballot number by one.

  The ballot keeps its name unless `name` is given, in which case the new
  ballot carries that name.
  """
  def inc(b, name \\ nil) do
    {old_name, number} = b
    owner = if name == nil, do: old_name, else: name

    {owner, number + 1}
  end

  # Compares the names of 2 processes using term ordering.
  # Returns 0 when they are equal, 1 when `a` is greater and -1 otherwise.
  defp lexicographical_compare(a, b) do
    cond do
      a == b -> 0
      a > b -> 1
      true -> -1
    end
  end

  # Calculates the difference between 2 ballots: the difference of the numbers,
  # or the result of the name comparison when the numbers are the same.
  defp diff({name1, number1}, {name2, number2}) do
    case number1 - number2 do
      0 -> lexicographical_compare(name1, name2)
      delta -> delta
    end
  end

  @doc """
  Compares 2 ballots.

  `operator` is a 2-arity comparison function such as `&>/2` or `&>=/2`; it is
  applied to the difference between `b1` and `b2` and `0`, so
  `compare(b1, &>/2, b2)` reads as "b1 > b2".
  """
  def compare(b1, operator, b2), do: operator.(diff(b1, b2), 0)
end
|
|
|
|
defmodule EagerReliableBroadcast do
  @moduledoc """
  This module implements eager reliable broadcast, with the optimization of
  removing a message from the delivered list after hearing the same number of
  echoes as there are processes.

  emits {:rb_deliver, proc, message}
  """

  require Utils
  import Utils

  # Removes the trailing "_rb" from the name of a broadcast process to get the
  # name of the process it belongs to. Only the suffix is removed, so a name
  # that happens to contain "_rb" somewhere else is left intact.
  defp get_non_rb_name(name) do
    name
    |> Atom.to_string()
    |> String.replace_suffix("_rb", "")
    |> String.to_atom()
  end

  @doc """
  Starts the eager reliable broadcast process, registers it under the name
  of the parent followed by "_rb" and links it with the calling process.
  """
  def start(name, processes) do
    pid = spawn(EagerReliableBroadcast, :init, [name, processes])
    register_name(alter_name(name, "_rb"), pid)
  end

  @doc """
  Sets the state and calls the run function.
  """
  def init(parent, processes) do
    state = %{
      name: alter_name(parent, "_rb"),
      parent: parent,
      processes: Enum.map(processes, fn name -> alter_name(name, "_rb") end),
      # Remembers the delivered messages and how many copies of each were seen
      delivered: %{},
      # Last sequence number used to identify a message
      seq_no: 0
    }

    run(state)
  end

  runfn do
    # Handle the broadcast request event
    {:broadcast, m} ->
      data_msg = {:data, state.name, state.seq_no, m}
      beb_broadcast(data_msg, state.processes)
      %{state | seq_no: state.seq_no + 1}

    {:data, proc, seq_no, m} ->
      key = {proc, seq_no, m}

      case Map.fetch(state.delivered, key) do
        :error ->
          # First copy: echo it to everyone and deliver it to the parent
          beb_broadcast({:data, proc, seq_no, m}, state.processes)

          safecast(state.parent, {:rb_deliver, get_non_rb_name(proc), m})
          %{state | delivered: Map.put(state.delivered, key, 1)}

        {:ok, seen} ->
          if seen < length(state.processes) do
            %{state | delivered: Map.put(state.delivered, key, seen + 1)}
          else
            # Every echo was heard, the message no longer needs to be remembered
            %{state | delivered: Map.delete(state.delivered, key)}
          end
      end
  end

  #############
  # Interface #
  #############

  @doc """
  Sends a broadcast request for the message `m` to the eager reliable
  broadcast process that belongs to `name`.
  """
  def broadcast(name, m), do: safecast(alter_name(name, "_rb"), {:broadcast, m})
end
|
|
|
|
defmodule EventualLeaderElector do
  @moduledoc """
  This module implements the eventual leader elector.

  emits {:ele_trust, proc}
  """
  require Utils
  import Utils

  @doc """
  Starts the leader elector process, registers it under the name of the parent
  followed by "_ele" and links it with the calling process.
  """
  def start(name, processes) do
    new_name = alter_name(name, "_ele")
    pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])

    register_name(new_name, pid)
  end

  @doc """
  Initializes the state, requests the first round of heartbeats and starts the
  run function.
  """
  def init(name, parent, processes) do
    processes = Enum.map(processes, fn name -> alter_name(name, "_ele") end)

    state = %{
      name: name,
      parent: parent,
      processes: processes,
      timeout: 1000,
      heard_back: MapSet.new(),
      seq: 0,
      last_trust: nil
    }

    run(request_heartbeats(state))
  end

  @doc """
  Clears heard_back, increases the sequence number, sends a request for
  heartbeats to all processes and schedules the `{:timeout}` message that ends
  the round. Returns the updated state.
  """
  def request_heartbeats(state) do
    state = %{state | heard_back: MapSet.new(), seq: state.seq + 1}
    beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes)

    Process.send_after(self(), {:timeout}, state.timeout)
    state
  end

  runfn do
    # Answer to a heartbeat_request, identifying ourselves by the parent name
    {:heartbeat_request, name, seq} ->
      safecast(name, {:heartbeat, state.parent, seq})
      state

    # Update heard_back with this name, ignoring answers to older rounds
    {:heartbeat, name, seq} ->
      or_state seq == state.seq do
        %{state | heard_back: MapSet.put(state.heard_back, name)}
      end

    # On time out check heard_back and, if a new leader is elected, inform the parent
    {:timeout} ->
      state =
        or_state MapSet.size(state.heard_back) >= div(length(state.processes), 2) + 1 do
          # The smallest name that answered is the one to trust
          to_trust = Enum.min(state.heard_back)

          or_state state.last_trust != to_trust do
            safecast(state.parent, {:ele_trust, to_trust})
            %{state | last_trust: to_trust}
          end
        end

      request_heartbeats(state)
  end
end
|