ff78324e2ccfa94d99fcacde3231f9f92d229514 — Michael Acar 4 months ago master
Add consensus project
A  => consensus/README.md +3 -0
@@ 1,3 @@
+ Toy implementation of the Generalized Fast Paxos algorithm from https://arxiv.org/pdf/1902.06776.pdf
+ 
+ See the `client` and `server` directories for more details.

A  => consensus/client/README.md +3 -0
@@ 1,3 @@
+ Make sure you have servers running on the nodes specified in `mix.exs`. For information on how to do that, see the accompanying README file in the `server` directory.
+ Run `iex --sname <NAME> -S mix` from this directory to run a node on <NAME>@<HOST>.
+ Run `Client.input(<value>)` to propose to begin the consensus process with an input value.

A  => consensus/client/lib/client.ex +90 -0
@@ 1,90 @@
+ defmodule Client do
+   def input(value) do
+     state_table = StateTable.create_state_table()
+     decision_table = DecisionTable.create_decision_table()
+     reach_consensus(state_table, decision_table, value, 1)
+   end
+ 
+   # Execute the consensus algorithm
+   defp reach_consensus(state_table, decision_table, input_val, register_set) do
+     {status, v} = phase1(state_table, decision_table, input_val, register_set)
+     cond do
+       status == :ok ->
+         {status, out} = phase2(state_table, decision_table, v, register_set)
+         cond do
+           status == :ok ->
+             out
+           true ->
+             reach_consensus(state_table, decision_table, input_val, register_set + 1)
+         end
+       true ->
+         reach_consensus(state_table, decision_table, input_val, register_set + 1)
+     end
+   end
+ 
+   # Execute phase 1 of the consensus algorithm
+   #
+   # Note that this loops through all servers and then looks at the decision
+   # table, but this is wrong.
+   #
+   # What you would actually want to do is process all servers asynchronously
+   # and break early if the condition for choosing a value is ever satisfied.
+   defp phase1(state_table, decision_table, input_val, register_set) do
+     # For each server, clear the register list for all indices below
+     # register_set and return the resultant register list
+     Application.fetch_env!(:client, :servers)
+     |> Enum.map(fn server ->
+       {server, GenServer.call({Elixir.RegisterList, server}, {:clear, register_set})}
+     end
+     )
+     # Update our state table based on the returned register lists
+     |> Enum.reduce(:nil, fn {server, register_list}, _ ->
+       StateTable.write(state_table, server, register_list)
+     end
+     )
+     # Update our decision table based on our updated state table
+     DecisionTable.update(decision_table, StateTable.read(state_table))
+     # If the decision state of all quorums is None, then choose input_val for phase 2.
+     # Otherwise, if at least one quorum has decision state Maybe V, then choose V for phase 2.
+     # Else, don't continue to phase 2.
+     case DecisionTable.ready_to_write(decision_table, register_set) do
+       :nil ->
+         {:error, :nil}
+       :blank ->
+         {:ok, input_val}
+       other ->
+         {:ok, other}
+     end
+   end
+ 
+   # Execute phase 2 of the consensus algorithm
+   #
+   # Note that this is also wrong in the same way that phase1 is wrong.
+   #
+   # You don't want to loop through all servers.
+   # You want to process all servers asynchronously and break early if you see
+   # that a value has been decided.
+   defp phase2(state_table, decision_table, v, register_set) do
+     # Write the value v to the chosen register_set on all servers and return the
+     # resultant register lists
+     Application.fetch_env!(:client, :servers)
+     |> Enum.map(fn server ->
+       {server, GenServer.call({Elixir.RegisterList, server}, {:write, register_set, v})}
+     end
+     )
+     # Update our state table accordingly
+     |> Enum.reduce(:nil, fn {server, register_list}, _ ->
+       StateTable.write(state_table, server, register_list)
+     end
+     )
+     # Update our decision table with our updated state table
+     DecisionTable.update(decision_table, StateTable.read(state_table))
+     # Check our decision table to see if a value has been decided
+     case DecisionTable.get_decision(decision_table) do
+       :blank ->
+         {:error, :nil}
+       other ->
+         {:ok, other}
+     end
+   end
+ end

A  => consensus/client/lib/decision-table.ex +204 -0
@@ 1,204 @@
+ defmodule DecisionTable do
+   use Agent
+ 
+   # Store decision table as a map of maps
+   # The inner maps are maps from quorums to decisions
+   # We then store these inner maps for each register set
+   def create_decision_table do
+     {:ok, decision_table} = Agent.start_link(fn ->
+       map = enumerate_quorums(0)
+       |> Enum.reduce(%{}, fn quorum, inner_map -> Map.put(inner_map, quorum, :any) end)
+       %{0 => map}
+     end
+     )
+     decision_table
+   end
+ 
+   def read(decision_table) do
+     Agent.get(decision_table, fn val -> val end)
+   end
+ 
+   # Update the decision table from the given state table
+   def update(decision_table, state_table) do
+     Agent.update(decision_table, fn _ ->
+       apply_local_state(state_table)
+       |> apply_nonlocal_state(state_table)
+     end
+     )
+     DecisionTable.read(decision_table)
+   end
+ 
+   # Iterate through the decision table and return a decision if one exists
+   def get_decision(decision_table) do
+     # Get decisions if they exist from all inner maps
+     DecisionTable.read(decision_table)
+     |> Enum.map(fn {_, inner_map} ->
+       inner_map
+       |> Enum.reduce(:blank, fn {_, v}, acc ->
+         case v do
+           {:decided, value} -> value
+           _ -> acc
+         end
+       end
+       )
+     end
+     )
+     # Return decision if any of the inner maps contained a decision
+     |> Enum.reduce(:blank, fn v, acc ->
+       case v do
+         :blank -> acc
+         val -> val
+       end
+     end
+     )
+   end
+ 
+   # Iterate through decision table and see if the decision for all quorums at
+   # register sets 0 to register_set-1 is None or Maybe/Decided V for some single
+   # value V.
+   #
+   # If there is some quorum that has decision Maybe/Decided V and another quorum
+   # that has decision Maybe/Decided V' with V' =/= V, then return an indicator
+   # that the given register_set is not ready to write.
+   def ready_to_write(decision_table, register_set) do
+     0..register_set-1
+     |> Enum.reduce(:blank, fn r, outer_v ->
+       cond do
+         outer_v == :nil ->
+           :nil
+         true ->
+           enumerate_quorums(r)
+           |> Enum.reduce(outer_v, fn quorum, inner_v ->
+             cond do
+               inner_v == :nil ->
+                 :nil
+               true ->
+                 case DecisionTable.read(decision_table)[r][quorum] do
+                   :none ->
+                     inner_v
+                   :any ->
+                     inner_v
+                   {_, v} ->
+                     cond do
+                       v == inner_v ->
+                         inner_v
+                       inner_v == :blank ->
+                         v
+                       true ->
+                         :nil
+                     end
+                 end
+             end
+           end
+           )
+       end
+     end
+     )
+   end
+ 
+   # For each register set R on server S, update the decision table for all
+   # quorums where S is in the quorum
+   defp apply_local_state(state_table) do
+     servers = Application.fetch_env!(:client, :servers)
+     n_registers = length(state_table[Enum.at(servers, 0)])
+ 
+     0..n_registers-1
+     |> Enum.reduce(%{}, fn register_set, outer_map ->
+       map = enumerate_quorums(register_set)
+       |> Enum.reduce(%{}, fn quorum, inner_map ->
+         values = quorum
+         |> Enum.map(fn server -> Enum.at(state_table[server], register_set) end)
+         nonblank_values = Enum.filter(values, fn v -> v != :blank end)
+         cond do
+           Enum.any?(values, fn val -> val == :nil end) || length(Enum.uniq(nonblank_values)) > 1 ->
+             Map.put(inner_map, quorum, :none)
+           length(Enum.uniq(values)) == 1 && Enum.at(Enum.uniq(values), 0) != :blank ->
+             Map.put(inner_map, quorum, {:decided, Enum.at(values, 0)})
+           length(Enum.uniq(nonblank_values)) == 1 ->
+             Map.put(inner_map, quorum, {:maybe, Enum.at(nonblank_values, 0)})
+           true ->
+             Map.put(inner_map, quorum, :any)
+         end
+       end
+       )
+       Map.put(outer_map, register_set, map)
+     end
+     )
+   end
+ 
+   # For each register set R on server S, update the decision table for all
+   # quorums over register sets 0 to R-1
+   defp apply_nonlocal_state(decision_table, state_table) do
+     servers = Application.fetch_env!(:client, :servers)
+     n_registers = length(state_table[Enum.at(servers, 0)])
+ 
+     # Get flat list of all non-nil, non-blank elements in state_table
+     elements = 0..n_registers-1
+     |> Enum.reduce([], fn register_set, outer_list ->
+       list = servers
+       |> Enum.reduce([], fn server, inner_list ->
+         val = Enum.at(state_table[server], register_set)
+         cond do
+           val != :blank && val != :nil ->
+             inner_list ++ [{register_set, val}]
+           true ->
+             inner_list
+         end
+       end
+       )
+       outer_list ++ list
+     end
+     )
+     |> Enum.uniq()
+ 
+     # Propagate the value for each non-nil, non-blank element in state_table
+     # to all lower register sets
+     elements
+     |> Enum.reduce(decision_table, fn elem, table ->
+       {register_set, val} = elem
+       propagate_changes(table, register_set, val)
+     end
+     )
+   end
+ 
+   # Given a register set and a value, propagate the value to all lower register
+   # sets
+   defp propagate_changes(decision_table, register_set, val) do
+     0..register_set-1
+     |> Enum.reduce(decision_table, fn r, table ->
+       map = enumerate_quorums(r)
+       |> Enum.reduce(table[r], fn quorum, inner_map ->
+         decision = table[r][quorum]
+         case decision do
+           :any ->
+             Map.put(inner_map, quorum, {:maybe, val})
+           {:maybe, v} ->
+             cond do
+               v != val ->
+                 Map.put(inner_map, quorum, :none)
+               true ->
+                 Map.put(inner_map, quorum, {:maybe, val})
+             end
+           other ->
+             Map.put(inner_map, quorum, other)
+         end
+       end
+       )
+       Map.put(table, r, map)
+     end
+     )
+   end
+ 
+   # Enumerate all the quorums for the given register set
+   #
+   # Note that the quorums for all register sets are the same in this
+   # particular implementation, but it is simple to generalize by just editing
+   # this function
+   defp enumerate_quorums(register_set) do
+     servers = Application.fetch_env!(:client, :servers)
+     s0 = Enum.at(servers, 0)
+     s1 = Enum.at(servers, 1)
+     s2 = Enum.at(servers, 2)
+     [[s0, s1], [s0, s2], [s1, s2]]
+   end
+ end

A  => consensus/client/lib/state-table.ex +48 -0
@@ 1,48 @@
+ defmodule StateTable do
+   require Agent
+ 
+   # Store state table as map from servers to register lists
+   def create_state_table do
+     {:ok, state_table} = Agent.start_link(fn ->
+       Application.fetch_env!(:client, :servers)
+       |> Enum.reduce(%{}, fn server, map -> Map.put(map, server, [:blank]) end)
+     end
+     )
+     state_table
+   end
+ 
+   def read(state_table) do
+     Agent.get(state_table, fn val -> val end)
+   end
+ 
+   # Given a register list from a server, update the state table
+   def write(state_table, server, register_vals) do
+     Agent.update(state_table, fn st ->
+       st
+       |> Enum.map(fn {k, v} ->
+         cond do
+           k == server && length(v) <= length(register_vals) ->
+             {k, register_vals}
+           k == server ->
+             {k, register_vals ++ Enum.take(v, length(register_vals) - length(v))}
+           true ->
+             {k, extend_list(v, length(register_vals))}
+         end
+       end
+       )
+       |> Enum.into(%{})
+     end
+     )
+     StateTable.read(state_table)
+   end
+ 
+   # Extend list to length len, right-padding with :blank
+   defp extend_list(list, len) do
+     cond do
+       length(list) < len ->
+         extend_list(list ++ [:blank], len)
+       true ->
+         list
+     end
+   end
+ end

A  => consensus/client/mix.exs +29 -0
@@ 1,29 @@
+ defmodule Client.MixProject do
+   use Mix.Project
+ 
+   def project do
+     [
+       app: :client,
+       version: "0.1.0",
+       elixir: "~> 1.8",
+       start_permanent: Mix.env() == :prod,
+       deps: deps()
+     ]
+   end
+ 
+   # Run "mix help compile.app" to learn about applications.
+   def application do
+     [
+       extra_applications: [:logger],
+       env: [servers: [:"s1@localhost", :"s2@localhost", :"s3@localhost"]],
+     ]
+   end
+ 
+   # Run "mix help deps" to learn about dependencies.
+   defp deps do
+     [
+       # {:dep_from_hexpm, "~> 0.3.0"},
+       # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
+     ]
+   end
+ end

A  => consensus/server/README.md +2 -0
@@ 1,2 @@
+ Run `iex --sname <NAME> -S mix` from this directory to run a node on <NAME>@<HOST>.
+ Run `Server.start()` to run a server.

A  => consensus/server/lib/register-list.ex +112 -0
@@ 1,112 @@
+ defmodule RegisterList do
+   use GenServer
+ 
+   # Client
+   def start_link(_) do
+     GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
+   end
+ 
+   def clear(pid, idx) do
+     GenServer.call(pid, {:clear, idx})
+   end
+ 
+   def write(pid, idx, val) do
+     GenServer.call(pid, {:write, idx, val})
+   end
+ 
+   # Server (callbacks)
+   @impl true
+   def init(_) do
+     registers = read_from_disk()
+     {:ok, registers}
+   end
+ 
+   # Write :nil to all registers from 0 to idx-1
+   # Save the register list to disk
+   # Return the list of all registers
+   @impl true
+   def handle_call({:clear, idx}, _from, registers) do
+     registers = extend_list(registers, idx)
+     0..idx-1
+     |> Enum.reduce(:nil, fn r, _ ->
+       Register.write(Enum.at(registers, r), :nil)
+       :nil
+     end
+     )
+     save_to_disk!(registers)
+     {:reply, Enum.map(registers, fn x -> Register.read(x) end), registers}
+   end
+ 
+   # Write :nil to all registers from 0 to idx-1
+   # Write val to the register at index idx
+   # Save the register list to disk
+   # Return the list of all registers
+   @impl true
+   def handle_call({:write, idx, val}, _from, registers) do
+     registers = extend_list(registers, idx + 1)
+     0..idx-1
+     |> Enum.reduce(:nil, fn r, _ ->
+       Register.write(Enum.at(registers, r), :nil)
+       :nil
+     end
+     )
+     Register.write(Enum.at(registers, idx), val)
+     save_to_disk!(registers)
+     {:reply, Enum.map(registers, fn x -> Register.read(x) end), registers}
+   end
+ 
+   # Read a register list from disk
+   defp read_from_disk() do
+     filename = to_string(Node.self())
+     {status, text} = File.read(filename)
+     cond do
+       status == :error ->
+         [Register.create_register()]
+       true ->
+         parse_text(text)
+     end
+   end
+ 
+   defp parse_text(text) do
+     vals = String.split(text, "\n")
+     Enum.map(vals, fn val ->
+       register = Register.create_register()
+       cond do
+         val == "" ->
+           Register.write(register, :nil)
+         val == "blank" ->
+           Register.write(register, :blank)
+         true ->
+           Register.write(register, val)
+       end
+     end
+       )
+   end
+ 
+   # Write a register list to disk
+   defp save_to_disk!(registers) do
+     filename = to_string(Node.self())
+     File.rm(filename)
+     {:ok, file} = File.open(filename, [:write])
+     contents = Enum.join(
+       Enum.map(registers, fn register ->
+         val = to_string(Register.read(register))
+         val <> "\n"
+       end
+       ),
+       ""
+     )
+     IO.binwrite(file, contents)
+     File.close(file)
+   end
+ 
+   # Extend list to length len, right-padding with new registers
+   defp extend_list(list, len) do
+     cond do
+       length(list) < len ->
+         extend_list(list ++ [Register.create_register()], len)
+       true ->
+         list
+     end
+   end
+ end

A  => consensus/server/lib/register.ex +19 -0
@@ 1,19 @@
+ defmodule Register do
+   use Agent
+ 
+   def create_register() do
+     {:ok, register} = Agent.start_link(fn -> :blank end)
+     register
+   end
+ 
+   def read(register) do
+     Agent.get(register, fn val -> val end)
+   end
+ 
+   def write(register, new_val) do
+     with :blank <- Register.read(register) do
+       Agent.update(register, fn _ -> new_val end)
+     end
+     Register.read(register)
+   end
+ end

A  => consensus/server/lib/server.ex +6 -0
@@ 1,6 @@
+ defmodule Server do
+   def start do
+     children = [RegisterList]
+     Supervisor.start_link(children, strategy: :one_for_one)
+   end
+ end

A  => consensus/server/mix.exs +28 -0
@@ 1,28 @@
+ defmodule Server.MixProject do
+   use Mix.Project
+ 
+   def project do
+     [
+       app: :server,
+       version: "0.1.0",
+       elixir: "~> 1.8",
+       start_permanent: Mix.env() == :prod,
+       deps: deps()
+     ]
+   end
+ 
+   # Run "mix help compile.app" to learn about applications.
+   def application do
+     [
+       extra_applications: [:logger]
+     ]
+   end
+ 
+   # Run "mix help deps" to learn about dependencies.
+   defp deps do
+     [
+       # {:dep_from_hexpm, "~> 0.3.0"},
+       # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
+     ]
+   end
+ end