~jomco/straatnaam

ff0bec873eaf0f87f8f4af5246708aecffa1a2fe — Remco van 't Veer 1 year, 11 months ago 22bb0df
Speedup import

Import the large datasets (NUM, PND, VBO) in parallel with the rest.
1 files changed, 49 insertions(+), 30 deletions(-)

M src/straatnaam/lvbag.clj
M src/straatnaam/lvbag.clj => src/straatnaam/lvbag.clj +49 -30
@@ 16,7 16,8 @@
;; along with this program.  If not, see <https://www.gnu.org/licenses/>.

(ns straatnaam.lvbag
  (:require [clojure.data.xml :as xml]
  (:require [clojure.core.async :as async]
            [clojure.data.xml :as xml]
            [clojure.java.io :as io]
            [clojure.java.jdbc :as sql]
            [clojure.string :as string]


@@ 304,35 305,54 @@
                                (slurp)))
                       "SET SCHEMA 'public'"]))

(defn- -import-from-stream [db sn in]
(def importers
  {"9999NUM" ["nummeraanduiding" parse-nummeraanduiding-xml]
   "9999OPR" ["openbareruimte" parse-openbare-ruimte-xml]
   "9999PND" ["pand" parse-pand-xml]
   "9999VBO" ["verblijfsobject" parse-verblijfsobject-xml]
   "9999LIG" ["ligplaats" parse-ligplaats-xml]
   "9999STA" ["standplaats" parse-standplaats-xml]
   "9999WPL" ["woonplaats" parse-woonplaats-xml]})

(defn- import-type-from-file
  "Import types from file.  Note that only one file per type exists in
  the toplevel zip file so the rest can be skipped (and no more zip
  coding needs to be done) when all assigned types are processed."
  [db sn file types]
  (with-open [in (io/input-stream file)
              zin (ZipInputStream. in)]
    (loop [entry (.getNextEntry zin)
           types types]
      (when (and entry types)
        (let [name (.getName entry)
              type (.substring name 0 7)]
          (if (types type)
            (let [[table parse] (importers type)]
              (log/info "importing from" name)

              (with-xmls-from-zip (ZipInputStream. zin)
                (fn [in name]
                  (log/debug "importing file" name)
                  (insert-records! db sn table (parse in))))

              (log/debug "done importing from" name)
              (recur (.getNextEntry zin) (disj types type)))
            (recur (.getNextEntry zin) types)))))))

(defn- import-from-file [db sn file]
  (prepare-schema! db sn)

  (let [zin (ZipInputStream. in)]
    (loop [entry (.getNextEntry zin)]
      (when entry
        (let [name    (.getName entry)
              type    (.substring name 0 7)
              [table
               parse] ({"9999NUM" ["nummeraanduiding" parse-nummeraanduiding-xml]
                        "9999OPR" ["openbareruimte" parse-openbare-ruimte-xml]
                        "9999PND" ["pand" parse-pand-xml]
                        "9999VBO" ["verblijfsobject" parse-verblijfsobject-xml]
                        "9999LIG" ["ligplaats" parse-ligplaats-xml]
                        "9999STA" ["standplaats" parse-standplaats-xml]
                        "9999WPL" ["woonplaats" parse-woonplaats-xml]}
                       type)]
          (log/debug "entry" name type)

          (when table
            (log/info "importing from" name)
            (with-xmls-from-zip (ZipInputStream. zin)
              (fn [in name]
                (log/debug "importing file" name)
                (insert-records! db sn table (parse in))))))
        (recur (.getNextEntry zin))))

    (insert-links! db sn)
    (insert-bag! db sn)))
  (doseq [c [(async/thread (import-type-from-file db sn file #{"9999NUM"}))
             (async/thread (import-type-from-file db sn file #{"9999PND"}))
             (async/thread (import-type-from-file db sn file #{"9999VBO"}))
             (async/thread (import-type-from-file db sn file #{"9999LIG"
                                                                "9999STA"
                                                                "9999OPR"
                                                                "9999WPL"}))]]
    (async/<!! c))

  (insert-links! db sn)
  (insert-bag! db sn))

(defn import-from-stream
  "Import data from given input stream of zip-file."


@@ 343,8 363,7 @@
      (io/copy in file)

      (log/info (str "importing data into " sn))
      (with-open [in (io/input-stream file)]
        (-import-from-stream db sn in))
      (import-from-file db sn file)

      (finally
        (.delete file)))))