From 790ecf54464cf1d38cbcf26cca9836b53e6556fa Mon Sep 17 00:00:00 2001 From: Andre Henriques Date: Thu, 16 Nov 2023 11:57:42 +0000 Subject: [PATCH] Initial commit --- .formatter.exs | 4 ++ .gitignore | 26 ++++++++++ eager_reliable_broadcast.ex | 95 +++++++++++++++++++++++++++++++++++++ mix.exs | 28 +++++++++++ 4 files changed, 153 insertions(+) create mode 100644 .formatter.exs create mode 100644 .gitignore create mode 100644 eager_reliable_broadcast.ex create mode 100644 mix.exs diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..67efd20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +distributed_system_coursework-*.tar + +# Temporary files, for example, from tests. +/tmp/ diff --git a/eager_reliable_broadcast.ex b/eager_reliable_broadcast.ex new file mode 100644 index 0000000..75c69e4 --- /dev/null +++ b/eager_reliable_broadcast.ex @@ -0,0 +1,95 @@ +defmodule EagerReliableBroadcast do + def start(name, processes) do + pid = spawn(EagerReliableBroadcast, :init, [name, processes]) + # :global.unregister_name(name) + case :global.re_register_name(name, pid) do + :yes -> pid + :no -> :error + end + IO.puts "registered #{name}" + pid + end + + # Init event must be the first + # one after the component is created + def init(name, processes) do + state = %{ + name: name, + processes: processes, + delivered: %{}, # Use this data structure to remember IDs of the delivered messages + seq_no: 0 # Use this variable to remember the last sequence number used to identify a message + } + run(state) + end + + def run(state) do + state = receive do + # Handle the broadcast request event + {:broadcast, m} -> + IO.puts("#{inspect state.name}: RB-broadcast: #{inspect m}") + # Create a unique message identifier from state.name and state.seqno. + # Create a new data message data_msg from the given payload m + # the message identifier. + # Update the state as necessary + data_msg = {:data, state.name, state.seq_no, m} + + # Use the provided beb_broadcast function to propagate data_msg to + + # all process + beb_broadcast(data_msg, state.processes) + %{state | seq_no: state.seq_no + 1 } + + {:data, proc, seq_no, m} -> + if not Map.has_key?(state.delivered, {proc, seq_no, m}) do + data_msg = {:data, proc, seq_no, m} + beb_broadcast(data_msg, state.processes) + unicast({:deliver, proc, m}, state.name) + %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, 1)} + else + val = Map.get(state.delivered, {proc, seq_no, m}) + if val < Enum.count(state.processes) do + %{state | delivered: Map.put(state.delivered, {proc, seq_no, m}, val + 1)} + else + %{state | delivered: Map.delete(state.delivered, {proc, seq_no, m})} + end + end + + # If was already delivered, do nothing. + # Otherwise, update delivered, generate a deliver event for the + # upper layer, and re-broadcast (echo) the received message. + # In both cases, do not forget to return the state. + + {:deliver, proc, m} -> + # Simulate the deliver indication event + IO.puts("#{inspect state.name}: RB-deliver: #{inspect m} from #{inspect proc}") + state + end + run(state) + end + + + defp unicast(m, p) do + case :global.whereis_name(p) do + pid when is_pid(pid) -> send(pid, m) + :undefined -> :ok + end + end + + defp beb_broadcast(m, dest), do: for p <- dest, do: unicast(m, p) + + # You can use this function to simulate a process failure. + # name: the name of this process + # proc_to_fail: the name of the failed process + # fail_send_to: list of processes proc_to_fail will not be broadcasting messages to + # Note that this list must include proc_to_fail. + # m and dest are the same as the respective arguments of the normal + # beb_broadcast. + defp beb_broadcast_with_failures(name, proc_to_fail, fail_send_to, m, dest) do + if name == proc_to_fail do + for p <- dest, p not in fail_send_to, do: unicast(m, p) + else + for p <- dest, p != proc_to_fail, do: unicast(m, p) + end + end + +end diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..e1db2db --- /dev/null +++ b/mix.exs @@ -0,0 +1,28 @@ +defmodule DistributedSystemCoursework.MixProject do + use Mix.Project + + def project do + [ + app: :distributed_system_coursework, + version: "0.1.0", + elixir: "~> 1.15", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger] + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + # {:dep_from_hexpm, "~> 0.3.0"}, + # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} + ] + end +end