~ihabunek/triglav

e82c3ee94a5f16cc66e54dddc73b1f2cb886f342 — Ivan Habunek 9 months ago 78805a2
Implement Osmosis data import
M lib/triglav.ex => lib/triglav.ex +12 -0
@@ 6,4 6,16 @@ defmodule Triglav do
  Contexts are also responsible for managing your data, regardless
  of whether it comes from the database, an external API, or elsewhere.
  """

  def tmp_dir() do
    with dir when is_binary(dir) <- System.tmp_dir(),
         path = Path.join([dir, "triglav_tmp"]),
         :ok <- File.mkdir_p(path) do
      {:ok, path}
    else
      _ -> :error
    end
  end
end

A lib/triglav/http.ex => lib/triglav/http.ex +41 -0
@@ 0,0 1,41 @@
defmodule Triglav.Http do
  require Logger

  def get(url) do
    with {:ok, {{_version, 200, _reason}, _headers, body}} <-
           :httpc.request(:get, {to_charlist(url), []}, [], []) do
      {:ok, to_string(body)}
    else
      {:ok, {{_version, status, _reason}, _headers, _body}} -> {:error, {:unexpected_status, status}}
      {:error, reason} -> {:error, reason}
    end
  end

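  @doc """
  Downloads `url` into `target_dir`, returning `{:ok, path}`.

  Options:

    * `:filename` - name for the downloaded file, defaults to the basename
      of the URL path
    * `:overwrite` - whether to replace an existing file (default: `false`;
      returns `{:error, :file_exists}` otherwise)
  """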
  @spec download(String.t(), String.t(), Keyword.t()) :: {:ok, String.t()} | {:error, term()}
  def download(url, target_dir, opts \\ []) do
    default_filename =
      url
      |> URI.parse()
      |> Map.fetch!(:path)
      |> Path.basename()

    overwrite? = Keyword.get(opts, :overwrite, false)
    filename = Keyword.get(opts, :filename, default_filename)
    path = Path.join(target_dir, filename)
    exists? = File.exists?(path)

    with :ok <- maybe_delete(path, exists?, overwrite?),
         :ok <- do_download(url, path) do
      {:ok, path}
    end
  end

  defp maybe_delete(_, false, _), do: :ok
  defp maybe_delete(_, true, false), do: {:error, :file_exists}
  defp maybe_delete(path, true, true), do: File.rm(path)

  defp do_download(url, target) do
    Logger.info("Downloading #{url} to #{target}")

    case :httpc.request(:get, {to_charlist(url), []}, [], stream: to_charlist(target)) do
      {:ok, :saved_to_file} -> :ok
      {:ok, other} -> {:error, {:unexpected_response, other}}
      {:error, reason} -> {:error, reason}
    end
  end
end

A lib/triglav/import/geofabrik.ex => lib/triglav/import/geofabrik.ex +113 -0
@@ 0,0 1,113 @@
defmodule Triglav.Import.Geofabrik do
  @moduledoc """
  Download data snapshots from
  https://download.geofabrik.de/europe/croatia.html
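
  Usage sketch (target directory illustrative):

      {:ok, state} = web_state()
      {:ok, pbf_path} = download_latest("/tmp")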
  """

  alias Triglav.Http
  alias Triglav.Repo
  alias Triglav.Schemas.OsmState

  require Logger

  @pbf_url "https://download.geofabrik.de/europe/croatia-latest.osm.pbf"
  @hash_url "https://download.geofabrik.de/europe/croatia-latest.osm.pbf.md5"
  @state_url "https://download.geofabrik.de/europe/croatia-updates/state.txt"
  @updates_url "https://download.geofabrik.de/europe/croatia-updates/"

  @spec web_state() :: {:ok, map()} | {:error, term}
  def web_state() do
    with {:ok, body} <- Http.get(@state_url) do
      parse_web_state(body)
    end
  end

  @spec local_state() :: {:ok, OsmState.t()} | {:error, :not_found}
  def local_state() do
    case Repo.get(OsmState, 1) do
      nil -> {:error, :not_found}
      state -> {:ok, state}
    end
  end

  @spec save_state(map()) :: {:ok, OsmState.t()} | {:error, Ecto.Changeset.t()}
  def save_state(attrs) do
    %OsmState{id: 1}
    |> OsmState.changeset(attrs)
    |> Repo.insert(on_conflict: :replace_all, conflict_target: [:id])
  end

  @spec download_latest(String.t()) :: {:ok, String.t()} | {:error, term()}
  def download_latest(target_dir) do
    with {:ok, pbf_path} <- Http.download(@pbf_url, target_dir, overwrite: true),
         {:ok, hash_path} <- Http.download(@hash_url, target_dir, overwrite: true),
         :ok <- validate_hash(target_dir, hash_path) do
      {:ok, pbf_path}
    end
  end

  @spec download_change(integer(), String.t()) :: {:ok, String.t()} | {:error, term()}
  def download_change(sequence_number, target_dir) do
    sequence_number
    |> change_url()
    |> Http.download(target_dir, overwrite: true)
  end

  @spec get_change_state(integer()) :: {:ok, map()} | {:error, term()}
  def get_change_state(sequence_number) do
    state_url = change_state_url(sequence_number)

    with {:ok, body} <- Http.get(state_url) do
      parse_web_state(body)
    end
  end

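  # state.txt is a Java-properties-style file; example contents (values
  # illustrative):
  #
  #   # Sat Jan 09 21:42:03 UTC 2021
  #   sequenceNumber=2824
  #   timestamp=2021-01-09T21\:42\:03Z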
  defp parse_web_state(text_state) do
    state =
      text_state
      |> String.split(~r"\n")
      |> Enum.reject(&String.starts_with?(&1, "#"))
      |> Enum.reject(&(&1 == ""))
      |> Enum.map(&String.split(&1, "=", parts: 2))
      |> Map.new(&List.to_tuple/1)

    # Colons in the timestamp are backslash-escaped in state.txt, so strip
    # the escapes before parsing.
    with {sequence_number, ""} <- Integer.parse(state["sequenceNumber"]),
         str_timestamp = String.replace(state["timestamp"], ~r"\\", ""),
         {:ok, timestamp, 0} <- DateTime.from_iso8601(str_timestamp) do
      {:ok, %{sequence_number: sequence_number, timestamp: timestamp}}
    end
  end

  defp change_url(sequence_number) do
    change_base_url(sequence_number) <> ".osc.gz"
  end

  defp change_state_url(sequence_number) do
    change_base_url(sequence_number) <> ".state.txt"
  end

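  # Replication files live in a nested directory layout: the sequence number
  # is zero-padded to 9 digits and split into groups of three,
  # e.g. 2824 -> "000/002/824", so the change file is .../000/002/824.osc.gz.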
  defp change_base_url(sequence_number) do
    path =
      sequence_number
      |> to_string()
      |> String.pad_leading(9, "0")
      |> String.to_charlist()
      |> Enum.chunk_every(3)
      |> Enum.map(&to_string/1)
      |> Enum.join("/")

    @updates_url <> path
  end

  defp validate_hash(target_dir, hash_path) do
    case System.cmd("md5sum", ["--check", hash_path], cd: target_dir) do
      {_, 0} ->
        Logger.info("Checksum OK")
        :ok

      _ ->
        {:error, :invalid_hash}
    end
  end
end

D lib/triglav/import/osm_osmium.ex => lib/triglav/import/osm_osmium.ex +0 -0
A lib/triglav/import/osmosis.ex => lib/triglav/import/osmosis.ex +145 -0
@@ 0,0 1,145 @@
defmodule Triglav.Import.Osmosis do
  @moduledoc """
  Imports the latest OSM data for Croatia from Geofabrik

  See:
  https://download.geofabrik.de/europe/croatia.html
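
  Typical flow (sketch):

      create_schema()
      load_initial()
      # later, to apply replication diffs:
      update()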
  """

  alias Triglav.Import.Geofabrik
  alias Triglav.Repo

  require Logger

  @schema "osmosis"

  # Osmosis schema creation scripts
  # https://wiki.openstreetmap.org/wiki/Osmosis/PostGIS_Setup
  @scripts [
    "priv/osmosis/pgsnapshot_schema_0.6.sql",
    # "priv/osmosis/pgsnapshot_schema_0.6_action.sql",
    # "priv/osmosis/pgsnapshot_schema_0.6_changes.sql",
    "priv/osmosis/pgsnapshot_schema_0.6_linestring.sql"
  ]

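  # Drops and recreates the osmosis schema, then loads the pgsnapshot scripts
  # into it in a single psql transaction.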
  def create_schema() do
    file_args =
      @scripts
      |> Enum.map(&Application.app_dir(:triglav, &1))
      |> Enum.map(&["--file", &1])
      |> List.flatten()

    psql_args = [
      "--quiet",
      "--single-transaction",
      "--command",
      "DROP SCHEMA IF EXISTS \"#{@schema}\" CASCADE;",
      "--command",
      "CREATE SCHEMA \"#{@schema}\";",
      "--command",
      "SET search_path TO \"#{@schema}\",public;"
      | file_args
    ]

    cmd("psql", psql_args)
  end

  def schema_exists?() do
    result =
      Repo.query!(
        "SELECT * FROM information_schema.schemata WHERE schema_name = $1;",
        [@schema]
      )

    result.num_rows > 0
  end

  def load_initial() do
    with {:ok, state} <- Geofabrik.web_state(),
         {:ok, tmp_dir} <- Triglav.tmp_dir(),
         {:ok, pbf_path} <- Geofabrik.download_latest(tmp_dir),
         :ok <- cmd("osmosis", ["--read-pbf", pbf_path, "--write-pgsql" | osmosis_db_params()]),
         {:ok, state} <- Geofabrik.save_state(state) do
      Logger.info("Loaded state: #{state.sequence_number} #{state.timestamp}")
      Logger.info("Done")
      :ok
    end
  end

  @spec update() :: :ok | {:error, term}
  def update() do
    Logger.info("Checking Geofabrik for updates...")

    with {:ok, web_state} <- Geofabrik.web_state(),
         {:ok, local_state} <- Geofabrik.local_state() do
      Logger.info("Local state: #{local_state.sequence_number} #{local_state.timestamp}")
      Logger.info("  Web state: #{web_state.sequence_number} #{web_state.timestamp}")

      if web_state.sequence_number > local_state.sequence_number do
        Logger.info("New data available. Updating.")
        apply_updates(local_state, web_state)
      else
        Logger.info("You already have the latest data")
        :ok
      end
    end
  end

  defp apply_updates(local_state, web_state) do
    range = (local_state.sequence_number + 1)..web_state.sequence_number

    Enum.each(range, fn seq ->
      {:ok, state} = apply_update(seq)
      Logger.info("Updated to ##{state.sequence_number}")
    end)
  end

  defp apply_update(sequence_number) do
    Logger.info("Applying update ##{sequence_number}")

    with {:ok, tmp_dir} <- Triglav.tmp_dir(),
         {:ok, change_path} <- Geofabrik.download_change(sequence_number, tmp_dir),
         {:ok, change_state} <- Geofabrik.get_change_state(sequence_number),
         :ok <- osmosis_apply_update(change_path),
         {:ok, state} <- Geofabrik.save_state(change_state) do
      {:ok, state}
    end
  end

  defp osmosis_apply_update(path) do
    cmd("osmosis", [
      "--read-xml-change",
      "file=#{path}",
      "--write-pgsql-change"
      | osmosis_db_params()
    ])
  end

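  # Builds osmosis "key=value" connection params from the Ecto repo URL,
  # e.g. ["host=localhost:5432", "database=triglav", ...] (values illustrative).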
  def osmosis_db_params do
    params =
      Application.fetch_env!(:triglav, Triglav.Repo)
      |> Keyword.get(:url)
      |> Ecto.Repo.Supervisor.parse_url()

    hostname = Keyword.get(params, :hostname)
    port = Keyword.get(params, :port, 5432)

    [
      host: "#{hostname}:#{port}",
      database: Keyword.get(params, :database),
      user: Keyword.get(params, :username),
      password: Keyword.get(params, :password),
      postgresSchema: @schema
    ]
    |> Enum.map(fn {k, v} -> "#{k}=#{v}" end)
  end

  def cmd(command, args, opts \\ []) do
    Logger.info("Running: " <> Enum.join([command | args], " "))

    case System.cmd(command, args, opts) do
      {_output, 0} ->
        :ok

      {output, status} ->
        Logger.error("Command exited with status #{status}: #{output}")
        :error
    end
  end
end

M lib/triglav/release.ex => lib/triglav/release.ex +11 -0
@@ 24,6 24,17 @@ defmodule Triglav.Release do
    Triglav.Import.Zet.run(opts)
  end

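  # Intended to be run from a release, e.g. (assuming a standard Mix release):
  #
  #   bin/triglav eval "Triglav.Release.osmosis_init()"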
  def osmosis_init() do
    start_repo()
    Triglav.Import.Osmosis.create_schema()
    Triglav.Import.Osmosis.load_initial()
  end

  def osmosis_update() do
    start_repo()
    Triglav.Import.Osmosis.update()
  end

  defp repos do
    Application.fetch_env!(@app, :ecto_repos)
  end

A lib/triglav/schemas/osm_state.ex => lib/triglav/schemas/osm_state.ex +15 -0
@@ 0,0 1,15 @@
defmodule Triglav.Schemas.OsmState do
  use Ecto.Schema
  import Ecto.Changeset

  schema "osm_state" do
    field :sequence_number, :integer
    field :timestamp, :utc_datetime
    timestamps()
  end

  def changeset(row, params \\ %{}) do
    row
    |> cast(params, [:sequence_number, :timestamp])
  end
end

A priv/osmosis/create_schema.sh => priv/osmosis/create_schema.sh +11 -0
@@ 0,0 1,11 @@
#!/bin/sh
psql -c "DROP SCHEMA IF EXISTS osm CASCADE;"
psql -c "CREATE SCHEMA osm;"

PGOPTIONS="--search_path=osm,public" psql -f pgsnapshot_schema_0.6.sql
PGOPTIONS="--search_path=osm,public" psql -f pgsnapshot_schema_0.6_action.sql
PGOPTIONS="--search_path=osm,public" psql -f pgsnapshot_schema_0.6_changes.sql
PGOPTIONS="--search_path=osm,public" psql -f pgsnapshot_schema_0.6_linestring.sql

# initial
# osmosis --read-pbf /home/ihabunek/projects/ihabunek/triglav_import/croatia-latest.osm.pbf --write-pgsql host=localhost:5434 database=test postgresSchema=osm user=ihabunek password=starseed

A priv/osmosis/pgsnapshot_schema_0.6.sql => priv/osmosis/pgsnapshot_schema_0.6.sql +170 -0
@@ 0,0 1,170 @@
-- Database creation script for the snapshot PostgreSQL schema.

-- Drop all tables if they exist.
DROP TABLE IF EXISTS actions;
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS nodes;
DROP TABLE IF EXISTS ways;
DROP TABLE IF EXISTS way_nodes;
DROP TABLE IF EXISTS relations;
DROP TABLE IF EXISTS relation_members;
DROP TABLE IF EXISTS schema_info;

-- Drop all stored procedures if they exist.
DROP FUNCTION IF EXISTS osmosisUpdate();


-- Create a table which will contain a single row defining the current schema version.
CREATE TABLE schema_info (
    version integer NOT NULL
);


-- Create a table for users.
CREATE TABLE users (
    id int NOT NULL,
    name text NOT NULL
);


-- Create a table for nodes.
CREATE TABLE nodes (
    id bigint NOT NULL,
    version int NOT NULL,
    user_id int NOT NULL,
    tstamp timestamp without time zone NOT NULL,
    changeset_id bigint NOT NULL,
    tags hstore
);
-- Add a postgis point column holding the location of the node.
SELECT AddGeometryColumn('nodes', 'geom', 4326, 'POINT', 2);


-- Create a table for ways.
CREATE TABLE ways (
    id bigint NOT NULL,
    version int NOT NULL,
    user_id int NOT NULL,
    tstamp timestamp without time zone NOT NULL,
    changeset_id bigint NOT NULL,
    tags hstore,
    nodes bigint[]
);


-- Create a table for representing way to node relationships.
CREATE TABLE way_nodes (
    way_id bigint NOT NULL,
    node_id bigint NOT NULL,
    sequence_id int NOT NULL
);


-- Create a table for relations.
CREATE TABLE relations (
    id bigint NOT NULL,
    version int NOT NULL,
    user_id int NOT NULL,
    tstamp timestamp without time zone NOT NULL,
    changeset_id bigint NOT NULL,
    tags hstore
);

-- Create a table for representing relation member relationships.
CREATE TABLE relation_members (
    relation_id bigint NOT NULL,
    member_id bigint NOT NULL,
    member_type character(1) NOT NULL,
    member_role text NOT NULL,
    sequence_id int NOT NULL
);


-- Configure the schema version.
INSERT INTO schema_info (version) VALUES (6);


-- Add primary keys to tables.
ALTER TABLE ONLY schema_info ADD CONSTRAINT pk_schema_info PRIMARY KEY (version);

ALTER TABLE ONLY users ADD CONSTRAINT pk_users PRIMARY KEY (id);

ALTER TABLE ONLY nodes ADD CONSTRAINT pk_nodes PRIMARY KEY (id);

ALTER TABLE ONLY ways ADD CONSTRAINT pk_ways PRIMARY KEY (id);

ALTER TABLE ONLY way_nodes ADD CONSTRAINT pk_way_nodes PRIMARY KEY (way_id, sequence_id);

ALTER TABLE ONLY relations ADD CONSTRAINT pk_relations PRIMARY KEY (id);

ALTER TABLE ONLY relation_members ADD CONSTRAINT pk_relation_members PRIMARY KEY (relation_id, sequence_id);


-- Add indexes to tables.
CREATE INDEX idx_nodes_geom ON nodes USING gist (geom);

CREATE INDEX idx_way_nodes_node_id ON way_nodes USING btree (node_id);

CREATE INDEX idx_relation_members_member_id_and_type ON relation_members USING btree (member_id, member_type);


-- Set to cluster nodes by geographical location.
ALTER TABLE ONLY nodes CLUSTER ON idx_nodes_geom;

-- Set to cluster the tables showing relationship by parent ID and sequence
ALTER TABLE ONLY way_nodes CLUSTER ON pk_way_nodes;
ALTER TABLE ONLY relation_members CLUSTER ON pk_relation_members;

-- There are no sensible CLUSTER orders for users or relations.
-- Depending on geometry columns, different clusterings of ways may be desired.

-- Create the function that provides "unnest" functionality while remaining compatible with 8.3.
CREATE OR REPLACE FUNCTION unnest_bbox_way_nodes() RETURNS void AS $$
DECLARE
	previousId ways.id%TYPE;
	currentId ways.id%TYPE;
	result bigint[];
	wayNodeRow way_nodes%ROWTYPE;
	wayNodes ways.nodes%TYPE;
BEGIN
	FOR wayNodes IN SELECT bw.nodes FROM bbox_ways bw LOOP
		FOR i IN 1 .. array_upper(wayNodes, 1) LOOP
			INSERT INTO bbox_way_nodes (id) VALUES (wayNodes[i]);
		END LOOP;
	END LOOP;
END;
$$ LANGUAGE plpgsql;


-- Create customisable hook function that is called within the replication update transaction.
CREATE FUNCTION osmosisUpdate() RETURNS void AS $$
DECLARE
BEGIN
END;
$$ LANGUAGE plpgsql;

-- Manually set statistics for the way_nodes and relation_members tables.
-- Postgres gets horrible counts of distinct values by sampling random pages
-- and can be off by 1-2 orders of magnitude.

-- Size of the ways table / size of the way_nodes table
ALTER TABLE way_nodes ALTER COLUMN way_id SET (n_distinct = -0.08);

-- Size of the nodes table / size of the way_nodes table * 0.998
-- 0.998 is a factor for nodes not in ways
ALTER TABLE way_nodes ALTER COLUMN node_id SET (n_distinct = -0.83);

-- API allows a maximum of 2000 nodes/way. Unlikely to impact query plans.
ALTER TABLE way_nodes ALTER COLUMN sequence_id SET (n_distinct = 2000);

-- Size of the relations table / size of the relation_members table
ALTER TABLE relation_members ALTER COLUMN relation_id SET (n_distinct = -0.09);

-- Based on June 2013 data
ALTER TABLE relation_members ALTER COLUMN member_id SET (n_distinct = -0.62);

-- Based on June 2013 data. Unlikely to impact query plans.
ALTER TABLE relation_members ALTER COLUMN member_role SET (n_distinct = 6500);

-- Based on June 2013 data. Unlikely to impact query plans.
ALTER TABLE relation_members ALTER COLUMN sequence_id SET (n_distinct = 10000);

A priv/osmosis/pgsnapshot_schema_0.6_action.sql => priv/osmosis/pgsnapshot_schema_0.6_action.sql +15 -0
@@ 0,0 1,15 @@
-- Add an action table for the purpose of capturing all actions applied to a database.
-- The table is populated during application of a changeset, then osmosisUpdate is called,
-- then the table is cleared all within a single database transaction.
-- The contents of this table can be used to update derivative tables by customising the
-- osmosisUpdate stored procedure.
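--
-- For example, a customised osmosisUpdate could maintain a derived table
-- (hypothetical sketch; "my_derived_ways" is illustrative):
--
--   CREATE OR REPLACE FUNCTION osmosisUpdate() RETURNS void AS $$
--   BEGIN
--     DELETE FROM my_derived_ways w
--       USING actions a
--       WHERE a.data_type = 'W' AND a.action = 'D' AND a.id = w.id;
--   END;
--   $$ LANGUAGE plpgsql;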

-- Create a table for actions.
CREATE TABLE actions (
	data_type character(1) NOT NULL,
	action character(1) NOT NULL,
	id bigint NOT NULL
);

-- Add primary key.
ALTER TABLE ONLY actions ADD CONSTRAINT pk_actions PRIMARY KEY (data_type, id);

A priv/osmosis/pgsnapshot_schema_0.6_changes.sql => priv/osmosis/pgsnapshot_schema_0.6_changes.sql +95 -0
@@ 0,0 1,95 @@
DROP TABLE IF EXISTS replication_changes;

-- Create a table for replication changes that are applied to the database.
CREATE TABLE replication_changes (
    id SERIAL,
    tstamp TIMESTAMP without time zone NOT NULL DEFAULT(NOW()),
    nodes_modified INT NOT NULL DEFAULT (0),
    nodes_added INT NOT NULL DEFAULT (0),
    nodes_deleted INT NOT NULL DEFAULT (0),
    ways_modified INT NOT NULL DEFAULT (0),
    ways_added INT NOT NULL DEFAULT (0),
    ways_deleted INT NOT NULL DEFAULT (0),
    relations_modified INT NOT NULL DEFAULT (0),
    relations_added INT NOT NULL DEFAULT (0),
    relations_deleted INT NOT NULL DEFAULT (0),
    changesets_applied BIGINT [] NOT NULL,
    earliest_timestamp TIMESTAMP without time zone NOT NULL,
    latest_timestamp TIMESTAMP without time zone NOT NULL
);

DROP TABLE IF EXISTS sql_changes;

CREATE TABLE sql_changes (
    id SERIAL,
    tstamp TIMESTAMP without time zone NOT NULL DEFAULT(NOW()),
    entity_id BIGINT NOT NULL,
    type TEXT NOT NULL,
    changeset_id BIGINT NOT NULL,
    change_time TIMESTAMP NOT NULL,
    action INT NOT NULL,
    query text NOT NULL,
    arguments text
);

DROP TABLE IF EXISTS state;

CREATE TABLE state (
    id SERIAL,
    tstamp TIMESTAMP without time zone NOT NULL DEFAULT(NOW()),
    sequence_number BIGINT NOT NULL,
    state_timestamp TIMESTAMP WITHOUT time zone NOT NULL,
    disabled BOOLEAN NOT NULL DEFAULT(false)
);

DROP TABLE IF EXISTS locked;

CREATE TABLE locked (
    id SERIAL,
    started TIMESTAMP WITHOUT time zone NOT NULL DEFAULT(NOW()),
    process TEXT NOT NULL,
    source TEXT NOT NULL,
    location TEXT NOT NULL,
    write_lock BOOLEAN NOT NULL DEFAULT(false)
);

DROP FUNCTION IF EXISTS lock_database(TEXT, TEXT, TEXT);
CREATE OR REPLACE FUNCTION lock_database(new_process TEXT, new_source TEXT, new_location TEXT, request_write_lock BOOLEAN) RETURNS INT AS $$
  DECLARE locked_id INT;
  DECLARE current_id INT;
  DECLARE current_process TEXT;
  DECLARE current_source TEXT;
  DECLARE current_location TEXT;
  DECLARE current_write_lock BOOLEAN;
BEGIN

  SELECT id, process, source, location, write_lock
    INTO current_id, current_process, current_source, current_location, current_write_lock
    FROM locked ORDER BY write_lock DESC NULLS LAST LIMIT 1;
  IF (current_process IS NULL OR CHAR_LENGTH(current_process) = 0 OR (request_write_lock = FALSE AND current_write_lock = FALSE)) THEN
    INSERT INTO locked (process, source, location, write_lock) VALUES (new_process, new_source, new_location, request_write_lock) RETURNING id INTO locked_id;
    RETURN locked_id;
  ELSE
    RAISE EXCEPTION 'Database is locked by another id {%}, process {%}, source {%}, location {%}', current_id, current_process, current_source, current_location;
  END IF;
END;
$$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION unlock_database(locked_id INT) RETURNS BOOLEAN AS $$
  DECLARE response BOOLEAN;
  DECLARE exist_count INT;
BEGIN
  IF (locked_id = -1) THEN
    DELETE FROM locked;
    RETURN true;
  ELSE
    SELECT COUNT(*) INTO exist_count FROM locked WHERE id = locked_id;
    IF (exist_count = 1) THEN
      DELETE FROM locked WHERE id = locked_id;
      RETURN true;
    ELSE
      RETURN false;
    END IF;
  END IF;
END;
$$ LANGUAGE plpgsql VOLATILE;

A priv/osmosis/pgsnapshot_schema_0.6_linestring.sql => priv/osmosis/pgsnapshot_schema_0.6_linestring.sql +8 -0
@@ 0,0 1,8 @@
-- Add a postgis GEOMETRY column to the way table for the purpose of storing the full linestring of the way.
SELECT AddGeometryColumn('ways', 'linestring', 4326, 'GEOMETRY', 2);

-- Add an index to the linestring column.
CREATE INDEX idx_ways_linestring ON ways USING gist (linestring);

-- Cluster table by geographical location.
CLUSTER ways USING idx_ways_linestring;

A priv/repo/migrations/20210110200725_create_osm_state.exs => priv/repo/migrations/20210110200725_create_osm_state.exs +15 -0
@@ 0,0 1,15 @@
defmodule Triglav.Repo.Migrations.CreateOsmState do
  use Ecto.Migration

  def up do
    create table("osm_state") do
      add :sequence_number, :integer, null: false
      add :timestamp, :utc_datetime, null: false
      timestamps()
    end
  end

  def down do
    drop table("osm_state")
  end
end