diff --git a/lib/paxos.ex b/lib/paxos.ex index 8164ad6..d265710 100644 --- a/lib/paxos.ex +++ b/lib/paxos.ex @@ -1,11 +1,30 @@ +# +# This files contains some modules nessesary for paxos to work +# +# Utils: General Utilities +# Paxos: Paxos logic +# Ballot: Contain the logic to handle the ballot numbers +# EventualLeaderElector: Leader elector code +# EagerReliableBroadcast: Handles eager reliable broadcast +# + defmodule Utils do @moduledoc """ - This module contains some helpful functions are used throughout + This module contains some helpful functions are used throughout all modules + """ + + @doc """ + Sets the min log level + 0 - show all + 1 - ignore paxos + 2 - ignore server """ - - @min_print_level 3 + @doc """ + This function works similiary with unicast but it allows both for a pid or an atom + to be given as the first parameter + """ def safecast(p, m) when p == nil, do: IO.puts('Trying to safecast #{m} with p as nil') def safecast(p, m) when is_pid(p), do: send(p, m) def safecast(p, m) do @@ -15,9 +34,19 @@ defmodule Utils do end end + @doc """ + This function creates a new name for processes + """ def alter_name(name, part), do: :"#{name}#{part}" + + @doc """ + This function sends a message to a list of porcesses + """ def beb_broadcast(m, dest), do: for(p <- dest, do: safecast(p, m)) + @doc """ + This function register a pid in the global name space + """ def register_name(name, pid, link \\ true) do case :global.re_register_name(name, pid) do :yes -> @@ -33,7 +62,10 @@ defmodule Utils do :error end end - + + @doc """ + This macro defines a new log function for a specific log level + """ defmacro create_log(level) do quote do def log(msg) do @@ -41,13 +73,26 @@ defmodule Utils do end end end + + @doc """ + This function is used bu the create_log macro. + This function only prints logs if the level definied in the create_log macro + """ def _log(msg, level) do if (@min_print_level <= level) do IO.puts(msg) end end - + + @doc """ + This macro defines a an or_state function that acts like this: + if val do + expr + else + state + end + """ defmacro or_state(val, do: expr) do quote do case unquote(val) do @@ -56,7 +101,10 @@ defmodule Utils do end end end - + + @doc """ + This macro defines a macro to simplify the process of creating run functions + """ defmacro runfn(do: expr) do quote do def run(s) do @@ -69,37 +117,45 @@ defmodule Utils do end end -# -# -# Possible actions -# :kill_before_decision -# :increase_ballot_number - this makes it so that it does not propose but jump simply increases the number of the current ballot -# this is usefull when forcing a nack -# defmodule Paxos do + @moduledoc """ + This module contains all the logic for Paxos + + To start a paxos instance run Paxos.start + + To propose a a value to paxos run Paxos.propose + + To get a previous decision please run Paxos.get_decision + """ require Utils import Utils create_log 0 - - defmacro set_instmap(do: expr) do + + @doc """ + This macro allows the state.instmap to be updated very easily + """ + defmacrop set_instmap(do: expr) do quote do var!(map) = var!(state).instmap[var!(inst)] new_instmap = Map.put(var!(state).instmap, var!(inst), unquote(expr)) var!(state) = %{var!(state) | instmap: new_instmap } end end - - # Starts the Paxos replica with a specific name and some processes + + @doc """ + Starts the Paxos replica with a specific name and some processes + """ def start(name, processes, link \\ false) do log("Starting paxos for #{name}") pid = spawn(Paxos, :init, [name, processes]) register_name(name, pid, link) end - - # Init event must be the first - # one after the component is created + + @doc """ + Inicializes the state starts the eager reliable broadcast, and starts the eventual leader eletector + """ def init(name, processes) do EventualLeaderElector.start(name, processes) EagerReliableBroadcast.start(name, processes) @@ -114,9 +170,11 @@ defmodule Paxos do run(state) end - - # Guarantees that a specific state exists - def has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do + + @doc """ + Guarantees that a specific state exists for a specific instance + """ + defp has_or_create(state, inst, value \\ nil, pid_to_inform \\ nil, action \\ nil) do or_state state.instmap[inst] == nil do instmap = Map.put(state.instmap, inst, %{ @@ -138,8 +196,13 @@ defmodule Paxos do %{state | instmap: instmap} end end - - def has_finished(state, inst, ignore_aborted \\ false) do + + @doc """ + Checks if an instance has finished or if it was aborted. + If the optional parameter ignore_aborted was set to true makes this function only check + if the the instance has finished + """ + defp has_finished(state, inst, ignore_aborted \\ false) do cond do Map.has_key?(state.decided, inst) -> true ignore_aborted -> false @@ -147,15 +210,32 @@ defmodule Paxos do true -> false end end - + + @doc """ + This is the run/recieve function + All the messages that are handled by this function are: + {:ele_trust, proc} -> + {:propose, inst, value, pid_to_inform, action} -> + {:rb_deliver, proc, {:other_propose, inst, value}} -> + {:rb_deliver, proc, {:prepare, proc, inst, ballot}} -> + {:nack, inst, ballot} -> + {:rb_deliver, _proc, {:abort, inst, ballot}} -> + {:prepared, inst, ballot, accepted_ballot, accepted_value} -> + {:rb_deliver, proc, {:accept, inst, ballot, value}} -> + {:accepted, inst, ballot} -> + {:get_value, inst, pid_to_inform} -> + {:rb_deliver, _, {:decide, inst, value}} -> + """ runfn do + # Handles leader elector {:ele_trust, proc} -> log("#{state.name} - #{proc} is leader") Enum.reduce(Map.keys(state.instmap), %{state | leader: proc}, fn inst, st -> prepare(st, inst) end) - + + # Handles a proposal from the parent process {:propose, inst, value, pid_to_inform, action} -> log("#{state.name} - Propose #{inspect(inst)} with action #{inspect(action)}") @@ -195,7 +275,8 @@ defmodule Paxos do EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value}) prepare(state, inst) end - + + # Handles the sharing of a proposal value to other processes {:rb_deliver, proc, {:other_propose, inst, value}} -> cond do has_finished(state, inst, true) -> @@ -212,7 +293,8 @@ defmodule Paxos do end prepare(state, inst) end - + + # Handles a prepare request from the leader {:rb_deliver, proc, {:prepare, proc, inst, ballot}} -> log("#{state.name} - prepare from #{proc}") @@ -243,7 +325,8 @@ defmodule Paxos do safecast(proc, {:nack, inst, ballot}) state end - + + # Handles a nack {:nack, inst, ballot} -> log("#{state.name} - nack #{inspect(inst)} #{inspect(ballot)}") @@ -270,6 +353,7 @@ defmodule Paxos do state end + # Handles an abort {:rb_deliver, _proc, {:abort, inst, ballot}} -> cond do has_finished(state, inst) -> @@ -285,6 +369,8 @@ defmodule Paxos do state end + # Handles a prepared + # Sends out accept when a quorum is met {:prepared, inst, ballot, accepted_ballot, accepted_value} -> log( "#{state.name} - prepared #{inspect(inst)} #{inspect(ballot)} #{inspect(accepted_ballot)} #{inspect(accepted_value)}" @@ -311,6 +397,7 @@ defmodule Paxos do state end + # Handles a accept {:rb_deliver, proc, {:accept, inst, ballot, value}} -> cond do has_finished(state, inst) -> @@ -338,6 +425,8 @@ defmodule Paxos do end end + # Handles a accept + # Sends out accept when a decide when a quoeom is met {:accepted, inst, ballot} -> log("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}") @@ -355,14 +444,17 @@ defmodule Paxos do true -> state end - + + # handles a parent request to get a value {:get_value, inst, pid_to_inform} -> # log("#{state.name} get_value") if has_finished(state, inst, true) do safecast(pid_to_inform, {:get_value_res, inst, state.decided[inst]}) end state - + + # Adds a decision made by a leader to the list of decisions + # and informs the parent of the decision {:rb_deliver, _, {:decide, inst, value}} -> log("#{state.name} - decided #{inspect(inst)} #{inspect(value)}") @@ -379,13 +471,12 @@ defmodule Paxos do end end - # - # Puts process in the preapre state - # - - def prepare(state, _) when state.leader != state.name, do: state - - def prepare(state, inst) do + @doc """ + Does the logic to decide when to send the prepare messages + Also sets the state nessesary to run the proposal + """ + defp prepare(state, _) when state.leader != state.name, do: state + defp prepare(state, inst) do cond do state.instmap[inst] == nil -> state @@ -417,12 +508,11 @@ defmodule Paxos do end end - # - # Process the prepared responses - # - def prepared(state, _) when state.leader != state.name, do: state - - def prepared(state, inst) do + @doc """ + Processes the prepared messages and when a quorum is met the accept messages are send + """ + defp prepared(state, _) when state.leader != state.name, do: state + defp prepared(state, inst) do if length(state.instmap[inst].prepared_values) >= floor(length(state.processes) / 2) + 1 and not state.instmap[inst].has_sent_accept do {_, a_val} = @@ -465,13 +555,12 @@ defmodule Paxos do end end - # - # Process the accepted responses - # - def accepted(state, _) when state.leader != state.name, do: state - - def accepted(state, inst) do - if state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do + @doc """ + Processes the accepted messages and when a qurum is met decide on the value + """ + defp accepted(state, _) when state.leader != state.name, do: state + defp accepted(state, inst) do + or_state state.instmap[inst].accepted >= floor(length(state.processes) / 2) + 1 do value = state.instmap[inst].ballot_value if state.instmap[inst].action == :kill_before_decision do @@ -493,69 +582,76 @@ defmodule Paxos do | decided: Map.put(state.decided, inst, value), instmap: Map.delete(state.instmap, inst) } - else - state end end + ####################### + # Interface # + ####################### + + @doc """ + Send the propose message to the paxos replica and waits for a response from the correct instance + """ def propose(pid, inst, value, t, action \\ nil) do send(pid, {:propose, inst, value, self(), action}) - propose_loop({inst, t}) + propose_loop(inst, t) end - - def propose_loop(input) do - {_, t} = input + + @doc """ + Waits the right message from the paxos replica + """ + defp propose_loop(inst, t) do receive do - {:timeout, inst} -> - check_and_apply({:timeout}, inst, input, &propose_loop/1) - - {:abort, inst} -> - check_and_apply({:abort}, inst, input, &propose_loop/1) - - {:decision, inst, d} -> - check_and_apply({:decision, d}, inst, input, &propose_loop/1) - + {:timeout, ^inst} -> {:timeout} + {:abort, ^inst} -> {:abort} + {:decision, ^inst, d} -> {:decision, d} x -> Process.send_after(self(), x, 500) - propose_loop(input) + propose_loop(inst, t) after t -> {:timeout} end end - + + @doc """ + Sends the :get_value message to the paxos replica + """ def get_decision(pid, inst, t) do send(pid, {:get_value, inst, self()}) - get_decision_loop({inst, t}) + get_decision_loop(inst, t) end - - def get_decision_loop(input) do - {_, t} = input + + @doc """ + Sends waits for the right message from the paxos replica + """ + defp get_decision_loop(inst, t) do receive do - {:get_value_res, inst, v} -> - check_and_apply(v, inst, input, &get_decision_loop/1) + {:get_value_res, ^inst, v} -> + v x -> Process.send_after(self(), x, 500) - get_decision_loop(input) + get_decision_loop(inst, t) after t -> nil end end - - def check_and_apply(v, inst, input, fun) do - {inInst, _} = input - if inst == inInst do - v - else - fun.(input) - end - end end defmodule Ballot do - def init(name, number \\ 0), do: {name, number} + @moduledoc """ + A set of a helper functions to manage ballots + """ + @doc """ + Create a new ballot + """ + def init(name, number \\ 0), do: {name, number} + + @doc """ + Increase the ballot number + """ def inc(b, name \\ nil) do {old_name, number} = b @@ -564,7 +660,10 @@ defmodule Ballot do number + 1 } end - + + @doc """ + Compare the name of 2 processes and select the lowest one + """ defp lexicographical_compare(a, b) do cond do a == b -> 0 @@ -573,7 +672,9 @@ defmodule Ballot do end end - + @doc """ + Callculate the difference between w ballots + """ defp diff({name1, number1}, {name2, number2}) do diff = number1 - number2 if diff == 0 do @@ -582,25 +683,42 @@ defmodule Ballot do diff end end - + + @doc """ + Compare 2 ballots + """ def compare(b1, operator, b2), do: operator.(diff(b1, b2), 0) end defmodule EagerReliableBroadcast do + @moduledoc """ + This modules implents eager reiable broadcast with the optimization of removing the from the deliver list after hearing + the same number of echos as there is processes + + emits {:rb_deliver, proc, message} + """ + require Utils import Utils - def get_non_rb_name(name) do + @doc """ + Removes _br from the name of a process + """ + defp get_non_rb_name(name) do String.to_atom(String.replace(Atom.to_string(name), "_rb", "")) end - + + @doc """ + Starts the Eager reliable process and registers it with a name and links the process with the parent process + """ def start(name, processes) do pid = spawn(EagerReliableBroadcast, :init, [name, processes]) register_name(alter_name(name, "_rb"), pid) end - # Init event must be the first - # one after the component is created + @doc """ + Sets state and calls the run function + """ def init(parent, processes) do state = %{ name: alter_name(parent, "_rb"), @@ -643,17 +761,24 @@ defmodule EagerReliableBroadcast do ############# # Interface # ############# + @doc """ + Sends a a broadcast request to the eager reliable broadcast replica + """ def broadcast(name, m), do: safecast(alter_name(name, "_rb"), {:broadcast, m}) end -# -# Emits {:ele_trust, proc } -# - defmodule EventualLeaderElector do + @moduledoc """ + This modules implents eventual leader elector + + emits {:ele_leader, proc} + """ require Utils import Utils + @doc """ + Initializes the leader elector process and registers is under name_ele and links it with the parent process + """ def start(name, processes) do new_name = alter_name(name, "_ele") pid = spawn(EventualLeaderElector, :init, [new_name, name, processes]) @@ -661,8 +786,9 @@ defmodule EventualLeaderElector do register_name(new_name, pid) end - # Init event must be the first - # one after the component is created + @doc """ + Initializes state and starts the run function + """ def init(name, parent, processes) do processes = Enum.map(processes, fn name -> alter_name(name, "_ele") end) @@ -678,7 +804,10 @@ defmodule EventualLeaderElector do run(request_heartbeats(state)) end - + + @doc """ + Clears heard_back and updates the sequence number and then sends a request for heartbeats to all proccesses + """ def request_heartbeats(state) do state = %{state | heard_back: MapSet.new(), seq: state.seq + 1} beb_broadcast({:heartbeat_request, state.name, state.seq}, state.processes) @@ -688,15 +817,18 @@ defmodule EventualLeaderElector do end runfn do + # Answer to a heartbeat_request {:heartbeat_request, name, seq} -> safecast(name, {:heartbeat, state.parent, seq}) state - + + # Update heard_back with this name {:heartbeat, name, seq} -> or_state seq == state.seq do %{state | heard_back: MapSet.put(state.heard_back, name)} end - + + # One time out check the heard_back and send if a new leader is elected inform the parent {:timeout} -> state = or_state MapSet.size(state.heard_back) >= floor(length(state.processes)/2) + 1 do to_trust = Enum.at(Enum.sort(MapSet.to_list(state.heard_back)), 0) diff --git a/lib/server.ex b/lib/server.ex index 8a8c335..ac85326 100644 --- a/lib/server.ex +++ b/lib/server.ex @@ -1,6 +1,13 @@ defmodule ServerMacros do - - def create_create_loop(name, do: match_exp, else: process_exp) do + @moduledoc """ + This module defines some helper macros that are used within the server macro + """ + + @doc """ + This macro creates a wait loop to wait for the messages that are receive match what is inside the + loop + """ + defmacro create_loop(name, do: match_exp) do function_name = :"#{name}_loop" ast1 = quote do @@ -19,7 +26,6 @@ defmodule ServerMacros do quote do defp unquote(function_name)(v, t) do var!(v) = v - unquote(process_exp) receive do unquote(ast3) after @@ -28,19 +34,11 @@ defmodule ServerMacros do end end end - - def create_create_loop(name, do: exp, else: else_exp) do - create_create_loop(name, do: exp, else: else_exp) - end - - def create_create_loop(name, do: exp) do - create_create_loop(name, do: exp, else: nil) - end - - defmacro create_loop(name, clauses) do - create_create_loop(name, clauses) - end - + + @doc """ + this function tries to propose and on failure it calls the else block and + on success tries to match with the expressions on the do block + """ defmacro try_propose(val, do: ready, else: recal_do) do ast1 = quote do {:timeout} -> unquote(recal_do) @@ -68,20 +66,29 @@ defmodule ServerMacros do end defmodule Server do + @moduledoc """ + Contains the to run the server Code + """ require ServerMacros import ServerMacros require Utils import Utils create_log 2 - + + @doc """ + Contains the start code for the server + """ def start(name, participants) do - log("starting server") + log("#{name} Starting server") pid = spawn(Server, :init, [name, participants]) register_name(name, pid, false) end - + + @doc """ + Initializes the state and starts the paxos inspect and then it calls the run fn + """ def init(name, participants) do paxos = Paxos.start(alter_name(name, "_paxos"), Enum.map(participants, fn name -> alter_name(name, "_paxos") end), true) state = %{ @@ -113,7 +120,10 @@ defmodule Server do try_to_play_checks(state, game_id, move, pid_to_inform) end - + + @doc """ + Checks if the user can play the move before starting the requesting the user to play + """ defp try_to_play_checks(state, game_id, move, pid_to_inform, repeat \\ false) do cond do state.games[game_id] == :not_playing_in_game -> @@ -154,7 +164,10 @@ defmodule Server do end end end - + + @doc """ + Tries to propose to paxos the game action + """ defp try_to_play(state, game_id, move, pid_to_inform) do name = state.name @@ -195,7 +208,10 @@ defmodule Server do try_to_play(state, game_id, move, pid_to_inform) end end - + + @doc """ + Get the most recent game_state and return it to the player + """ defp get_game_state(state, game_id, pid_to_inform, repeat \\ false) do cond do state.games[game_id] == :not_playing_in_game -> @@ -223,7 +239,10 @@ defmodule Server do state end end - + + @doc """ + This generates a new hand based on the current game state + """ defp get_hand_for_game_state(game_state) do r1 = Enum.random(0..100) cond do @@ -237,7 +256,10 @@ defmodule Server do Enum.random(mn..mx) end end - + + @doc """ + This tries to create a game by sending the create message to paxos + """ defp try_to_create_game(state, participants) do game_ids = Map.keys(state.games) latest = Enum.at(Enum.sort(game_ids), length(game_ids) - 1) @@ -259,14 +281,19 @@ defmodule Server do # # Utils # - + @doc """ + Checks if a game has been finished + """ defp is_finished(state, game) do case state.games[game] do {:finished, _} -> true _ -> false end end - + + @doc """ + Gets up to the most recent instance + """ defp qurey_status(state) do v = Paxos.get_decision(state.paxos, state.instance, 100) or_state v != nil do @@ -274,7 +301,10 @@ defmodule Server do qurey_status(state) end end - + + @doc """ + Sets the modified flag + """ defp set_modifed(state, game, val \\ false) do or_state not is_finished(state, game) and state.games[game] != :not_playing_in_game and state.games[game] != nil do %{state | games: Map.put(state.games, game, %{state.games[game] | modified: val})}