A resources/migrations/20221218100000-add-ok-to-bag_syncs.down.sql => resources/migrations/20221218100000-add-ok-to-bag_syncs.down.sql +2 -0
@@ 0,0 1,2 @@
+ALTER TABLE public.bag_syncs
+ DROP COLUMN ok;
A resources/migrations/20221218100000-add-ok-to-bag_syncs.up.sql => resources/migrations/20221218100000-add-ok-to-bag_syncs.up.sql +8 -0
@@ 0,0 1,8 @@
+ALTER TABLE public.bag_syncs
+ ADD COLUMN ok BOOLEAN;
+--;;
+UPDATE public.bag_syncs
+ SET ok = TRUE;
+--;;
+ALTER TABLE public.bag_syncs
+ ALTER COLUMN ok SET NOT NULL;
M src/straatnaam/core.clj => src/straatnaam/core.clj +20 -8
@@ 33,9 33,14 @@
:pgport "5432"
:pguser (environ/env :user)
:pgpassword nil
- :http-host "localhost"
- :http-port "8080"
- :feed-url "https://service.pdok.nl/kadaster/adressen/atom/v1_0/adressen.xml"})
+
+ :http-host "localhost"
+ :http-port "8080"
+
+ :feed-url "https://service.pdok.nl/kadaster/adressen/atom/v1_0/adressen.xml"
+
+ :retention-success "2"
+ :retention-failure "1"})
(defn get-env
[env k]
@@ 70,13 75,19 @@
:password (get-str env :pgpassword)}]
{:db-spec db-spec
:feed-url (get-str env :feed-url)
+
+ :retention {:success (get-int env :retention-success)
+ :failure (get-int env :retention-failure)}
+
:migratus {:store :database
:migration-dir "migrations"
:db db-spec}
- :jetty {:host (get-str env :http-host)
- :port (get-int env :http-port)
- :join? false}
- :web {:allowed-origins (get-set env :http-allowed-origins)}}))
+
+ :jetty {:host (get-str env :http-host)
+ :port (get-int env :http-port)
+ :join? false}
+
+ :web {:allowed-origins (get-set env :http-allowed-origins)}}))
(defonce server-atom (atom nil))
@@ 102,5 113,6 @@
(let [config (mk-config environ/env)]
(migratus/migrate (:migratus config))
(data/start-updater! (:db-spec config)
- (:feed-url config))
+ (:feed-url config)
+ (:retention config))
(start! config)))
M src/straatnaam/data.clj => src/straatnaam/data.clj +43 -19
@@ 93,19 93,22 @@
(log/info "importing fresh dataset")
(with-open [in (.getInputStream conn)]
(lvbag/import-from-stream db sn in))
- (sql/insert! db "bag_syncs" {"updated" (sql-ts updated)
- "started_at" (sql-ts started-at)
- "finished_at" (sql-ts (Date.))
- "schema" sn})
- (log/info "importing done")
-
- (if (version-ok? db sn)
- (do
- (activate-version! db sn)
- ::success)
- (do
- (log/warn "version rejected" sn)
- ::rejected)))
+
+ (let [ok (version-ok? db sn)]
+ (sql/insert! db "bag_syncs" {"updated" (sql-ts updated)
+ "started_at" (sql-ts started-at)
+ "finished_at" (sql-ts (Date.))
+ "schema" sn
+ "ok" ok})
+ (log/info "importing done")
+
+ (if ok
+ (do
+ (activate-version! db sn)
+ ::success)
+ (do
+ (log/warn "version rejected" sn)
+ ::rejected))))
(do
(log/info "dataset unchanged")
@@ 118,12 121,33 @@
(defonce updater-running (atom false))
+(def prune-select "SELECT schema
+ FROM bag_syncs
+ WHERE ok = ?
+ ORDER BY updated DESC
+ OFFSET ?")
+
+(defn prune
+ [db {:keys [success failure] :or {success 2, failure 1}}]
+ (let [sns (map :schema
+ (concat (sql/query db [prune-select true success])
+ (sql/query db [prune-select false failure])))]
+ (if (seq sns)
+ (sql/with-db-transaction [db db]
+ (doseq [sn sns]
+ (sql/execute! db (str "DROP SCHEMA IF EXISTS " sn " CASCADE"))
+ (sql/execute! db ["DELETE FROM bag_syncs WHERE schema = ?" sn]))
+ (log/info "pruned" (count sns) "version(s)")
+ (log/debug "versions pruned" sns))
+ (log/info "nothing to prune"))))
+
(defn start-updater!
"Start an async updater when not already running."
- [db feed-url]
+ [db feed-url retention]
(when (compare-and-set! updater-running false true)
(log/info "starting updater")
(async/go-loop []
+ (prune db retention)
(sync db feed-url)
(async/<! (async/timeout update-frequency-ms))
(recur))))
@@ 133,9 157,9 @@
[db postcode huisnummer]
(log/debug "lookup" postcode huisnummer)
(sql/query db
- [(str "SELECT *
- FROM bag
- WHERE postcode = ? AND huisnummer = ?
- ORDER BY openbareruimte, huisletter NULLS FIRST,
- huisnummertoevoeging NULLS FIRST")
+ ["SELECT *
+ FROM bag
+ WHERE postcode = ? AND huisnummer = ?
+ ORDER BY openbareruimte, huisletter NULLS FIRST,
+ huisnummertoevoeging NULLS FIRST"
postcode, huisnummer]))
M src/straatnaam/lvbag.clj => src/straatnaam/lvbag.clj +18 -15
@@ 251,6 251,20 @@
table (str sn "." table)]
(insert-rows! db table cols rows tmpl)))
+(def template-tables
+ ["bag"
+ "ligplaats"
+ "ligplaats_neven"
+ "nummeraanduiding"
+ "openbareruimte"
+ "pand"
+ "standplaats"
+ "standplaats_neven"
+ "verblijfsobject"
+ "verblijfsobject_neven"
+ "verblijfsobject_pand"
+ "woonplaats"])
+
(defn- prepare-schema! [db sn]
(log/debug "preparing schema for" sn)
(sql/db-do-commands db true
@@ 258,18 272,7 @@
(str "CREATE SCHEMA " sn "")]
(map #(str "CREATE TABLE " sn "." %
" (LIKE template." % " INCLUDING ALL)")
- ["bag"
- "ligplaats"
- "ligplaats_neven"
- "nummeraanduiding"
- "openbareruimte"
- "pand"
- "standplaats"
- "standplaats_neven"
- "verblijfsobject"
- "verblijfsobject_neven"
- "verblijfsobject_pand"
- "woonplaats"]))))
+ template-tables))))
(defn- insert-links! [db sn]
(log/debug "inserting linking data for" sn)
@@ 354,9 357,9 @@
(async/thread (import-type-from-file db sn file #{"9999PND"}))
(async/thread (import-type-from-file db sn file #{"9999VBO"}))
(async/thread (import-type-from-file db sn file #{"9999LIG"
- "9999STA"
- "9999OPR"
- "9999WPL"}))]]
+ "9999STA"
+ "9999OPR"
+ "9999WPL"}))]]
(async/<!! c))
(insert-links! db sn)