added some docs
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-01-19 18:23:08 +00:00
parent a22005c868
commit 8c674626f8
2 changed files with 291 additions and 129 deletions

View File

@@ -1,11 +1,30 @@
#
# This files contains some modules nessesary for paxos to work
#
# Utils: General Utilities
# Paxos: Paxos logic
# Ballot: Contain the logic to handle the ballot numbers
# EventualLeaderElector: Leader elector code
# EagerReliableBroadcast: Handles eager reliable broadcast
#
defmodule Utils do
@moduledoc """
This module contains some helpful functions are used throughout
This module contains some helpful functions are used throughout all modules
"""
@doc """
Sets the min log level
0 - show all
1 - ignore paxos
2 - ignore server
"""
@min_print_level 3
@doc """
This function works similiary with unicast but it allows both for a pid or an atom
to be given as the first parameter
"""
def safecast(p, m) when p == nil, do: IO.puts('Trying to safecast #{m} with p as nil')
def safecast(p, m) when is_pid(p), do: send(p, m)
def safecast(p, m) do
@@ -15,9 +34,19 @@ defmodule Utils do
end
end
@doc """
This function creates a new name for processes
"""
def alter_name(name, part), do: :"#{name}#{part}"
@doc """
This function sends a message to a list of porcesses
"""
def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m))
@doc """
This function register a pid in the global name space
"""
def register_name(name, pid, link \\ true) do
case :global.re_register_name(name, pid) do
:yes ->
@@ -33,7 +62,10 @@ defmodule Utils do
:error
end
end
@doc """
This macro defines a new log function for a specific log level
"""
defmacro create_log(level) do
quote do
def log(msg) do
@@ -41,13 +73,26 @@ defmodule Utils do
end
end
end
@doc """
This function is used bu the create_log macro.
This function only prints logs if the level definied in the create_log macro
"""
def _log(msg, level) do
if (@min_print_level <= level) do
IO.puts(msg)
end
end
@doc """
This macro defines a an or_state function that acts like this:
if val do
expr
else
state
end
"""
defmacro or_state(val, do: expr) do
quote do
case unquote(val) do
@@ -56,7 +101,10 @@ defmodule Utils do
end
end
end
@doc """
This macro defines a macro to simplify the process of creating run functions
"""
defmacro runfn(do: expr) do
quote do
def run(s) do
@@ -69,37 +117,45 @@ defmodule Utils do
end
end
#
#
# Possible actions
# :kill_before_decision
# :increase_ballot_number - this makes it so that it does not propose but jump simply increases the number of the current ballot
# this is usefull when forcing a nack
#
defmodule Paxos do
@moduledoc """
This module contains all the logic for Paxos
To start a paxos instance run Paxos.start
To propose a a value to paxos run Paxos.propose
To get a previous decision please run Paxos.get_decision
"""
require Utils
import Utils
create_log 0
defmacro set_instmap(do: expr) do
@doc """
This macro allows the state.instmap to be updated very easily
"""
defmacrop set_instmap(do: expr) do
quote do
var!(map) = var!(state).instmap[var!(inst)]
new_instmap = Map.put(var!(state).instmap, var!(inst), unquote(expr))
var!(state) = %{var!(state) | instmap: new_instmap }
end
end
# Starts the Paxos replica with a specific name and some processes
@doc """
Starts the Paxos replica with a specific name and some processes
"""
def start(name, processes, link \\ false) do
log("Starting paxos for #{name}")
pid = spawn(Paxos, :init, [name, processes])
register_name(name, pid, link)
end
# Init event must be the first
# one after the component is created
@doc """
Inicializes the state starts the eager reliable broadcast, and starts the eventual leader eletector
"""
def init(name, processes) do
EventualLeaderElector.start(name, processes)
EagerReliableBroadcast.start(name, processes)
@@ -114,9 +170,11 @@ defmodule Paxos do
run(state)
end
# Guarantees that a specific state exists
def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
@doc """
Guarantees that a specific state exists for a specific instance
"""
defp has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do
or_state state.instmap[inst] == nil do
instmap =
Map.put(state.instmap, inst, %{
@@ -138,8 +196,13 @@ defmodule Paxos do
%{state | instmap: instmap}
end
end
def has_finished(state, inst, ignore_aborted \\ false) do
@doc """
Checks if an instance has finished or if it was aborted.
If the optional parameter ignore_aborted was set to true makes this function only check
if the the instance has finished
"""
defp has_finished(state, inst, ignore_aborted \\ false) do
cond do
Map.has_key?(state.decided, inst) -> true
ignore_aborted -> false
@@ -147,15 +210,32 @@ defmodule Paxos do
true -> false
end
end
@doc """
This is the run/recieve function
All the messages that are handled by this function are:
{:ele_trust, proc} ->
{:propose, inst, value, pid_to_inform, action} ->
{:rb_deliver, proc, {:other_propose, inst, value}} ->
{:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
{:nack, inst, ballot} ->
{:rb_deliver, _proc, {:abort, inst, ballot}} ->
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
{:rb_deliver, proc, {:accept, inst, ballot, value}} ->
{:accepted, inst, ballot} ->
{:get_value, inst, pid_to_inform} ->
{:rb_deliver, _, {:decide, inst, value}} ->
"""
runfn do
# Handles leader elector
{:ele_trust, proc} ->
log("#{state.name} - #{proc} is leader")
Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st ->
prepare(st, inst)
end)
# Handles a proposal from the parent process
{:propose, inst, value, pid_to_inform, action} ->
log("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}")
@@ -195,7 +275,8 @@ defmodule Paxos do
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
prepare(state, inst)
end
# Handles the sharing of a proposal value to other processes
{:rb_deliver, proc, {:other_propose, inst, value}} ->
cond do
has_finished(state, inst, true) ->
@@ -212,7 +293,8 @@ defmodule Paxos do
end
prepare(state, inst)
end
# Handles a prepare request from the leader
{:rb_deliver, proc, {:prepare, proc, inst, ballot}} ->
log("#{state.name} - prepare from #{proc}")
@@ -243,7 +325,8 @@ defmodule Paxos do
safecast(proc, {:nack, inst, ballot})
state
end
# Handles a nack
{:nack, inst, ballot} ->
log("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}")
@@ -270,6 +353,7 @@ defmodule Paxos do
state
end
# Handles an abort
{:rb_deliver, _proc, {:abort, inst, ballot}} ->
cond do
has_finished(state, inst) ->
@@ -285,6 +369,8 @@ defmodule Paxos do
state
end
# Handles a prepared
# Sends out accept when a quorum is met
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
log(
"#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}"
@@ -311,6 +397,7 @@ defmodule Paxos do
state
end
# Handles a accept
{:rb_deliver, proc, {:accept, inst, ballot, value}} ->
cond do
has_finished(state, inst) ->
@@ -338,6 +425,8 @@ defmodule Paxos do
end
end
# Handles a accept
# Sends out accept when a decide when a quoeom is met
{:accepted, inst, ballot} ->
log("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}")
@@ -355,14 +444,17 @@ defmodule Paxos do
true ->
state
end
# handles a parent request to get a value
{:get_value, inst, pid_to_inform} ->
# log("#{state.name} get_value")
if has_finished(state, inst, true) do
safecast(pid_to_inform, {:get_value_res, inst, state.decided[inst]})
end
state
# Adds a decision made by a leader to the list of decisions
# and informs the parent of the decision
{:rb_deliver, _, {:decide, inst, value}} ->
log("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")
@@ -379,13 +471,12 @@ defmodule Paxos do
end
end
#
# Puts process in the preapre state
#
def prepare(state, _) when state.leader != state.name, do: state
def prepare(state, inst) do
@doc """
Does the logic to decide when to send the prepare messages
Also sets the state nessesary to run the proposal
"""
defp prepare(state, _) when state.leader != state.name, do: state
defp prepare(state, inst) do
cond do
state.instmap[inst] == nil ->
state
@@ -417,12 +508,11 @@ defmodule Paxos do
end
end
#
# Process the prepared responses
#
def prepared(state, _) when state.leader != state.name, do: state
def prepared(state, inst) do
@doc """
Processes the prepared messages and when a quorum is met the accept messages are send
"""
defp prepared(state, _) when state.leader != state.name, do: state
defp prepared(state, inst) do
if length(state.instmap[inst].prepared_values) >= floor(length(state.processes) / 2) + 1 and
not state.instmap[inst].has_sent_accept do
{_, a_val} =
@@ -465,13 +555,12 @@ defmodule Paxos do
end
end
#
# Process the accepted responses
#
def accepted(state, _) when state.leader != state.name, do: state
def accepted(state, inst) do
if state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do
@doc """
Processes the accepted messages and when a qurum is met decide on the value
"""
defp accepted(state, _) when state.leader != state.name, do: state
defp accepted(state, inst) do
or_state state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do
value = state.instmap[inst].ballot_value
if state.instmap[inst].action == :kill_before_decision do
@@ -493,69 +582,76 @@ defmodule Paxos do
| decided: Map.put(state.decided, inst, value),
instmap: Map.delete(state.instmap, inst)
}
else
state
end
end
#######################
# Interface #
#######################
@doc """
Send the propose message to the paxos replica and waits for a response from the correct instance
"""
def propose(pid, inst, value, t, action \\ nil) do
send(pid, {:propose, inst, value, self(), action})
propose_loop({inst, t})
propose_loop(inst, t)
end
def propose_loop(input) do
{_, t} = input
@doc """
Waits the right message from the paxos replica
"""
defp propose_loop(inst, t) do
receive do
{:timeout, inst} ->
check_and_apply({:timeout}, inst, input, &propose_loop/1)
{:abort, inst} ->
check_and_apply({:abort}, inst, input, &propose_loop/1)
{:decision, inst, d} ->
check_and_apply({:decision, d}, inst, input, &propose_loop/1)
{:timeout, ^inst} -> {:timeout}
{:abort, ^inst} -> {:abort}
{:decision, ^inst, d} -> {:decision, d}
x ->
Process.send_after(self(), x, 500)
propose_loop(input)
propose_loop(inst, t)
after
t -> {:timeout}
end
end
@doc """
Sends the :get_value message to the paxos replica
"""
def get_decision(pid, inst, t) do
send(pid, {:get_value, inst, self()})
get_decision_loop({inst, t})
get_decision_loop(inst, t)
end
def get_decision_loop(input) do
{_, t} = input
@doc """
Sends waits for the right message from the paxos replica
"""
defp get_decision_loop(inst, t) do
receive do
{:get_value_res, inst, v} ->
check_and_apply(v, inst, input, &get_decision_loop/1)
{:get_value_res, ^inst, v} ->
v
x ->
Process.send_after(self(), x, 500)
get_decision_loop(input)
get_decision_loop(inst, t)
after
t -> nil
end
end
def check_and_apply(v, inst, input, fun) do
{inInst, _} = input
if inst == inInst do
v
else
fun.(input)
end
end
end
defmodule Ballot do
def init(name, number \\ 0), do: {name, number}
@moduledoc """
A set of a helper functions to manage ballots
"""
@doc """
Create a new ballot
"""
def init(name, number \\ 0), do: {name, number}
@doc """
Increase the ballot number
"""
def inc(b, name \\ nil) do
{old_name, number} = b
@@ -564,7 +660,10 @@ defmodule Ballot do
number + 1
}
end
@doc """
Compare the name of 2 processes and select the lowest one
"""
defp lexicographical_compare(a, b) do
cond do
a == b -> 0
@@ -573,7 +672,9 @@ defmodule Ballot do
end
end
@doc """
Callculate the difference between w ballots
"""
defp diff({name1, number1}, {name2, number2}) do
diff = number1 - number2
if diff == 0 do
@@ -582,25 +683,42 @@ defmodule Ballot do
diff
end
end
@doc """
Compare 2 ballots
"""
def compare(b1, operator, b2), do: operator.(diff(b1, b2), 0)
end
defmodule EagerReliableBroadcast do
@moduledoc """
This modules implents eager reiable broadcast with the optimization of removing the from the deliver list after hearing
the same number of echos as there is processes
emits {:rb_deliver, proc, message}
"""
require Utils
import Utils
def get_non_rb_name(name) do
@doc """
Removes _br from the name of a process
"""
defp get_non_rb_name(name) do
String.to_atom(String.replace(Atom.to_string(name), "_rb", ""))
end
@doc """
Starts the Eager reliable process and registers it with a name and links the process with the parent process
"""
def start(name, processes) do
pid = spawn(EagerReliableBroadcast, :init, [name, processes])
register_name(alter_name(name, "_rb"), pid)
end
# Init event must be the first
# one after the component is created
@doc """
Sets state and calls the run function
"""
def init(parent, processes) do
state = %{
name: alter_name(parent, "_rb"),
@@ -643,17 +761,24 @@ defmodule EagerReliableBroadcast do
#############
# Interface #
#############
@doc """
Sends a a broadcast request to the eager reliable broadcast replica
"""
def broadcast(name, m), do: safecast(alter_name(name, "_rb"), {:broadcast, m})
end
#
# Emits {:ele_trust, proc }
#
defmodule EventualLeaderElector do
@moduledoc """
This modules implents eventual leader elector
emits {:ele_leader, proc}
"""
require Utils
import Utils
@doc """
Initializes the leader elector process and registers is under name_ele and links it with the parent process
"""
def start(name, processes) do
new_name = alter_name(name, "_ele")
pid = spawn(EventualLeaderElector, :init, [new_name, name, processes])
@@ -661,8 +786,9 @@ defmodule EventualLeaderElector do
register_name(new_name, pid)
end
# Init event must be the first
# one after the component is created
@doc """
Initializes state and starts the run function
"""
def init(name, parent, processes) do
processes = Enum.map(processes, fn name -> alter_name(name, "_ele") end)
@@ -678,7 +804,10 @@ defmodule EventualLeaderElector do
run(request_heartbeats(state))
end
@doc """
Clears heard_back and updates the sequence number and then sends a request for heartbeats to all proccesses
"""
def request_heartbeats(state) do
state = %{state | heard_back: MapSet.new(), seq: state.seq + 1}
beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes)
@@ -688,15 +817,18 @@ defmodule EventualLeaderElector do
end
runfn do
# Answer to a heartbeat_request
{:heartbeat_request, name, seq} ->
safecast(name, {:heartbeat, state.parent, seq})
state
# Update heard_back with this name
{:heartbeat, name, seq} ->
or_state seq == state.seq do
%{state | heard_back: MapSet.put(state.heard_back, name)}
end
# One time out check the heard_back and send if a new leader is elected inform the parent
{:timeout} ->
state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do
to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0)