~ihabunek/triglav

ref: 26f2c3b921aea5c0e33d1f7c4320e9e6ceb08ea0 triglav/lib/triglav/import/osmosis.ex -rw-r--r-- 4.4 KiB
26f2c3b9Ivan Habunek Calculate and store derived data for public transport 4 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
defmodule Triglav.Import.Osmosis do
  @moduledoc """
  Imports the latest OSM data for Croatia from Geofabrik

  See:
  https://download.geofabrik.de/europe/croatia.html
  """

  alias Triglav.Import.Geofabrik
  alias Triglav.Repo

  require Logger

  @schema "osmosis"

  # Osmosis schema creation scripts
  # https://wiki.openstreetmap.org/wiki/Osmosis/PostGIS_Setup
  @scripts [
    "priv/osmosis/pgsnapshot_schema_0.6.sql",
    "priv/osmosis/pgsnapshot_schema_0.6_action.sql",
    "priv/osmosis/pgsnapshot_schema_0.6_changes.sql",
    "priv/osmosis/pgsnapshot_schema_0.6_linestring.sql"
  ]

  def create_schema() do
    file_args =
      @scripts
      |> Enum.map(&Application.app_dir(:triglav, &1))
      |> Enum.map(&["--file", &1])
      |> List.flatten()

    psql_args = [
      "--quiet",
      "--single-transaction",
      "--command",
      "DROP SCHEMA IF EXISTS \"#{@schema}\" CASCADE;",
      "--command",
      "CREATE SCHEMA \"#{@schema}\";",
      "--command",
      "SET search_path TO \"#{@schema}\",public;"
      | file_args
    ]

    cmd("psql", psql_args)
  end

  def schema_exists?() do
    result =
      Repo.query!(
        "SELECT * FROM information_schema.schemata WHERE schema_name = $1;",
        [@schema]
      )

    result.num_rows > 0
  end

  def load_initial() do
    with {:ok, state} <- Geofabrik.web_state(),
         {:ok, tmp_dir} <- Triglav.tmp_dir(),
         {:ok, pbf_path} <- Geofabrik.download_latest(tmp_dir),
         :ok <- cmd("osmosis", ["--read-pbf", pbf_path, "--write-pgsql" | osmosis_db_params()]),
         {:ok, state} <- Geofabrik.save_state(state) do
      Logger.info("Loaded state: #{state.sequence_number} #{state.timestamp}")
      Logger.info("Done")
      :ok
    end
  end

  def load_file(pbf_path) do
    Logger.info("Loading: #{pbf_path}")

    with {:ok, state} <- Geofabrik.file_state(pbf_path),
         :ok <- cmd("osmosis", ["--read-pbf", pbf_path, "--write-pgsql" | osmosis_db_params()]),
         {:ok, state} <- Geofabrik.save_state(state) do
      Logger.info("Loaded state: #{state.sequence_number} #{state.timestamp}")
      Logger.info("Done")
      :ok
    end
  end

  @spec update() :: :ok | {:error, term}
  def update() do
    Logger.info("Checking Geofabrik for updates...")

    with {:ok, web_state} <- Geofabrik.web_state(),
         {:ok, local_state} <- Geofabrik.local_state() do
      Logger.info("Local state: #{local_state.sequence_number} #{local_state.timestamp}")
      Logger.info("  Web state: #{web_state.sequence_number} #{web_state.timestamp}")

      if web_state.sequence_number > local_state.sequence_number do
        Logger.info("New data available. Updating.")
        apply_updates(local_state, web_state)
        :ok
      else
        Logger.info("You already have the latest data")
        :ok
      end
    end
  end

  defp apply_updates(local_state, web_state) do
    range = (local_state.sequence_number + 1)..web_state.sequence_number

    Enum.each(range, fn seq ->
      {:ok, state} = apply_update(seq)
      Logger.info("Updated to ##{state.sequence_number}")
    end)
  end

  defp apply_update(sequence_number) do
    Logger.info("Applying update ##{sequence_number}")

    with {:ok, tmp_dir} <- Triglav.tmp_dir(),
         {:ok, change_path} <- Geofabrik.download_change(sequence_number, tmp_dir),
         {:ok, change_state} <- Geofabrik.get_change_state(sequence_number),
         :ok <- osmosis_apply_update(change_path),
         {:ok, state} <- Geofabrik.save_state(change_state) do
      {:ok, state}
    end
  end

  defp osmosis_apply_update(path) do
    cmd("osmosis", [
      "--read-xml-change",
      "file=#{path}",
      "--write-pgsql-change"
      | osmosis_db_params()
    ])
  end

  def osmosis_db_params do
    params =
      Application.fetch_env!(:triglav, Triglav.Repo)
      |> Keyword.get(:url)
      |> Ecto.Repo.Supervisor.parse_url()

    hostname = Keyword.get(params, :hostname)
    port = Keyword.get(params, :port, 5432)

    [
      host: "#{hostname}:#{port}",
      database: Keyword.get(params, :database),
      user: Keyword.get(params, :username),
      password: Keyword.get(params, :password),
      postgresSchema: @schema
    ]
    |> Enum.map(fn {k, v} -> "#{k}=#{v}" end)
  end

  def cmd(command, args, opts \\ []) do
    Logger.info("Running: " <> Enum.join([command | args], " "))

    case System.cmd(command, args, opts) do
      {_, 0} -> :ok
      _ -> :error
    end
  end
end