Added more tests and updated the presentation
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
330b49a7ca
commit
182faf18d6
39
lib/paxos.ex
39
lib/paxos.ex
@ -1,3 +1,12 @@
|
|||||||
|
#
|
||||||
|
#
|
||||||
|
# Possible actions
|
||||||
|
# :kill_before_decision
|
||||||
|
# :increase_ballot_number - this makes it so that it does not propose but jump simply increases the number of the current ballot
|
||||||
|
# this is usefull when forcing a nack
|
||||||
|
#
|
||||||
|
|
||||||
|
|
||||||
defmodule Paxos do
|
defmodule Paxos do
|
||||||
def start(name, processes) do
|
def start(name, processes) do
|
||||||
IO.puts("Starting paxos for #{name}")
|
IO.puts("Starting paxos for #{name}")
|
||||||
@ -73,6 +82,16 @@ defmodule Paxos do
|
|||||||
send(pid_to_inform, {:decision, inst, state.decided[inst]})
|
send(pid_to_inform, {:decision, inst, state.decided[inst]})
|
||||||
state
|
state
|
||||||
|
|
||||||
|
action == :increase_ballot_number ->
|
||||||
|
state = has_or_create(state, inst)
|
||||||
|
|
||||||
|
IO.puts("#{state.name} - Got request to increase ballot number for inst #{inst}")
|
||||||
|
|
||||||
|
# Inform the pid with timeout right way
|
||||||
|
send(pid_to_inform, {:timeout, inst});
|
||||||
|
|
||||||
|
set_instmap(state, inst, fn map -> %{map| ballot: map.ballot + 1} end)
|
||||||
|
|
||||||
not Map.has_key?(state.instmap, inst) ->
|
not Map.has_key?(state.instmap, inst) ->
|
||||||
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
|
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
|
||||||
state = has_or_create(state, inst, value, pid_to_inform, action)
|
state = has_or_create(state, inst, value, pid_to_inform, action)
|
||||||
@ -94,6 +113,7 @@ defmodule Paxos do
|
|||||||
)
|
)
|
||||||
|
|
||||||
true ->
|
true ->
|
||||||
|
EagerReliableBroadcast.broadcast(state.name, {:other_propose, inst, value})
|
||||||
prepare(state, inst)
|
prepare(state, inst)
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -101,7 +121,7 @@ defmodule Paxos do
|
|||||||
state = %{state | other_values: Map.put(state.other_values, inst, value)}
|
state = %{state | other_values: Map.put(state.other_values, inst, value)}
|
||||||
|
|
||||||
cond do
|
cond do
|
||||||
Map.has_key?(state.decided, inst) ->
|
has_finished(state, inst) ->
|
||||||
EagerReliableBroadcast.broadcast(
|
EagerReliableBroadcast.broadcast(
|
||||||
state.name,
|
state.name,
|
||||||
{:decide, inst, state.decided[inst]}
|
{:decide, inst, state.decided[inst]}
|
||||||
@ -170,7 +190,7 @@ defmodule Paxos do
|
|||||||
send(state.instmap[inst].pid_to_inform, {:abort, inst})
|
send(state.instmap[inst].pid_to_inform, {:abort, inst})
|
||||||
end
|
end
|
||||||
|
|
||||||
EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot, new_ballot})
|
EagerReliableBroadcast.broadcast(state.name, {:abort, inst, ballot})
|
||||||
|
|
||||||
set_instmap(state, inst, fn map -> %{
|
set_instmap(state, inst, fn map -> %{
|
||||||
map | has_sent_accept: false,
|
map | has_sent_accept: false,
|
||||||
@ -187,16 +207,15 @@ defmodule Paxos do
|
|||||||
has_finished(state, inst) ->
|
has_finished(state, inst) ->
|
||||||
state
|
state
|
||||||
|
|
||||||
state.instmap[inst].ballot == ballot ->
|
true ->
|
||||||
|
IO.puts("#{state.name} - got information to send abort")
|
||||||
|
|
||||||
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
|
if Map.has_key?(state.instmap, inst) and state.instmap[inst].pid_to_inform != nil do
|
||||||
send(state.instmap[inst].pid_to_inform, {:abort, inst})
|
send(state.instmap[inst].pid_to_inform, {:abort, inst})
|
||||||
end
|
end
|
||||||
|
|
||||||
state
|
state
|
||||||
|
|
||||||
true ->
|
|
||||||
state
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
|
{:prepared, inst, ballot, accepted_ballot, accepted_value} ->
|
||||||
@ -218,7 +237,7 @@ defmodule Paxos do
|
|||||||
prepared(state, inst)
|
prepared(state, inst)
|
||||||
|
|
||||||
ballot > state.instmap[inst].ballot ->
|
ballot > state.instmap[inst].ballot ->
|
||||||
IO.puts("Probably recieved this before preare came to self sending with delay")
|
IO.puts("#{state.name} - Probably recieved this before preare came to self sending with delay")
|
||||||
Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
|
Process.send_after(self(), {:prepared, inst, ballot, accepted_ballot, accepted_value}, 100)
|
||||||
state
|
state
|
||||||
|
|
||||||
@ -253,7 +272,7 @@ defmodule Paxos do
|
|||||||
end
|
end
|
||||||
|
|
||||||
{:accepted, inst, ballot} ->
|
{:accepted, inst, ballot} ->
|
||||||
IO.puts("#{state.name} accepted #{inspect(inst)} #{inspect(ballot)}")
|
IO.puts("#{state.name} - accepted #{inspect(inst)} #{inspect(ballot)}")
|
||||||
|
|
||||||
cond do
|
cond do
|
||||||
has_finished(state, inst) ->
|
has_finished(state, inst) ->
|
||||||
@ -289,7 +308,7 @@ defmodule Paxos do
|
|||||||
state
|
state
|
||||||
|
|
||||||
{:rb_deliver, _, {:decide, inst, value}} ->
|
{:rb_deliver, _, {:decide, inst, value}} ->
|
||||||
IO.puts("#{state.name} decided #{inspect(inst)} #{inspect(value)}")
|
IO.puts("#{state.name} - decided #{inspect(inst)} #{inspect(value)}")
|
||||||
|
|
||||||
if has_finished(state, inst) do
|
if has_finished(state, inst) do
|
||||||
state
|
state
|
||||||
@ -333,7 +352,7 @@ defmodule Paxos do
|
|||||||
|
|
||||||
true ->
|
true ->
|
||||||
ballot = state.instmap[inst].ballot + 1
|
ballot = state.instmap[inst].ballot + 1
|
||||||
IO.puts("#{state.name} sending all prepare #{inst} #{ballot}")
|
IO.puts("#{state.name} - sending all prepare #{inst} #{ballot}")
|
||||||
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})
|
EagerReliableBroadcast.broadcast(state.name, {:prepare, state.name, inst, ballot})
|
||||||
|
|
||||||
set_instmap(state, inst, fn map -> %{
|
set_instmap(state, inst, fn map -> %{
|
||||||
|
@ -58,5 +58,154 @@ defmodule PaxosTestAditional do
|
|||||||
send(cpid, {:finished, name, pid, status, val, a, ql})
|
send(cpid, {:finished, name, pid, status, val, a, ql})
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
def run_leader_should_nack_simple(name, participants, val) do
|
||||||
|
{cpid, pid} = PaxosTest.init(name, participants, true)
|
||||||
|
send(cpid, :ready)
|
||||||
|
|
||||||
|
{status, val, a, spare} =
|
||||||
|
try do
|
||||||
|
receive do
|
||||||
|
:start ->
|
||||||
|
IO.puts("#{inspect(name)}: started")
|
||||||
|
|
||||||
|
[leader | spare] = Enum.sort(participants)
|
||||||
|
increase_ballot = Enum.take(spare, floor(length(participants) / 2))
|
||||||
|
|
||||||
|
if name == leader do
|
||||||
|
# Propose when passed with :kill_before_decision will die right before a decision is selected
|
||||||
|
Process.sleep(2000)
|
||||||
|
res = Paxos.propose(pid, 1, val, 10000)
|
||||||
|
|
||||||
|
if res != {:abort} do
|
||||||
|
IO.puts("#{name}: Leader failed to abort")
|
||||||
|
{:failed_to_abort, :none, 10, []}
|
||||||
|
else
|
||||||
|
Paxos.propose(pid, 1, val, 10000)
|
||||||
|
|
||||||
|
{status, val} = PaxosTest.wait_for_decision(pid, 1, 10000)
|
||||||
|
|
||||||
|
if status != :none,
|
||||||
|
do: IO.puts("#{name}: decided #{inspect(val)}"),
|
||||||
|
else: IO.puts("#{name}: No decision after 10 seconds")
|
||||||
|
|
||||||
|
{status, val, 10, participants}
|
||||||
|
end
|
||||||
|
else
|
||||||
|
if name in increase_ballot do
|
||||||
|
Process.sleep(10)
|
||||||
|
# Force the non leader process to have a higher ballot
|
||||||
|
Paxos.propose(pid, 1, nil, 1000, :increase_ballot_number)
|
||||||
|
end
|
||||||
|
|
||||||
|
{status, val} = PaxosTest.wait_for_decision(pid, 1, 10000)
|
||||||
|
|
||||||
|
if status != :none,
|
||||||
|
do: IO.puts("#{name}: decided #{inspect(val)}"),
|
||||||
|
else: IO.puts("#{name}: No decision after 10 seconds")
|
||||||
|
|
||||||
|
{status, val, 10, participants}
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
rescue
|
||||||
|
_ -> {:none, :none, 10, []}
|
||||||
|
end
|
||||||
|
|
||||||
|
send(cpid, :done)
|
||||||
|
|
||||||
|
receive do
|
||||||
|
:all_done ->
|
||||||
|
Process.sleep(100)
|
||||||
|
|
||||||
|
ql =
|
||||||
|
if name in spare do
|
||||||
|
IO.puts("#{name}: #{inspect(ql = Process.info(pid, :message_queue_len))}")
|
||||||
|
ql
|
||||||
|
else
|
||||||
|
{:message_queue_len, -1}
|
||||||
|
end
|
||||||
|
|
||||||
|
PaxosTest.kill_paxos(pid, name)
|
||||||
|
send(cpid, {:finished, name, pid, status, val, a, ql})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def run_non_leader_should_nack_simple(name, participants, val) do
|
||||||
|
{cpid, pid} = PaxosTest.init(name, participants, true)
|
||||||
|
send(cpid, :ready)
|
||||||
|
|
||||||
|
{status, val, a, spare} =
|
||||||
|
try do
|
||||||
|
receive do
|
||||||
|
:start ->
|
||||||
|
IO.puts("#{inspect(name)}: started")
|
||||||
|
|
||||||
|
participants = Enum.sort(participants)
|
||||||
|
|
||||||
|
[leader | spare ] = participants
|
||||||
|
increase_ballot = Enum.take(spare, floor(length(participants) / 2))
|
||||||
|
[non_leader | _] = Enum.reverse(spare)
|
||||||
|
|
||||||
|
if name == non_leader do
|
||||||
|
# Propose when passed with :kill_before_decision will die right before a decision is selected
|
||||||
|
Process.sleep(2000)
|
||||||
|
res = Paxos.propose(pid, 1, val, 10000)
|
||||||
|
|
||||||
|
if res != {:abort} do
|
||||||
|
IO.puts("#{name}: non-leader failed to abort")
|
||||||
|
{:failed_to_abort, :none, 10, []}
|
||||||
|
else
|
||||||
|
Paxos.propose(pid, 1, val, 10000)
|
||||||
|
|
||||||
|
{status, val} = PaxosTest.wait_for_decision(pid, 1, 10000)
|
||||||
|
|
||||||
|
if status != :none,
|
||||||
|
do: IO.puts("#{name}: decided #{inspect(val)}"),
|
||||||
|
else: IO.puts("#{name}: No decision after 10 seconds")
|
||||||
|
|
||||||
|
{status, val, 10, participants}
|
||||||
|
end
|
||||||
|
else
|
||||||
|
if name in increase_ballot do
|
||||||
|
Process.sleep(10)
|
||||||
|
# Force the non leader process to have a higher ballot
|
||||||
|
Paxos.propose(pid, 1, nil, 1000, :increase_ballot_number)
|
||||||
|
end
|
||||||
|
|
||||||
|
{status, val} = PaxosTest.wait_for_decision(pid, 1, 10000)
|
||||||
|
|
||||||
|
if status != :none,
|
||||||
|
do: IO.puts("#{name}: decided #{inspect(val)}"),
|
||||||
|
else: IO.puts("#{name}: No decision after 10 seconds")
|
||||||
|
|
||||||
|
{status, val, 10, participants}
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
|
rescue
|
||||||
|
_ -> {:none, :none, 10, []}
|
||||||
|
end
|
||||||
|
|
||||||
|
send(cpid, :done)
|
||||||
|
|
||||||
|
receive do
|
||||||
|
:all_done ->
|
||||||
|
Process.sleep(100)
|
||||||
|
|
||||||
|
ql =
|
||||||
|
if name in spare do
|
||||||
|
IO.puts("#{name}: #{inspect(ql = Process.info(pid, :message_queue_len))}")
|
||||||
|
ql
|
||||||
|
else
|
||||||
|
{:message_queue_len, -1}
|
||||||
|
end
|
||||||
|
|
||||||
|
PaxosTest.kill_paxos(pid, name)
|
||||||
|
send(cpid, {:finished, name, pid, status, val, a, ql})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -73,6 +73,19 @@ test_suite = [
|
|||||||
"Leader crashes right before decision, no concurrent ballots, 5 nodes"},
|
"Leader crashes right before decision, no concurrent ballots, 5 nodes"},
|
||||||
{&PaxosTestAditional.run_leader_crash_simple_before_decision/3, TestUtil.get_local_config(5), 10,
|
{&PaxosTestAditional.run_leader_crash_simple_before_decision/3, TestUtil.get_local_config(5), 10,
|
||||||
"Leader crashes right before decision, no concurrent ballots, 5 local procs"},
|
"Leader crashes right before decision, no concurrent ballots, 5 local procs"},
|
||||||
|
|
||||||
|
{&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_dist_config(host, 5), 10,
|
||||||
|
"Leader should nack before decision and then come to decision, no concurrent ballots, 5 nodes"},
|
||||||
|
{&PaxosTestAditional.run_leader_should_nack_simple/3, TestUtil.get_local_config(5), 10,
|
||||||
|
"Leader should nack before decision and then come to decision, 5 local procs"},
|
||||||
|
|
||||||
|
{&PaxosTestAditional.run_non_leader_should_nack_simple/3, TestUtil.get_dist_config(host, 5), 10,
|
||||||
|
"Non-Leader should nack before decision and then come to decision, no concurrent ballots, 5 nodes"},
|
||||||
|
{&PaxosTestAditional.run_non_leader_should_nack_simple/3, TestUtil.get_local_config(5), 10,
|
||||||
|
"Non-Leader should nack before decision and then come to decision, 5 local procs"},
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
]
|
]
|
||||||
|
|
||||||
Node.stop()
|
Node.stop()
|
||||||
|
@ -61,17 +61,55 @@ end
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
### Step 1 - "Requirements" non-trivial aspects
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
receive do
|
||||||
|
{:propose, inst, value, t, pid_to_inform} ->
|
||||||
|
...
|
||||||
|
broadcast({:other_propose, inst, value})
|
||||||
|
...
|
||||||
|
{:other_propose, inst, value} -> ...
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
### Step 2 - "Prepare"
|
### Step 2 - "Prepare"
|
||||||
|
|
||||||
![w:600 center](./second-step.png)
|
![w:600 center](./second-step.png)
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
### Step 2 - "Prepare" non-trivial aspects
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
receive do
|
||||||
|
{:nack, inst, ballot, new_ballot} ->
|
||||||
|
...
|
||||||
|
broadcast({:abort, inst, ballot})
|
||||||
|
...
|
||||||
|
{:abort, inst, ballot} -> ...
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
### Step 3 - "Accept"
|
### Step 3 - "Accept"
|
||||||
|
|
||||||
![w:600 center](./third-step.png)
|
![w:600 center](./third-step.png)
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
### Step 3 - "Accept" non-trivial aspects
|
||||||
|
|
||||||
|
```elixir
|
||||||
|
receive do
|
||||||
|
{:accepted, inst, ballot} ->
|
||||||
|
...
|
||||||
|
broadcast({:decide, inst, value})
|
||||||
|
...
|
||||||
|
{:decide, inst, value} -> ...
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
### Step 4 - "Leader Crash"
|
### Step 4 - "Leader Crash"
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user