~amirouche/ruse-babeltoy

dcae609bb4523d06962547a776307832ed30d036 — Amirouche 1 year, 29 days ago bf72433
wip
5 files changed, 96 insertions(+), 54 deletions(-)

M babeltoy.scm
A src/file.scm
M src/foundationdb.scm
M src/foundationdb/sync.scm
M src/index.scm
M babeltoy.scm => babeltoy.scm +29 -35
@@ 6,7 6,8 @@
(import (warc))
(import (foundationdb sync))
(import (index))
;;(import (pool))
(import (file))
(import (pool))


(define db #f)


@@ 28,17 29,6 @@
        (loop (fx- index 1))))
    uid))

(define (call-with-binary-input-port filename proc)
  (define port (open-file-input-port filename))
  (call-with-values (lambda () (proc port))
    (lambda args
      (close-port port)
      (apply values args))))

(define (binary-port->byte-generator port)
  (lambda ()
    (get-u8 port)))

(define (read-query port)
  (define (f port)
    (define char (read-char port))


@@ 241,18 231,20 @@

(define (index! tx url sxml tokens)
  ;; TODO: Skip if URL contains a query string
  (define uid (pk (random-uid)))
  (define uid (random-uid))

  (define ignored (for-each (lambda (token) (index-document-frequency! tx token)) tokens))

  (define (add! token)
    (pk 'adding...)
    (pk 'add! url token)
    (index-backward-add! tx token uid))

  (pk (index-forward-set! tx uid url sxml))
  (pk 'foward url)
  (index-forward-set! tx uid url sxml)

  (for-each add! tokens)
  (pk 'wip uid))
  (for-each add! tokens))

(define pool (make-pool 5))

(define (string-contains? string char)
  (let loop ((index (string-length string)))


@@ 266,35 258,37 @@
(define (index-prepare-warc! filepath)

  (define (p uri+body)
    (pk 'fuuu (car uri+body))
    (if (string-contains? (car uri+body) #\?)
        #f
        (let* ((sxml (html->sxml (cadr uri+body)))
               (tokens (string->tokens (sxml->human-readable-text sxml))))
          (pk (string-length (cadr uri+body)))
          (display ".")(flush-output-port)
          (pk 'out (fdb-in-transaction db
                                       (lambda (tx)
                                         (index! tx
                                                 (car uri+body)
                                                 sxml
                                                 tokens)))))))
          (fdb-in-transaction db
                              (lambda (tx)
                                (index! tx
                                        (car uri+body)
                                        sxml
                                        tokens))))))

  (define (f _)
    (display "o")(flush-output-port))

  ;; (define pool (make-pool 4))

  (pk 'warc-at filepath)
  ;; (pk 'warc-at filepath)

  ;; (define (gpk generator)
  ;;   (lambda ()
  ;;     (let ((out (generator)))
  ;;       (pk (integer->char out))
  ;;       out)))
  
  (call-with-binary-input-port filepath
    (lambda (port)
      (define warc (warc-generator (binary-port->byte-generator port)))
      (pk 'entering (warc))
      (let loop ((item (warc)))
        (unless (eof-object? item)
          (pk item)
          (p item)
          (loop (warc)))))))
  ;; (pool-generator-for-each-parallel-map pool f p (warc-generator (generator-file filepath))))
  (define warc (warc-generator (generator-file filepath)))
  (time (let loop ((item (warc)))
          (unless (eof-object? item)
            (p item)
            (loop (warc))))))

(define (index-prepare-on-disk-html! filepath)
  (let ((sxml (call-with-input-file filepath html->sxml)))

A src/file.scm => src/file.scm +54 -0
@@ 0,0 1,54 @@
#!chezscheme
(library (file)
  (export generator-file)
  (import (chezscheme))

  (define stdlib (load-shared-object #f))

  (define file-open
    (let ((proc (foreign-procedure "open" (string int) int)))
      (lambda (pathname flag)
        (proc pathname flag))))

  (define (bytevector->pointer bv)
    (#%$object-address bv (+ (foreign-sizeof 'void*) 1)))

  (define file-read
    (let ((proc (foreign-procedure "read" (int void* size_t) ssize_t)))
      (lambda (fd bv size)
        (lock-object bv)
        (let ((out (proc fd (bytevector->pointer bv) size)))
          (unlock-object bv)
          out))))

  (define file-close
    (let ((proc (foreign-procedure __collect_safe
                                   "read" (int) int)))
      (lambda (fd)
        (proc fd))))

  (define (generator-file filename)
    (define fd (file-open filename 0))
    (define bv (make-bytevector 4096))
    (define count #f)
    (define index #f)

    (define (read-bytevector)
      (set! count (file-read fd bv 4096))
      (if (= count 0)
          (begin (file-close fd) (eof-object))
          (begin
            (set! index 0)
            (set! continue yield)
            (bytevector-u8-ref bv 0))))

    (define (yield)
      (set! index (fx+ index 1))
      (if (or (fx=? index 4096) (fx=? index count))
          (read-bytevector)
          (bytevector-u8-ref bv index)))

    (define continue read-bytevector)

    (lambda ()
      (continue))))

M src/foundationdb.scm => src/foundationdb.scm +1 -1
@@ 129,7 129,7 @@
          (check (func)))))

    (define fdb-run-network
      (let ((func (foreign-procedure* int "fdb_run_network")))
      (let ((func (foreign-procedure __collect_safe "fdb_run_network" () int)))
        (lambda ()
          (check (func)))))


M src/foundationdb/sync.scm => src/foundationdb/sync.scm +11 -17
@@ 63,7 63,8 @@
      (fdb-select-api-version 630)
      ;; setup network thread
      (fdb-setup-network)
      (set! %network-thread (fork-thread (lambda () (fdb-run-network)))))
      (set! %network-thread
            (fork-thread (lambda () (fdb-run-network)))))

    (define (make-fdb)
      (%make-fdb (fdb-create-database #f)))


@@ 79,50 80,43 @@
      (car (reverse args)))

    (define (fdb-future-block-until-ready* future)

      
      (define on-ready 
        (lambda (condition self)
          (condition-signal condition)
          (unlock-object self)))
      (pk 'ready?)
      (unless (pk 'ready (fdb-future-ready? future))
      
      (unless (fdb-future-ready? future)
        (let ((mutex (make-mutex))
              (condition (make-condition)))
          (letrec* ((code (foreign-callable (lambda (a b) (on-ready condition code)) (void* void*) void)))
            (lock-object code)
            (fdb-future-set-callback future (foreign-callable-entry-point code))
            (mutex-acquire mutex)
            (condition-wait condition mutex)
            (mutex-release mutex)
            (fdb-future-set-callback (foreign-callable-entry-point code))))))
            (mutex-release mutex)))))
          
    (define (fdb-transaction-commit* transaction)
      (let ((future (fdb-transaction-commit (transaction-pointer transaction))))
        (pk 'bloking-commit)
        (fdb-future-block-until-ready* future)
        (pk 'unblock)
        (let ((error (fdb-future-get-error future)))
          (pk 'error error)
          (fdb-future-destroy future)
          (fdb-transaction-destroy (transaction-pointer transaction))
          (pk 'error2 error))))
          error)))

    (define (fdb-transaction-rollback transaction)
      (fdb-transaction-cancel (transaction-pointer transaction))
      (fdb-transaction-destroy (transaction-pointer transaction)))

    (define (fdb-in-transaction fdb proc)
      (let ((tx (pk (fdb-transaction-begin fdb))))
      (let ((tx (fdb-transaction-begin fdb)))
        (let loop ()
          (pk 'loop)
          (call-with-values (lambda () (proc tx))
            (lambda out
              (pk 'commiting)
              (let ((error (fdb-transaction-commit* tx)))
                (pk 'error 'again error)
                (if (pk (fxzero? error))
                    (apply values (pk out))
                (if (fxzero? error)
                    (apply values out)
                    (let ((future (fdb-transaction-on-error (transaction-pointer tx) error)))
                      (pk 'block)
                      (fdb-future-block-until-ready* future)
                      (let ((error (fdb-future-get-error future)))
                        (fdb-future-destroy future)

M src/index.scm => src/index.scm +1 -1
@@ 25,7 25,7 @@

  (define (index-forward-set! db uid url sxml)
    (define s (pack url (scm->bytevector sxml)))
    (pk (bytevector-length s))
    (bytevector-length s)
    (fdb-set! db (pack *forward* uid) s))

  (define (index-forward-ref db uid)