~mjacar/probablyexactlywrong

ff78324e2ccfa94d99fcacde3231f9f92d229514 — Michael Acar 10 months ago master
Add consensus project
A  => consensus/README.md +3 -0
@@ 1,3 @@
Toy implementation of the Generalized Fast Paxos algorithm from https://arxiv.org/pdf/1902.06776.pdf

See the `client` and `server` directories for more details.

A  => consensus/client/README.md +3 -0
@@ 1,3 @@
Make sure you have servers running on the nodes specified in `mix.exs`. For information on how to do that, see the accompanying README file in the `server` directory.
Run `iex --sname <NAME> -S mix` from this directory to run a node on <NAME>@<HOST>.
Run `Client.input(<value>)` to propose to begin the consensus process with an input value.

A  => consensus/client/lib/client.ex +90 -0
@@ 1,90 @@
defmodule Client do
  def input(value) do
    state_table = StateTable.create_state_table()
    decision_table = DecisionTable.create_decision_table()
    reach_consensus(state_table, decision_table, value, 1)
  end

  # Execute the consensus algorithm
  defp reach_consensus(state_table, decision_table, input_val, register_set) do
    {status, v} = phase1(state_table, decision_table, input_val, register_set)
    cond do
      status == :ok ->
        {status, out} = phase2(state_table, decision_table, v, register_set)
        cond do
          status == :ok ->
            out
          true ->
            reach_consensus(state_table, decision_table, input_val, register_set + 1)
        end
      true ->
        reach_consensus(state_table, decision_table, input_val, register_set + 1)
    end
  end

  # Execute phase 1 of the consensus algorithm
  #
  # Note that this loops through all servers and then looks at the decision
  # table, but this is wrong.
  #
  # What you would actually want to do is process all servers asynchronously
  # and break early if the condition for choosing a value is ever satisfied.
  defp phase1(state_table, decision_table, input_val, register_set) do
    # For each server, clear the register list for all indices below
    # register_set and return the resultant register list
    Application.fetch_env!(:client, :servers)
    |> Enum.map(fn server ->
      {server, GenServer.call({Elixir.RegisterList, server}, {:clear, register_set})}
    end
    )
    # Update our state table based on the returned register lists
    |> Enum.reduce(:nil, fn {server, register_list}, _ ->
      StateTable.write(state_table, server, register_list)
    end
    )
    # Update our decision table based on our updated state table
    DecisionTable.update(decision_table, StateTable.read(state_table))
    # If the decision state of all quorums is None, then choose input_val for phase 2.
    # Otherwise, if at least one quorum has decision state Maybe V, then choose V for phase 2.
    # Else, don't continue to phase 2.
    case DecisionTable.ready_to_write(decision_table, register_set) do
      :nil ->
        {:error, :nil}
      :blank ->
        {:ok, input_val}
      other ->
        {:ok, other}
    end
  end

  # Execute phase 2 of the consensus algorithm
  #
  # Note that this is also wrong in the same way that phase1 is wrong.
  #
  # You don't want to loop through all servers.
  # You want to process all servers asynchronously and break early if you see
  # that a value has been decided.
  defp phase2(state_table, decision_table, v, register_set) do
    # Write the value v to the chosen register_set on all servers and return the
    # resultant register lists
    Application.fetch_env!(:client, :servers)
    |> Enum.map(fn server ->
      {server, GenServer.call({Elixir.RegisterList, server}, {:write, register_set, v})}
    end
    )
    # Update our state table accordingly
    |> Enum.reduce(:nil, fn {server, register_list}, _ ->
      StateTable.write(state_table, server, register_list)
    end
    )
    # Update our decision table with our updated state table
    DecisionTable.update(decision_table, StateTable.read(state_table))
    # Check our decision table to see if a value has been decided
    case DecisionTable.get_decision(decision_table) do
      :blank ->
        {:error, :nil}
      other ->
        {:ok, other}
    end
  end
end

A  => consensus/client/lib/decision-table.ex +204 -0
@@ 1,204 @@
defmodule DecisionTable do
  use Agent

  # Store decision table as a map of maps
  # The inner maps are maps from quorums to decisions
  # We then store these inner maps for each register set
  def create_decision_table do
    {:ok, decision_table} = Agent.start_link(fn ->
      map = enumerate_quorums(0)
      |> Enum.reduce(%{}, fn quorum, inner_map -> Map.put(inner_map, quorum, :any) end)
      %{0 => map}
    end
    )
    decision_table
  end

  def read(decision_table) do
    Agent.get(decision_table, fn val -> val end)
  end

  # Update the decision table from the given state table
  def update(decision_table, state_table) do
    Agent.update(decision_table, fn _ ->
      apply_local_state(state_table)
      |> apply_nonlocal_state(state_table)
    end
    )
    DecisionTable.read(decision_table)
  end

  # Iterate through the decision table and return a decision if one exists
  def get_decision(decision_table) do
    # Get decisions if they exist from all inner maps
    DecisionTable.read(decision_table)
    |> Enum.map(fn {_, inner_map} ->
      inner_map
      |> Enum.reduce(:blank, fn {_, v}, acc ->
        case v do
          {:decided, value} -> value
          _ -> acc
        end
      end
      )
    end
    )
    # Return decision if any of the inner maps contained a decision
    |> Enum.reduce(:blank, fn v, acc ->
      case v do
        :blank -> acc
        val -> val
      end
    end
    )
  end

  # Iterate through decision table and see if the decision for all quorums at
  # register sets 0 to register_set-1 is None or Maybe/Decided V for some single
  # value V.
  #
  # If there is some quorum that has decision Maybe/Decided V and another quorum
  # that has decision Maybe/Decided V' with V' =/= V, then return an indicator
  # that the given register_set is not ready to write.
  def ready_to_write(decision_table, register_set) do
    0..register_set-1
    |> Enum.reduce(:blank, fn r, outer_v ->
      cond do
        outer_v == :nil ->
          :nil
        true ->
          enumerate_quorums(r)
          |> Enum.reduce(outer_v, fn quorum, inner_v ->
            cond do
              inner_v == :nil ->
                :nil
              true ->
                case DecisionTable.read(decision_table)[r][quorum] do
                  :none ->
                    inner_v
                  :any ->
                    inner_v
                  {_, v} ->
                    cond do
                      v == inner_v ->
                        inner_v
                      inner_v == :blank ->
                        v
                      true ->
                        :nil
                    end
                end
            end
          end
          )
      end
    end
    )
  end

  # For each register set R on server S, update the decision table for all
  # quorums where S is in the quorum
  defp apply_local_state(state_table) do
    servers = Application.fetch_env!(:client, :servers)
    n_registers = length(state_table[Enum.at(servers, 0)])

    0..n_registers-1
    |> Enum.reduce(%{}, fn register_set, outer_map ->
      map = enumerate_quorums(register_set)
      |> Enum.reduce(%{}, fn quorum, inner_map ->
        values = quorum
        |> Enum.map(fn server -> Enum.at(state_table[server], register_set) end)
        nonblank_values = Enum.filter(values, fn v -> v != :blank end)
        cond do
          Enum.any?(values, fn val -> val == :nil end) || length(Enum.uniq(nonblank_values)) > 1 ->
            Map.put(inner_map, quorum, :none)
          length(Enum.uniq(values)) == 1 && Enum.at(Enum.uniq(values), 0) != :blank ->
            Map.put(inner_map, quorum, {:decided, Enum.at(values, 0)})
          length(Enum.uniq(nonblank_values)) == 1 ->
            Map.put(inner_map, quorum, {:maybe, Enum.at(nonblank_values, 0)})
          true ->
            Map.put(inner_map, quorum, :any)
        end
      end
      )
      Map.put(outer_map, register_set, map)
    end
    )
  end

  # For each register set R on server S, update the decision table for all
  # quorums over register sets 0 to R-1
  defp apply_nonlocal_state(decision_table, state_table) do
    servers = Application.fetch_env!(:client, :servers)
    n_registers = length(state_table[Enum.at(servers, 0)])

    # Get flat list of all non-nil, non-blank elements in state_table
    elements = 0..n_registers-1
    |> Enum.reduce([], fn register_set, outer_list ->
      list = servers
      |> Enum.reduce([], fn server, inner_list ->
        val = Enum.at(state_table[server], register_set)
        cond do
          val != :blank && val != :nil ->
            inner_list ++ [{register_set, val}]
          true ->
            inner_list
        end
      end
      )
      outer_list ++ list
    end
    )
    |> Enum.uniq()

    # Propagate the value for each non-nil, non-blank element in state_table
    # to all lower register sets
    elements
    |> Enum.reduce(decision_table, fn elem, table ->
      {register_set, val} = elem
      propagate_changes(table, register_set, val)
    end
    )
  end

  # Given a register set and a value, propagate the value to all lower register
  # sets
  defp propagate_changes(decision_table, register_set, val) do
    0..register_set-1
    |> Enum.reduce(decision_table, fn r, table ->
      map = enumerate_quorums(r)
      |> Enum.reduce(table[r], fn quorum, inner_map ->
        decision = table[r][quorum]
        case decision do
          :any ->
            Map.put(inner_map, quorum, {:maybe, val})
          {:maybe, v} ->
            cond do
              v != val ->
                Map.put(inner_map, quorum, :none)
              true ->
                Map.put(inner_map, quorum, {:maybe, val})
            end
          other ->
            Map.put(inner_map, quorum, other)
        end
      end
      )
      Map.put(table, r, map)
    end
    )
  end

  # Enumerate all the quorums for the given register set
  #
  # Note that the quorums for all register sets are the same in this
  # particular implementation, but it is simple to generalize by just editing
  # this function
  defp enumerate_quorums(register_set) do
    servers = Application.fetch_env!(:client, :servers)
    s0 = Enum.at(servers, 0)
    s1 = Enum.at(servers, 1)
    s2 = Enum.at(servers, 2)
    [[s0, s1], [s0, s2], [s1, s2]]
  end
end

A  => consensus/client/lib/state-table.ex +48 -0
@@ 1,48 @@
defmodule StateTable do
  require Agent

  # Store state table as map from servers to register lists
  def create_state_table do
    {:ok, state_table} = Agent.start_link(fn ->
      Application.fetch_env!(:client, :servers)
      |> Enum.reduce(%{}, fn server, map -> Map.put(map, server, [:blank]) end)
    end
    )
    state_table
  end

  def read(state_table) do
    Agent.get(state_table, fn val -> val end)
  end

  # Given a register list from a server, update the state table
  def write(state_table, server, register_vals) do
    Agent.update(state_table, fn st ->
      st
      |> Enum.map(fn {k, v} ->
        cond do
          k == server && length(v) <= length(register_vals) ->
            {k, register_vals}
          k == server ->
            {k, register_vals ++ Enum.take(v, length(register_vals) - length(v))}
          true ->
            {k, extend_list(v, length(register_vals))}
        end
      end
      )
      |> Enum.into(%{})
    end
    )
    StateTable.read(state_table)
  end

  # Extend list to length len, right-padding with :blank
  defp extend_list(list, len) do
    cond do
      length(list) < len ->
        extend_list(list ++ [:blank], len)
      true ->
        list
    end
  end
end

A  => consensus/client/mix.exs +29 -0
@@ 1,29 @@
defmodule Client.MixProject do
  use Mix.Project

  def project do
    [
      app: :client,
      version: "0.1.0",
      elixir: "~> 1.8",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      env: [servers: [:"s1@localhost", :"s2@localhost", :"s3@localhost"]],
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
    ]
  end
end

A  => consensus/server/README.md +2 -0
@@ 1,2 @@
Run `iex --sname <NAME> -S mix` from this directory to run a node on <NAME>@<HOST>.
Run `Server.start()` to run a server.

A  => consensus/server/lib/register-list.ex +112 -0
@@ 1,112 @@
defmodule RegisterList do
  use GenServer

  # Client
  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def clear(pid, idx) do
    GenServer.call(pid, {:clear, idx})
  end

  def write(pid, idx, val) do
    GenServer.call(pid, {:write, idx, val})
  end

  # Server (callbacks)
  @impl true
  def init(_) do
    registers = read_from_disk()
    {:ok, registers}
  end

  # Write :nil to all registers from 0 to idx-1
  # Save the register list to disk
  # Return the list of all registers
  @impl true
  def handle_call({:clear, idx}, _from, registers) do
    registers = extend_list(registers, idx)
    0..idx-1
    |> Enum.reduce(:nil, fn r, _ ->
      Register.write(Enum.at(registers, r), :nil)
      :nil
    end
    )
    save_to_disk!(registers)
    {:reply, Enum.map(registers, fn x -> Register.read(x) end), registers}
  end

  # Write :nil to all registers from 0 to idx-1
  # Write val to the register at index idx
  # Save the register list to disk
  # Return the list of all registers
  @impl true
  def handle_call({:write, idx, val}, _from, registers) do
    registers = extend_list(registers, idx + 1)
    0..idx-1
    |> Enum.reduce(:nil, fn r, _ ->
      Register.write(Enum.at(registers, r), :nil)
      :nil
    end
    )
    Register.write(Enum.at(registers, idx), val)
    save_to_disk!(registers)
    {:reply, Enum.map(registers, fn x -> Register.read(x) end), registers}
  end

  # Read a register list from disk
  defp read_from_disk() do
    filename = to_string(Node.self())
    {status, text} = File.read(filename)
    cond do
      status == :error ->
        [Register.create_register()]
      true ->
        parse_text(text)
    end
  end

  defp parse_text(text) do
    vals = String.split(text, "\n")
    Enum.map(vals, fn val ->
      register = Register.create_register()
      cond do
        val == "" ->
          Register.write(register, :nil)
        val == "blank" ->
          Register.write(register, :blank)
        true ->
          Register.write(register, val)
      end
    end
      )
  end

  # Write a register list to disk
  defp save_to_disk!(registers) do
    filename = to_string(Node.self())
    File.rm(filename)
    {:ok, file} = File.open(filename, [:write])
    contents = Enum.join(
      Enum.map(registers, fn register ->
        val = to_string(Register.read(register))
        val <> "\n"
      end
      ),
      ""
    )
    IO.binwrite(file, contents)
    File.close(file)
  end

  # Extend list to length len, right-padding with new registers
  defp extend_list(list, len) do
    cond do
      length(list) < len ->
        extend_list(list ++ [Register.create_register()], len)
      true ->
        list
    end
  end
end

A  => consensus/server/lib/register.ex +19 -0
@@ 1,19 @@
defmodule Register do
  use Agent

  def create_register() do
    {:ok, register} = Agent.start_link(fn -> :blank end)
    register
  end

  def read(register) do
    Agent.get(register, fn val -> val end)
  end

  def write(register, new_val) do
    with :blank <- Register.read(register) do
      Agent.update(register, fn _ -> new_val end)
    end
    Register.read(register)
  end
end

A  => consensus/server/lib/server.ex +6 -0
@@ 1,6 @@
defmodule Server do
  def start do
    children = [RegisterList]
    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

A  => consensus/server/mix.exs +28 -0
@@ 1,28 @@
defmodule Server.MixProject do
  use Mix.Project

  def project do
    [
      app: :server,
      version: "0.1.0",
      elixir: "~> 1.8",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger]
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
    ]
  end
end