# # This files contains some modules nessesary for paxos to work # # Utils: General Utilities # Paxos: Paxos logic # Ballot: Contain the logic to handle the ballot numbers # EventualLeaderElector: Leader elector code # EagerReliableBroadcast: Handles eager reliable broadcast # defmodule Utils do @moduledoc """ This module contains some helpful functions are used throughout all modules """ @doc """ Sets the min log level 0 - show all 1 - ignore paxos 2 - ignore server """ @min_print_level 3 # This function works similiary with unicast but it allows both for a pid or an atom # to be given as the first parameter def safecast(p, m) when p == nil, do: IO.puts('Trying to safecast #{m} with p as nil') def safecast(p, m) when is_pid(p), do: send(p, m) def safecast(p, m) do case :global.whereis_name(p) do pid when is_pid(pid) -> send(pid, m) :undefined -> :ok end end @doc """ This function creates a new name for processes """ def alter_name(name, part), do: :"#{name}#{part}" @doc """ This function sends a message to a list of porcesses """ def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m)) @doc """ This function register a pid in the global name space """ def register_name(name, pid, link \\ true) do case :global.re_register_name(name, pid) do :yes -> # Note this is running on the parent so we are linking the parent to the rb # so that when we close the parent the rb also dies if link do Process.link(pid) end pid :no -> Process.exit(pid, :kill) :error end end @doc """ This macro defines a new log function for a specific log level """ defmacro create_log(level) do quote do def log(msg) do Utils._log(msg, unquote(level)) end end end @doc """ This function is used bu the create_log macro. This function only prints logs if the level definied in the create_log macro """ def _log(msg, level) do if (@min_print_level <= level) do IO.puts(msg) end end @doc """ This macro defines a an or_state function that acts like this: if val do expr else state end """ defmacro or_state(val, do: expr) do quote do case unquote(val) do true -> unquote(expr) _ -> var!(state) end end end @doc """ This macro defines a macro to simplify the process of creating run functions """ defmacro runfn(do: expr) do quote do def run(s) do var!(state) = s run(receive do unquote(expr) end) end end end end defmodule Paxos do @moduledoc """ This module contains all the logic for Paxos To start a paxos instance run Paxos.start To propose a a value to paxos run Paxos.propose To get a previous decision please run Paxos.get_decision """ require Utils import Utils create_log 0 # This macro allows the state.instmap to be updated very easily defmacrop set_instmap(do: expr) do quote do var!(map) = var!(state).instmap[var!(inst)] new_instmap = Map.put(var!(state).instmap, var!(inst), unquote(expr)) var!(state) = %{var!(state) | instmap: new_instmap } end end @doc """ Starts the Paxos replica with a specific name and some processes """ def start(name, processes, link \\ false) do log("Starting paxos for #{name}") pid = spawn(Paxos, :init, [name, processes]) register_name(name, pid, link) end @doc """ Inicializes the state starts the eager reliable broadcast, and starts the eventual leader eletector """ def init(name, processes) do EventualLeaderElector.start(name, processes) EagerReliableBroadcast.start(name, processes) state = %{ name: name, processes: processes, leader: nil, instmap: %{}, decided: %{} } run(state) end # Guarantees that a specific state exists for a specific instance defp has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do or_state state.instmap[inst] == nil do instmap = Map.put(state.instmap, inst, %{ value: value, other_value: nil, ballot: Ballot.init(state.name, 0), aborted: false, ballot_value: nil, prepared_values: [], accepted: 0, accepted_ballot: nil, accepted_value: nil, pid_to_inform: pid_to_inform, has_sent_accept: false, action: action, has_sent_prepare: false, }) %{state | instmap: instmap} end end # Checks if an instance has finished or if it was aborted. # If the optional parameter ignore_aborted was set to true makes this function only check # if the the instance has finished defp has_finished(state, inst, ignore_aborted \\ false) do cond do Map.has_key?(state.decided, inst) -> true ignore_aborted -> false Map.has_key?(state.instmap, inst) -> state.instmap[inst].aborted true -> false end end # This is the run/recieve function # All the messages that are handled by this function are: # {:ele_trust, proc} -> # {:propose, inst, value, pid_to_inform, action} -> # {:rb_deliver, proc, {:other_propose, inst, value}} -> # {:rb_deliver, proc, {:prepare, proc, inst, ballot}} -> # {:nack, inst, ballot} -> # {:rb_deliver, _proc, {:abort, inst, ballot}} -> # {:prepared, inst, ballot, accepted_ballot, accepted_value} -> # {:rb_deliver, proc, {:accept, inst, ballot, value}} -> # {:accepted, inst, ballot} -> # {:get_value, inst, pid_to_inform} -> # {:rb_deliver, _, {:decide, inst, value}} -> runfn do # Handles leader elector {:ele_trust, proc} -> log("#{state.name} - #{proc} is leader") Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st -> prepare(st, inst) end) # Handles a proposal from the parent process {:propose, inst, value, pid_to_inform, action} -> log("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}") cond do has_finished(state, inst, true) -> log("#{state.name} - Has already decided for #{inspect(inst)} sending #{inspect(state.decided[inst])}") send(pid_to_inform, {:decision, inst, state.decided[inst]}) state action == :increase_ballot_number -> log("#{state.name} - Got request to increase ballot number for inst #{inst}") state = has_or_create(state, inst) set_instmap do %{map| ballot: Ballot.inc(map.ballot)} end state not Map.has_key?(state.instmap, inst) -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) state = has_or_create(state, inst, value, pid_to_inform, action) prepare(state, inst) state.instmap[inst].value == nil -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) set_instmap do %{ map | value: value, pid_to_inform: pid_to_inform, action: action, } end prepare(state, inst) true -> EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) prepare(state, inst) end # Handles the sharing of a proposal value to other processes {:rb_deliver, _, {:other_propose, inst, value}} -> cond do has_finished(state, inst, true) -> EagerReliableBroadcast.broadcast( state.name, {:decide, inst, state.decided[inst]} ) state true -> state = has_or_create(state, inst) set_instmap do %{map | other_value: value} end prepare(state, inst) end # Handles a prepare request from the leader {:rb_deliver, proc, {:prepare, proc, inst, ballot}} -> log("#{state.name} - prepare from #{proc}") cond do has_finished(state, inst) -> state not Map.has_key?(state.instmap, inst) -> state = has_or_create(state, inst) safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value}); set_instmap do %{ map | ballot: ballot } end state Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) -> safecast(proc, {:prepared, inst, ballot, state.instmap[inst].accepted_ballot, state.instmap[inst].accepted_value} ) set_instmap do %{ map | ballot: ballot } end state true -> safecast(proc, {:nack, inst, ballot}) state end # Handles a nack {:nack, inst, ballot} -> log("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}") cond do has_finished(state, inst) -> state state.leader == state.name and state.instmap[inst].ballot == ballot -> if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:abort, inst}) end EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot}) set_instmap do %{ map | has_sent_accept: false, has_sent_prepare: false, ballot: Ballot.inc(map.ballot), aborted: true, } end state true -> state end # Handles an abort {:rb_deliver, _proc, {:abort, inst, _}} -> cond do has_finished(state, inst) -> state true -> log("#{state.name} - got information to send abort") if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:abort, inst}) end state end # Handles a prepared # Sends out accept when a quorum is met {:prepared, inst, ballot, accepted_ballot, accepted_value} -> log( "#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}" ) cond do has_finished(state, inst) -> state ballot == state.instmap[inst].ballot -> set_instmap do %{ map | prepared_values: map.prepared_values ++ [{accepted_ballot, accepted_value}] } end prepared(state, inst) Ballot.compare(ballot, &>/2, state.instmap[inst].ballot) -> log("#{state.name} - Probably recieved this before preare came to self sending with delay") Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100) state true -> state end # Handles a accept {:rb_deliver, proc, {:accept, inst, ballot, value}} -> cond do has_finished(state, inst) -> state true -> state = has_or_create(state, inst) if Ballot.compare(ballot, &>=/2, state.instmap[inst].ballot) do log("#{state.name} - accept #{inspect(inst)} #{inspect(ballot)} #{inspect(value)}") safecast(proc, {:accepted, inst, ballot}) set_instmap do %{ map | ballot: ballot, accepted_value: value, accepted_ballot: ballot } end state else log("#{state.name} -> #{proc} nack") safecast(proc, {:nack, inst, ballot}) state end end # Handles a accept # Sends out accept when a decide when a quoeom is met {:accepted, inst, ballot} -> log("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}") cond do has_finished(state, inst) -> state state.leader == state.name and state.instmap[inst].ballot == ballot -> set_instmap do %{ map | accepted: map.accepted + 1 } end accepted( state, inst) true -> state end # handles a parent request to get a value {:get_value, inst, pid_to_inform} -> # log("#{state.name} get_value") if has_finished(state, inst, true) do safecast(pid_to_inform, {:get_value_res, inst, state.decided[inst]}) end state # Adds a decision made by a leader to the list of decisions # and informs the parent of the decision {:rb_deliver, _, {:decide, inst, value}} -> log("#{state.name} - decided #{inspect(inst)} #{inspect(value)}") or_state not has_finished(state, inst) do if Map.has_key?(state.instmap, inst) != nil and state.instmap[inst].pid_to_inform != nil do safecast(state.instmap[inst].pid_to_inform, {:decision, inst, value}) end %{ state | decided: Map.put(state.decided, inst, value), instmap: Map.delete(state.instmap, inst) } end end # Does the logic to decide when to send the prepare messages # Also sets the state nessesary to run the proposal defp prepare(state, _) when state.leader != state.name, do: state defp prepare(state, inst) do cond do state.instmap[inst] == nil -> state state.instmap[inst].value == nil and state.instmap[inst].other_value == nil -> state state.instmap[inst] != nil and state.instmap[inst].has_sent_prepare -> state state.instmap[inst] != nil and state.instmap[inst].has_sent_accept -> state true -> ballot = Ballot.inc(state.instmap[inst].ballot) log("#{state.name} - sending all prepare #{inst} #{inspect(ballot)}") EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot}) set_instmap do %{ map | prepared_values: [], accepted: 0, aborted: false, ballot_value: nil, has_sent_prepare: true, has_sent_accept: false } end state end end # Processes the prepared messages and when a quorum is met the accept messages are send defp prepared(state, _) when state.leader != state.name, do: state defp prepared(state, inst) do or_state length(state.instmap[inst].prepared_values) >= floor(length(state.processes) / 2) + 1 and not state.instmap[inst].has_sent_accept do {_, a_val} = Enum.reduce(state.instmap[inst].prepared_values, {Ballot.init(state.name, 0), nil}, fn {bal, val}, {acc_bal, acc_val} -> cond do val == nil -> {acc_bal, acc_val} Ballot.compare(acc_bal, &>/2, bal) -> {acc_bal, acc_val} true -> {bal, val} end end) a_val = if a_val == nil do if state.instmap[inst].value == nil do state.instmap[inst].other_value else state.instmap[inst].value end else a_val end EagerReliableBroadcast.broadcast( state.name, {:accept, inst, state.instmap[inst].ballot, a_val} ) set_instmap do %{ map | ballot_value: a_val, has_sent_accept: true } end state end end # Processes the accepted messages and when a qurum is met decide on the value defp accepted(state, _) when state.leader != state.name, do: state defp accepted(state, inst) do or_state state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do value = state.instmap[inst].ballot_value if state.instmap[inst].action == :kill_before_decision do log("#{state.name} - Leader has action to die before decision #{inspect({:decide, inst, value})}") Process.exit(self(), :kill) end EagerReliableBroadcast.broadcast( state.name, {:decide, inst, value} ) if state.instmap[inst].pid_to_inform != nil do send(state.instmap[inst].pid_to_inform, {:decision, inst, value}) end %{ state | decided: Map.put(state.decided, inst, value), instmap: Map.delete(state.instmap, inst) } end end ####################### # Interface # ####################### @doc """ Send the propose message to the paxos replica and waits for a response from the correct instance """ def propose(pid, inst, value, t, action \\ nil) do send(pid, {:propose, inst, value, self(), action}) propose_loop(inst, t) end # Waits the right message from the paxos replica defp propose_loop(inst, t) do receive do {:timeout, ^inst} -> {:timeout} {:abort, ^inst} -> {:abort} {:decision, ^inst, d} -> {:decision, d} x -> Process.send_after(self(), x, 500) propose_loop(inst, t) after t -> {:timeout} end end @doc """ Sends the :get_value message to the paxos replica """ def get_decision(pid, inst, t) do send(pid, {:get_value, inst, self()}) get_decision_loop(inst, t) end # Sends waits for the right message from the paxos replica defp get_decision_loop(inst, t) do receive do {:get_value_res, ^inst, v} -> v x -> Process.send_after(self(), x, 500) get_decision_loop(inst, t) after t -> nil end end end defmodule Ballot do @moduledoc """ A set of a helper functions to manage ballots """ @doc """ Create a new ballot """ def init(name, number \\ 0), do: {name, number} @doc """ Increase the ballot number """ def inc(b, name \\ nil) do {old_name, number} = b { if name == nil do old_name else name end, number + 1 } end # Compare the name of 2 processes and select the lowest one defp lexicographical_compare(a, b) do cond do a == b -> 0 a > b -> 1 true -> -1 end end # Callculate the difference between w ballots defp diff({name1, number1}, {name2, number2}) do diff = number1 - number2 if diff == 0 do lexicographical_compare(name1, name2) else diff end end @doc """ Compare 2 ballots """ def compare(b1, operator, b2), do: operator.(diff(b1, b2), 0) end defmodule EagerReliableBroadcast do @moduledoc """ This modules implents eager reiable broadcast with the optimization of removing the from the deliver list after hearing the same number of echos as there is processes emits {:rb_deliver, proc, message} """ require Utils import Utils # Removes _br from the name of a process defp get_non_rb_name(name) do String.to_atom(String.replace(Atom.to_string(name), "_rb", "")) end @doc """ Starts the Eager reliable process and registers it with a name and links the process with the parent process """ def start(name, processes) do pid = spawn(EagerReliableBroadcast, :init, [name, processes]) register_name(alter_name(name, "_rb"), pid) end @doc """ Sets state and calls the run function """ def init(parent, processes) do state = %{ name: alter_name(parent, "_rb"), parent: parent, processes: Enum.map(processes, fn name -> alter_name(name, "_rb") end), # Use this data structure to remember IDs of the delivered messages delivered: %{}, # Use this variable to remember the last sequence number used to identify a message seq_no: 0 } run(state) end runfn do # Handle the broadcast request event {:broadcast, m} -> data_msg = {:data, state.name, state.seq_no, m} beb_broadcast(data_msg, state.processes) %{state | seq_no: state.seq_no + 1} {:data, proc, seq_no, m} -> if not Map.has_key?(state.delivered, {proc, seq_no, m}) do data_msg = {:data, proc, seq_no, m} beb_broadcast(data_msg, state.processes) safecast(state.parent, {:rb_deliver, get_non_rb_name(proc), m}) %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} else val = Map.get(state.delivered, {proc, seq_no, m}) if val < Enum.count(state.processes) do %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)} else %{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})} end end end ############# # Interface # ############# @doc """ Sends a a broadcast request to the eager reliable broadcast replica """ def broadcast(name, m), do: safecast(alter_name(name, "_rb"), {:broadcast, m}) end defmodule EventualLeaderElector do @moduledoc """ This modules implents eventual leader elector emits {:ele_leader, proc} """ require Utils import Utils @doc """ Initializes the leader elector process and registers is under name_ele and links it with the parent process """ def start(name, processes) do new_name = alter_name(name, "_ele") pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) register_name(new_name, pid) end @doc """ Initializes state and starts the run function """ def init(name, parent, processes) do processes = Enum.map(processes, fn name -> alter_name(name, "_ele") end) state = %{ name: name, parent: parent, processes: processes, timeout: 1000, heard_back: MapSet.new(), seq: 0, last_trust: nil } run(request_heartbeats(state)) end @doc """ Clears heard_back and updates the sequence number and then sends a request for heartbeats to all proccesses """ def request_heartbeats(state) do state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) Process.send_after(self(), {:timeout}, state.timeout) state end runfn do # Answer to a heartbeat_request {:heartbeat_request, name, seq} -> safecast(name, {:heartbeat, state.parent, seq}) state # Update heard_back with this name {:heartbeat, name, seq} -> or_state seq == state.seq do %{state | heard_back: MapSet.put(state.heard_back, name)} end # One time out check the heard_back and send if a new leader is elected inform the parent {:timeout} -> state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) or_state state.last_trust != to_trust do safecast(state.parent, {:ele_trust, to_trust}) %{state | last_trust: to_trust} end end request_heartbeats(state) end end