~amirouche/arew

e8db00df7e2deef5d19da2b79d753b5bc24c3a2c — Amirouche 6 months ago f8682dd
wip
4 files changed, 652 insertions(+), 371 deletions(-)

M .dir-locals.el
M letloop.scm
A src/babelialite/db-check.scm
A src/babelialite/db.scm
M .dir-locals.el => .dir-locals.el +2 -1
@@ 11,10 11,11 @@
                           (put 'engine-in-transaction 'scheme-indent-function 2)
			   ;; scheme
			   (put 'switch 'scheme-indent-function 1)
			   (put 'if3 'scheme-indent-function 2)
			   (put 'call-with-input-string 'scheme-indent-function 1)
			   (put 'call-with-values 'scheme-indent-function 1)
			   (put 'search-address-info 'scheme-indent-function 3)
			   (put 'call-with-lock 'scheme-indent-function 1)
			   (put 'with-lock 'scheme-indent-function 1)
			   (put 'call-with-port 'scheme-indent-function 1)
			   (put 'with-cursor 'scheme-indent-function 1)
			   (put 'with-directory 'scheme-indent-function 1)

M letloop.scm => letloop.scm +0 -370
@@ 1,372 1,3 @@
#!chezscheme
(import (chezscheme))


(define (pk . args)
  (write args)
  (newline)
  (car (reverse args)))

(define-syntax define-syntax-rule
  (syntax-rules ()
    ((define-syntax-rule (keyword args ...) body)
     (define-syntax keyword
       (syntax-rules ()
         ((keyword args ...) body))))))

(define (bytevector->pointer bv)
  (#%$object-address bv (+ (foreign-sizeof 'void*) 1)))

;; TODO: take a list of objects, use a macro, possibly with a guard
(define (call-with-lock obj thunk)
  (lock-object obj)
  (call-with-values thunk
    (lambda out
      (unlock-object obj)
      (apply values out))))

;; ffi helpers

(define (make-double-pointer)
  ;; TODO: replace with (make-bytevector 8)
  (foreign-alloc 8))

(define-syntax-rule (dereference  pointer)
  (foreign-ref 'void* pointer 0))

(define-syntax-rule (ftype->pointer ftype)
  (ftype-pointer-address ftype))

(define-syntax-rule (foreign-procedure* return ptr args ...)
  (foreign-procedure __collect_safe ptr (args ...) return))

;; sqlite lsm extension bindings

(define okvslite (load-shared-object "./local/lib/lsm.so"))

(define (error->message code)
  (case code
    ((1) "ERROR")
    ((5) "BUSY")
    ((7) "NO MEMORY")
    ((8) "READ ONLY")
    ((10) "IO ERROR")
    ((11) "CORRUPT")
    ((13) "FULL")
    ((14) "CAN NOT OPEN")
    ((15) "PROTOCOL")
    ((21) "MISUSE")
    ((50) "MISMATCH")
    (else "UNKNOWN ERROR")))

(define-syntax-rule (check sym code)
  (let ((code* code))
    (unless (zero? code*)
      (error 'okvslite (error->message code*) code* sym))))

(define okvslite-new
  (let ((proc (foreign-procedure* int "lsm_new" void* void*)))
    (lambda ()
      (let ((out (make-double-pointer)))
        (check 'okvslite-new (proc 0 out))
        (dereference out)))))

(define okvslite-close
  (let ((proc (foreign-procedure* int "lsm_close" void*)))
    (lambda (db)
      (check 'okvslite-close (proc db)))))

;; TODO: replace with symbols
(define okvslite-config
  (let ((proc (foreign-procedure* int "lsm_config" void* int void*)))
    (lambda (db config value)
      (let ((pointer (make-double-pointer)))
        (foreign-set! 'int pointer 0 value)
        (check 'okvslite-config (proc db config pointer))))))

(define okvslite-open
  (let ((proc (foreign-procedure "lsm_open" (void* string) int)))
    (lambda (db filename)
      (check 'okvslite-open (proc db filename)))))

(define okvslite-begin
  (let ((proc (foreign-procedure* int "lsm_begin" void* int)))
    (lambda (db level)
      (check 'okvslite-begin (proc db level)))))

(define okvslite-commit
  (let ((proc (foreign-procedure* int "lsm_commit" void* int)))
    (lambda (db level)
      (check 'okvslite-commit (proc db level)))))

(define okvslite-rollback
  (let ((proc (foreign-procedure* int "lsm_rollback" void* int)))
    (lambda (db level)
      (check 'okvslite-rollback (proc db level)))))

(define okvslite-insert
  (let ((proc (foreign-procedure* int "lsm_insert" void* void* int void* int)))
    (lambda (db key value)
      (call-with-lock key
        (lambda ()
          (call-with-lock value
            (lambda ()
              (check 'okvslite-insert
                     (proc db
                           (bytevector->pointer key)
                           (bytevector-length key)
                           (bytevector->pointer value)
                           (bytevector-length value))))))))))

(define okvslite-delete
  (let ((proc (foreign-procedure* int "lsm_delete" void* void* int)))
    (lambda (db key)
      (call-with-lock key
        (lambda ()
          (check 'okvslite-delete
                 (proc db
                       (bytevector->pointer key)
                       (bytevector-length key))))))))

(define okvslite-cursor-open
  (let ((proc (foreign-procedure* int "lsm_csr_open" void* void*)))
    (lambda (db)
      (let ((out (make-double-pointer)))
        (check 'okvslite-cursor-open
               (proc db out))
        (dereference out)))))

(define okvslite-cursor-close
  (let ((proc (foreign-procedure* int "lsm_csr_close" void*)))
    (lambda (cursor)
      ;; TODO: XXX: maybe free cursor, followup on the above
      ;; make-double-pointer
      (check 'okvslite-cursor-close (proc cursor)))))

(define (->seek symbol)
  (case symbol
    ((less-than-or-equal-fast) -2)
    ((less-than-or-equal) -1)
    ((equal) 0)
    ((greater-than-or-equal) 1)
    (else (error 'okvslite "unknown seek strategy"))))

(define okvslite-cursor-seek
  (let ((proc (foreign-procedure* int "lsm_csr_seek" void* void* int int)))
    (lambda (cursor key strategy)
      (call-with-lock key
        (lambda ()
          (check 'okvslite-cursor-seek
                 (proc cursor
                       (bytevector->pointer key)
                       (bytevector-length key)
                       (->seek strategy))))))))

(define okvslite-cursor-first
  (let ((proc (foreign-procedure* int "lsm_csr_first" void*)))
    (lambda (cursor)
      (check 'okvslite-cursor-first (proc cursor)))))

(define okvslite-cursor-last
  (let ((proc (foreign-procedure* int "lsm_csr_last" void*)))
    (lambda (cursor)
      (check 'okvslite-cursor-last (proc cursor)))))

(define okvslite-cursor-next
  (let ((proc (foreign-procedure* int "lsm_csr_next" void*)))
    (lambda (cursor)
      (check 'okvslite-cursor-next (proc cursor)))))

(define okvslite-cursor-prev
  (let ((proc (foreign-procedure* int "lsm_csr_prev" void*)))
    (lambda (cursor)
      (check 'okvslite-cursor-prev (proc cursor)))))

(define okvslite-cursor-valid?
  (let ((proc (foreign-procedure* int "lsm_csr_valid" void*)))
    (lambda (cursor)
      (= (proc cursor) 1))))

(define okvslite-cursor-key
  (let ((proc (foreign-procedure* int "lsm_csr_key" void* void* void*)))
    (lambda (cursor)
      (let ((data* (make-double-pointer))
            (length* (make-double-pointer)))
        (check 'okvslite-cursor-key (proc cursor data* length*))
        ;; copy the data into a scheme bytevector
        (let* ((data (dereference data*))
               (length (foreign-ref 'int length* 0))
               (bytevector (make-bytevector length)))
          (let loop ((index (- length 1)))
            (unless (< index 0)
              (let ((value (foreign-ref 'unsigned-8 data index)))
                (bytevector-u8-set! bytevector index value)
                (loop (- index 1)))))
          bytevector)))))

(define okvslite-cursor-value
  (let ((proc (foreign-procedure* int "lsm_csr_value" void* void* void*)))
    (lambda (cursor)
      (let ((data* (make-double-pointer))
            (length* (make-double-pointer)))
        (check 'okvslite-cursor-value (proc cursor data* length*))
        ;; copy the data into a scheme bytevector
        (let* ((data (dereference data*))
               (length (foreign-ref 'int length* 0))
               (bytevector (make-bytevector length)))
          (let loop ((index (- length 1)))
            (unless (< index 0)
              (let ((value (foreign-ref 'unsigned-8 data index)))
                (bytevector-u8-set! bytevector index value)
                (loop (- index 1)))))
          bytevector)))))

(define (okvs-open filename)
  (define okvs (okvslite-new))
  (okvslite-open okvs filename)
  okvs)

(define (failure-default)
  (error 'okvs "transaction failed"))

(define okvs-in-transaction
  (case-lambda
   ((okvs proc)
    (okvs-in-transaction okvs proc failure-default values))
   ((okvs proc failure) (okvs-in-transaction okvs proc failure values))
   ((okvs proc failure success) (okvs-in-transaction% okvs proc failure values))))

(define (okvs-in-transaction% okvs proc failure values)
  (let loop ((try 5) ;; magic number
             (ex #f))
    (if (zero? try)
        (raise ex)
        (guard (ex
                (else
                 (okvslite-rollback okvs 0)
                 (loop (fx- try 1) ex)))
          (okvslite-begin okvs 0)
          (call-with-values (lambda () (proc okvs))
            (lambda args
              (okvslite-commit okvs 0)
              (apply values args)))))))

(define (call-with-cursor okvs proc)
  (define cursor (okvslite-cursor-open okvs))
  (guard (ex (else (okvslite-cursor-close cursor) (raise ex)))
    (call-with-values (lambda ()
                        (proc cursor))
      (lambda args
        (okvslite-cursor-close okvs)
        (apply values args)))))

(define (okvs-ref okvs key)
  (call-with-cursor okvs
    (lambda (cursor)
      (okvslite-cursor-seek cursor key 'equal)
      (if (okvslite-cursor-valid? cursor)
          (okvslite-cursor-value cursor)
          #f))))

(define (compare bytevector other)
  ;; lexicographic comparison

  ;; TODO: add a few fixnum calls
  ;; TODO: add a 3-way-if macro

  ;; If BYTEVECTOR is before OTHER return -1, if equal return 0,
  ;; otherwise if BYTEVECTOR is after OTHER return 1
  (let ((end (min (bytevector-length bytevector)
                  (bytevector-length other))))
    (let loop ((index 0))
      (if (zero? (- end index))
          (if (= (bytevector-length bytevector)
                 (bytevector-length other))
              0
              (if (< (bytevector-length bytevector)
                     (bytevector-length other))
                  -1
                  1))
          (let ((delta (- (bytevector-u8-ref bytevector index)
                          (bytevector-u8-ref other index))))
            (if (zero? delta)
                (loop (+ 1 index))
                (if (negative? delta)
                    -1
                    1)))))))

;; TODO: go through the range in other order!!
(define (generator-range okvs key other)
  (define cursor (okvslite-cursor-open okvs))

  (define (fini!) (okvslite-cursor-close cursor) (set! yield eof-object) (eof-object))

  (define (init)
    ;; go through the range in reverse order
    (okvslite-cursor-seek other 'less-than-or-equal)
    (if (okvslite-cursor-valid? cursor)
        (fini!)
        (let* ((key* (okvslite-cursor-key cursor))
               (shift (compare key* other)))
          (when (fx=? shift 0)
            ;; the end of the range is excluded.
            (okvslite-cursor-prev cursor))
          (if (not (okvslite-cursor-valid? cursor))
              (fini!)
              (let* ((key* (okvslite-cursor-key cursor))
                     (shift (compare key* other)))
                (if (fx=? shift 1 key)
                    (fini!)
                    (begin
                      ;; if shift equals zero, continue will run once.
                      (set! yield continue)
                      key)))))))

  (define (continue)
    ;; go through the range in reverse order
    (okvslite-cursor-prev cursor)
    (if (not (okvslite-cursor-valid? cursor))
        (fini!)
        (let* ((key* (okvslite-cursor-key cursor))
               ;; comparison is done against KEY.
               (shift (compare key key*)))
          (cond
           ((fx=? shift 1) key*)
           ((fx=? shift 0) key*)
           ((fx=? shift -1) (fini!))))))

  (define yield init)

  (lambda ()
    (yield)))


(define (generator->list generator)
  (let loop ((out '()))
    (let ((item (generator)))
      (if (eof-object? item)
          (reverse out)
          (loop (cons item out))))))

(define (generator-map proc generator)
  (lambda ()
    (proc (generator))))

(define (okvs-range okvs key other)
  (define (make-key-value key)
    (cons key (okvs-ref okvs key)))

  (generator->list (generator-map make-key-value (generator-range okvs key other))))

(define (okvs-delete! okvs key)
  (okvslite-delete okvs key))

(define (okvs-remove! okvs key other)
  (define (delete! key)
    (okvs-delete! okvs key))

  (generator->list (generator-map delete! (generator-range okvs key other))))


(define (file-char-generator filename)
  (define port (open-input-file filename))


@@ 382,7 13,6 @@
  (lambda ()
    (yield)))


(define (cleanize char)
  (case char
    ((#\# #\. #\( #\[ #\) #\] #\` #\newline) #\space)

A src/babelialite/db-check.scm => src/babelialite/db-check.scm +184 -0
@@ 0,0 1,184 @@
(library (babelialite db-check)

  (export check-000
          check-001
          check-002
          check-003
          check-004
          check-005
          check-006
          check-007
          check-008
          check-009
          check-010
          )

  (import (chezscheme)
          (babelialite db)
          (check))

  (define (pk . args)
    (write args)
    (newline)
    (car (reverse args)))

  (define mkdtemp
    (foreign-procedure "mkdtemp" (string) string))

  (define (make-temporary-directory prefix)
    (let ((input (string-append prefix "-XXXXXX")))
      (mkdtemp input)))

  (define (call-with-db proc)
    (define directory (make-temporary-directory "/tmp/babelialite-db"))
    (define filename (string-append directory "/db.sqlite"))

    (define db (db-open filename))
    (call-with-values (lambda () (proc db))
      (lambda args
        (db-close db)
        (apply values args))))

  (define check-000
    (check #t (call-with-db (lambda (db) #t))))

  (define check-001
    (check #vu8(42)
           (call-with-db
            (lambda (db)
              (db-set! db #vu8(42) #vu8(42))
              (db-ref db #vu8(42))))))

  (define check-002
    (check #f
           (call-with-db
            (lambda (db)
              (db-ref db #vu8(42))))))

   (define (generator->list generator)
    (let loop ((out '()))
      (let ((item (generator)))
        (if (eof-object? item)
            (reverse out)
            (loop (cons item out))))))

   (define check-003
     (check (list (cons #vu8(20 16) #vu8(2)) (cons #vu8(20 17) #vu8(3)))
            (call-with-db
             (lambda (db)
               ;; given
               (db-set! db #vu8(20 18) #vu8(4))
               (db-set! db #vu8(20 16) #vu8(2))
               (db-set! db #vu8(20 15) #vu8(1))
               (db-set! db #vu8(20 19) #vu8(5))
               (db-set! db #vu8(20 17) #vu8(3))

               ;; then
               (generator->list (db-query db #vu8(20 16) #vu8(20 18)))))))

   (define check-004
     (check (list (cons #vu8(20 16) #vu8(2)) (cons #vu8(20 17 01) #vu8(3)))
            (call-with-db
             (lambda (db)
               (db-set! db #vu8(20 18) #vu8(4))
               (db-set! db #vu8(20 16) #vu8(2))
               (db-set! db #vu8(20 15) #vu8(1))
               (db-set! db #vu8(20 19) #vu8(5))
               ;; #vu8(20 17 01) lexicographically less than #vu8(20 18)
               (db-set! db #vu8(20 17 01) #vu8(3))
               (generator->list
                (db-query db #vu8(20 16) #vu8(20 18)))))))

   (define check-005
     (check
      '((#vu8(01 02) . #vu8(1))
        (#vu8(20 16) . #vu8(2))
        (#vu8(20 16 1) . #vu8(2))
        (#vu8(20 17) . #vu8(3))
        (#vu8(20 17 1) . #vu8(2))
        (#vu8(42 42) . #vu8(5)))
      (call-with-db
       (lambda (db)
         (db-set! db #vu8(20 17 01) #vu8(2))
         (db-set! db #vu8(20 17) #vu8(3))
         (db-set! db #vu8(42 42) #vu8(5))
         (db-set! db #vu8(01 02) #vu8(1))
         (db-set! db #vu8(20 16) #vu8(2))
         (db-set! db #vu8(20 16 01) #vu8(2))
         (generator->list (db-query db #vu8() #vu8(255)))))))

   (define check-006
     (check
      '((#vu8(20 16) . #vu8(2))
        (#vu8(20 16 1) . #vu8(2))
        (#vu8(20 17) . #vu8(3))
        (#vu8(20 17 1) . #vu8(2)))
      (call-with-db
       (lambda (db)
          (db-set! db #vu8(20 17 01) #vu8(2))
          (db-set! db #vu8(20 17) #vu8(3))
          (db-set! db #vu8(42 42) #vu8(5))
          (db-set! db #vu8(01 02) #vu8(1))
          (db-set! db #vu8(20 16) #vu8(2))
          (db-set! db #vu8(20 16 01) #vu8(2))
          (generator->list (db-query db #vu8(20) #vu8(21)))))))

   (define check-007
     (check '((#vu8(20 16 1) . #vu8(2))
              (#vu8(20 17) . #vu8(3)))
            (call-with-db
             (lambda (db)
               (db-set! db #vu8(01 02) #vu8(1))
               (db-set! db #vu8(20 16 01) #vu8(2))
               (db-set! db #vu8(20 16) #vu8(2))
               (db-set! db #vu8(20 17 01) #vu8(2))
               (db-set! db #vu8(20 17) #vu8(3))
               (db-set! db #vu8(42 42) #vu8(5))
               ;; get
               (generator->list (db-query db
                                          #vu8(20)
                                          #vu8(21)
                                          1
                                          2))))))

   (define check-008
     (check '((#vu8(20 17) . #vu8(3))
              (#vu8(20 16 1) . #vu8(2)))
            (call-with-db
             (lambda (db)
               (db-set! db #vu8(01 02) #vu8(1))
               (db-set! db #vu8(20 16 01) #vu8(2))
               (db-set! db #vu8(20 16) #vu8(2))
               (db-set! db #vu8(20 17 01) #vu8(2))
               (db-set! db #vu8(20 17) #vu8(3))
               (db-set! db #vu8(42 42) #vu8(5))
               ;; get
               (generator->list (db-query db
                                          #vu8(21)
                                          #vu8(20)
                                          1
                                          2))))))

   (define check-009
     (check '((#vu8(20 16 01) . #vu8(2))
              (#vu8(20 17 01) . #vu8(2)))
            (call-with-db
             (lambda (db)
               (db-set! db #vu8(20 17 01) #vu8(2))
               (db-set! db #vu8(20 16 01) #vu8(2))
               (generator->list (db-query db
                                          #vu8(20)
                                          #vu8(21)))))))

   (define check-010
     (check '()
            (call-with-db
             (lambda (db)
               (db-set! db #vu8(20 17 01) #vu8(2))
               (db-set! db #vu8(20 16 01) #vu8(2))
               (generator->list (db-query db
                                          #vu8(20)
                                          #vu8(21)
                                          3
                                          #f))))))
   )

A src/babelialite/db.scm => src/babelialite/db.scm +466 -0
@@ 0,0 1,466 @@
#!chezscheme
(library (babelialite db)
  (export db-open
          db-cursor-seek?
          db-cursor-next?
          db-cursor-previous?
          db-in-transaction
          db-call-with-cursor
          db-set!
          db-ref
          db-query
          db-remove!
          db-close)
  (import (chezscheme))

  (define (pk . args)
    (write args)
    (newline)
    (car (reverse args)))

  (define-syntax define-syntax-rule
    (syntax-rules ()
      ((define-syntax-rule (keyword args ...) body)
       (define-syntax keyword
         (syntax-rules ()
           ((keyword args ...) body))))))

  (define-syntax-rule (bytevector->pointer bv)
    (#%$object-address bv (+ (foreign-sizeof 'void*) 1)))

  (define-syntax-rule (with-lock objs body ...)
    (begin
      (for-each lock-object objs)
      (call-with-values (lambda () body ...)
        (lambda args
          (for-each unlock-object objs)
          (apply values args)))))

  ;; ffi helpers

  (define-syntax-rule (make-double-pointer)
    (bytevector->pointer (make-bytevector 8)))

  (define-syntax-rule (dereference  pointer)
    (foreign-ref 'void* pointer 0))

  (define-syntax-rule (ftype->pointer ftype)
    (ftype-pointer-address ftype))

  (define-syntax-rule (foreign-procedure* return ptr args ...)
    (foreign-procedure __collect_safe ptr (args ...) return))

  ;; sqlite lsm extension bindings

  (define okvslite (load-shared-object "./local/lib/lsm.so"))

  (define (error->message code)
    ;; TODO: replace with a switch with fx=?
    (case code
      ((1) "ERROR")
      ((5) "BUSY")
      ((7) "NO MEMORY")
      ((8) "READ ONLY")
      ((10) "IO ERROR")
      ((11) "CORRUPT")
      ((13) "FULL")
      ((14) "CAN NOT OPEN")
      ((15) "PROTOCOL")
      ((21) "MISUSE")
      ((50) "MISMATCH")
      (else "UNKNOWN ERROR")))

  (define-syntax-rule (check caller code)
    (let ((code* code))
      (unless (zero? code*)
        (error 'okvslite (error->message code*) caller code*))))

  (define okvslite-new
    (let ((proc (foreign-procedure* int "lsm_new" void* void*)))
      (lambda ()
        (let ((out (make-double-pointer)))
          (check 'okvslite-new (proc 0 out))
          (dereference out)))))

  (define okvslite-close
    (let ((proc (foreign-procedure* int "lsm_close" void*)))
      (lambda (db)
        (check 'okvslite-close (proc db)))))

  ;; TODO: replace VALUE with a list of symbols
  ;; TOOD: make sure it works
  (define okvslite-config
    (let ((proc (foreign-procedure* int "lsm_config" void* int void*)))
      (lambda (db config value)
        (let ((pointer (make-double-pointer)))
          (foreign-set! 'int pointer 0 value)
          (check 'okvslite-config (proc db config pointer))))))

  (define okvslite-open
    (let ((proc (foreign-procedure "lsm_open" (void* string) int)))
      (lambda (db filename)
        (check 'okvslite-open (proc db filename)))))

  (define okvslite-begin
    (let ((proc (foreign-procedure* int "lsm_begin" void* int)))
      (lambda (db level)
        (check 'okvslite-begin (proc db level)))))

  (define okvslite-commit
    (let ((proc (foreign-procedure* int "lsm_commit" void* int)))
      (lambda (db level)
        (check 'okvslite-commit (proc db level)))))

  (define okvslite-rollback
    (let ((proc (foreign-procedure* int "lsm_rollback" void* int)))
      (lambda (db level)
        (check 'okvslite-rollback (proc db level)))))

  (define okvslite-insert
    (let ((proc (foreign-procedure* int "lsm_insert" void* void* int void* int)))
      (lambda (db key value)
        (with-lock (list key value)
          (check 'okvslite-insert
                 (proc db
                       (bytevector->pointer key)
                       (bytevector-length key)
                       (bytevector->pointer value)
                       (bytevector-length value)))))))

  (define okvslite-delete
    (let ((proc (foreign-procedure* int "lsm_delete" void* void* int)))
      (lambda (db key)
        (with-lock (list key)
          (check 'okvslite-delete
                 (proc db
                       (bytevector->pointer key)
                       (bytevector-length key)))))))

  (define okvslite-cursor-open
    (let ((proc (foreign-procedure* int "lsm_csr_open" void* void*)))
      (lambda (db)
        (let ((out (make-double-pointer)))
          (check 'okvslite-cursor-open
                 (proc db out))
          (dereference out)))))

  (define okvslite-cursor-close
    (let ((proc (foreign-procedure* int "lsm_csr_close" void*)))
      (lambda (cursor)
        (check 'okvslite-cursor-close (proc cursor)))))

  (define okvslite-cursor-seek
    (let ((proc (foreign-procedure* int "lsm_csr_seek" void* void* int int)))
      (lambda (cursor key strategy)

        (define (->seek symbol)
          (case symbol
            ((less-than-or-equal-fast) -2)
            ((less-than-or-equal) -1)
            ((equal) 0)
            ((greater-than-or-equal) 1)
            (else (error 'okvslite "unknown seek strategy"))))

        (with-lock (list key)
          (check 'okvslite-cursor-seek
                 (proc cursor
                       (bytevector->pointer key)
                       (bytevector-length key)
                       (->seek strategy)))))))

  (define okvslite-cursor-first
    (let ((proc (foreign-procedure* int "lsm_csr_first" void*)))
      (lambda (cursor)
        (check 'okvslite-cursor-first (proc cursor)))))

  (define okvslite-cursor-last
    (let ((proc (foreign-procedure* int "lsm_csr_last" void*)))
      (lambda (cursor)
        (check 'okvslite-cursor-last (proc cursor)))))

  (define okvslite-cursor-next
    (let ((proc (foreign-procedure* int "lsm_csr_next" void*)))
      (lambda (cursor)
        (check 'okvslite-cursor-next (proc cursor)))))

  (define okvslite-cursor-prev
    (let ((proc (foreign-procedure* int "lsm_csr_prev" void*)))
      (lambda (cursor)
        (check 'okvslite-cursor-prev (proc cursor)))))

  (define okvslite-cursor-valid?
    (let ((proc (foreign-procedure* int "lsm_csr_valid" void*)))
      (lambda (cursor)
        (fx=? (proc cursor) 1))))

  (define okvslite-cursor-key
    (let ((proc (foreign-procedure* int "lsm_csr_key" void* void* void*)))
      (lambda (cursor)
        ;; TODO: replace with a fixed size bytevector
        (let ((data* (make-double-pointer))
              (length* (make-double-pointer)))
          (check 'okvslite-cursor-key (proc cursor data* length*))
          ;; copy the data into a scheme bytevector
          (let* ((data (dereference data*))
                 (length (foreign-ref 'int length* 0))
                 (bytevector (make-bytevector length)))
            (let loop ((index (- length 1)))
              (unless (< index 0)
                (let ((value (foreign-ref 'unsigned-8 data index)))
                  (bytevector-u8-set! bytevector index value)
                  (loop (- index 1)))))
            bytevector)))))

  (define okvslite-cursor-value
    (let ((proc (foreign-procedure* int "lsm_csr_value" void* void* void*)))
      (lambda (cursor)
        ;; TODO: replace with fixed size bytevector
        (let ((data* (make-double-pointer))
              (length* (make-double-pointer)))
          (check 'okvslite-cursor-value (proc cursor data* length*))
          ;; copy the data into a scheme bytevector
          (let* ((data (dereference data*))
                 (length (foreign-ref 'int length* 0))
                 (bytevector (make-bytevector length)))
            (let loop ((index (- length 1)))
              (unless (< index 0)
                (let ((value (foreign-ref 'unsigned-8 data index)))
                  (bytevector-u8-set! bytevector index value)
                  (loop (- index 1)))))
            bytevector)))))

  (define (db-open filename)
    (define okvs (okvslite-new))
    (okvslite-open okvs filename)
    okvs)

  (define (db-cursor-seek? cursor key strategy)
    (okvslite-cursor-seek cursor key strategy)
    (okvslite-cursor-valid? cursor))

  (define (db-cursor-next? cursor)
    (okvslite-cursor-next cursor)
    (okvslite-cursor-valid? cursor))

  (define (db-cursor-previous? cursor)
    (okvslite-cursor-prev cursor)
    (okvslite-cursor-valid? cursor))

  (define (failure-default)
    (error 'okvs "transaction failed"))

  (define (db-in-transaction% okvs proc failure values)
    (guard (ex
            (else
             (okvslite-rollback okvs 0)
             (failure)))
      (okvslite-begin okvs 0)
      (call-with-values (lambda () (proc okvs))
        (lambda args
          (okvslite-commit okvs 0)
          (apply values args)))))

  (define db-in-transaction
    (case-lambda
     ((okvs proc)
      (db-in-transaction okvs proc failure-default values))
     ((okvs proc failure) (db-in-transaction okvs proc failure values))
     ((okvs proc failure success) (db-in-transaction% okvs proc failure values))))

  (define (db-call-with-cursor okvs proc)
    (define cursor (okvslite-cursor-open okvs))
    (guard (ex (else (okvslite-cursor-close cursor) (raise ex)))
      (call-with-values (lambda ()
                          (proc cursor))
        (lambda args
          (okvslite-cursor-close cursor)
          (apply values args)))))

  (define db-set! okvslite-insert)

  (define (db-ref okvs key)
    (db-call-with-cursor okvs
      (lambda (cursor)
        (okvslite-cursor-seek cursor key 'equal)
        (if (okvslite-cursor-valid? cursor)
            (okvslite-cursor-value cursor)
            #f))))

  (define (compare bytevector other)
    ;; lexicographic comparison

    ;; TODO: The code never needs to have 3 different behaviors it
    ;; only needs to know whether it is smaller or the same or
    ;; bigger. In all use of compare there is only two branches.

    ;; If BYTEVECTOR is before OTHER return -1, if equal return 0,
    ;; otherwise if BYTEVECTOR is after OTHER return 1
    (let ((end (min (bytevector-length bytevector)
                    (bytevector-length other))))
      (let loop ((index 0))
        (if (fx=? (fx- end index) 0)
            (if (= (bytevector-length bytevector)
                   (bytevector-length other))
                'same
                (if (fx< (bytevector-length bytevector)
                         (bytevector-length other))
                    'smaller
                    'bigger))
            (let ((delta (fx- (bytevector-u8-ref bytevector index)
                              (bytevector-u8-ref other index))))
              (if (fx=? delta 0)
                  (loop (+ 1 index))
                  (if (fx<? delta 0)
                      'smaller
                      'bigger)))))))

  (define (db-generator okvs key other offset limit)
    (define cursor (okvslite-cursor-open okvs))
    (define count 0)

    (define (fini!) (okvslite-cursor-close cursor) (set! yield eof-object) (eof-object))

    ;; go through the range in lexicographic order

    (define (init)
      (if (not (db-cursor-seek? cursor key 'greater-than-or-equal))
          (fini!)
          (begin
            (if (or limit offset)
                (begin
                  (set! count (fx+ count 1))
                  (set! yield continue/offset+limit)
                  (if (and offset (fx>? offset 0))
                      (continue/offset+limit)
                      (maybe-continue)))
                (begin
                  (set! yield continue)
                  (maybe-continue))))))

    (define (maybe-continue)
      (let ((key* (okvslite-cursor-key cursor)))
        (case (compare key* other)
          ((smaller) (cons key* (okvslite-cursor-value cursor)))
          (else (fini!)))))

    (define (continue)
      (if (not (db-cursor-next? cursor))
          (fini!)
          (maybe-continue)))

    (define (continue/offset+limit)
      (if (and limit (fx>? count limit))
          (fini!)
          (if (not (db-cursor-next? cursor))
              (fini!)
              (begin
                (set! count (fx+ count 1))
                (if (fx>? count offset)
                    (maybe-continue)
                    (continue/offset+limit))))))

    (define yield init)

    (lambda ()
      (yield)))

  (define (db-reverse-generator okvs key other offset limit)
    (define cursor (okvslite-cursor-open okvs))
    (define count 0)

    (define (fini!) (okvslite-cursor-close cursor) (set! yield eof-object) (eof-object))

    ;; go through the range in lexicographic order

    (define (init2)
      (if (or limit offset)
          (begin
            (if (and offset (fx>=? offset 0))
                (begin
                  (set! yield continue/offset+limit)
                  (continue/offset+limit))
                (maybe-continue)))
            (begin
              (set! yield continue)
              (set! count (fx+ count 1))
              (maybe-continue))))

    (define (init)
      (if (not (db-cursor-seek? cursor other 'less-than-or-equal))
          (fini!)
          (let ((key* (okvslite-cursor-key cursor)))
            (if (eq? (compare key other) 'same)
                (if (not (db-cursor-previous? cursor))
                    (fini!)
                    (init2))
                (init2)))))

    (define (maybe-continue)
      (let ((key* (okvslite-cursor-key cursor)))
        (case (compare key* key)
          ((smaller) (fini!))
          (else (cons key* (okvslite-cursor-value cursor))))))

    (define (continue)
      (if (not (db-cursor-previous? cursor))
          (fini!)
          (maybe-continue)))

    (define (continue/offset+limit)
      (if (and limit (fx=? count limit))
          (fini!)
          (if (not (db-cursor-previous? cursor))
              (fini!)
              (begin
                (set! count (fx+ count 1))
                (maybe-continue)))))

    (define yield init)

    (lambda ()
      (yield)))

  (define db-query
    (case-lambda
     ((okvs key) (db-ref okvs key))
     ((okvs key other) (db-query okvs key other #f #f))
     ((okvs key other offset limit)
      (assert (or (not limit) (fx>? limit 0)))
      (case (compare key other)
        ((smaller) (db-generator okvs key other offset limit))
        (else (db-reverse-generator okvs other key offset limit))))))


  (define (generator->list generator)
    (let loop ((out '()))
      (let ((item (generator)))
        (if (eof-object? item)
            (reverse out)
            (loop (cons item out))))))

  (define (generator-map proc generator)
    (lambda ()
      (proc (generator))))

  (define db-delete! okvslite-delete)

  (define (generator-for-each proc generator)
    (let loop ((item (generator)))
      (unless (eof-object? item)
        (proc item)
        (loop (generator)))))

  (define (db-remove!% okvs key other)
    (define (delete! key+value)
      (db-delete! okvs (car key+value)))

    ;; TODO: replace with call-with-cursor, and avoid db-delete! to
    ;; ease GC.
    (generator-for-each delete! (db-query okvs key other)))

  (define db-remove!
    (case-lambda
     ((okvs key) (db-delete! okvs key))
     ((okvs key other) (db-remove!% okvs key other))))

  (define db-close okvslite-close))