~ehmry/eris_actor

d2d1f11b0d49375064c2a6168c14517e201671e7 — Emery Hemingway 8 months ago trunk
Initial commit
A  => .envrc +2 -0
@@ 1,2 @@
source_env ..
use nix

A  => Tupfile +3 -0
@@ 1,3 @@
include_rules
: |> !nim_lk |> lock.json
: lock.json |> !nim_cfg |> | ./<lock>

A  => Tuprules.tup +7 -0
@@ 1,7 @@
include ../taps/depends.tup
include ../syndicate-nim/depends.tup
include ../eris-nim/depends.tup
NIM_FLAGS += --path:$(TUP_CWD)/../taps/src
NIM_FLAGS += --path:$(TUP_CWD)/../syndicate-nim/src
NIM_FLAGS += --path:$(TUP_CWD)/../eris-nim/src
NIM_GROUPS += $(TUP_CWD)/<lock>

A  => eris_actor.nimble +6 -0
@@ 1,6 @@
bin = @["eris_actor"]
license = "Unlicense"
srcDir = "src"
version = "20230707"

requires "https://codeberg.org/eris/nim-eris.git", "syndicate#b209548f5d15f7391c08fcaec3615ed843f8a410"

A  => shell.nix +2 -0
@@ 1,2 @@
{ pkgs ? import <nixpkgs> { } }:
pkgs.buildNimPackage { name = "dummy"; }

A  => src/eris_actor.nim +39 -0
@@ 1,39 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

import std/[asyncdispatch, sequtils, tables, uri]
import preserves, syndicate
import eris, eris/[composite_stores, url_stores]
# import ./eris_actor/memorize_recall
import ./eris_actor/syndicate_stores

proc connectStores(strings: seq[string]): Future[ErisStore] =
  ## Resolve a list of store URLs into a single `ErisStore` future:
  ## no URLs yields a discard store, one URL a direct client, and
  ## several URLs a multi-store combining every client.
  let urls = strings.map(parseUri)
  if urls.len == 0:
    result = newFuture[ErisStore]("connectStores")
    result.complete(newDiscardStore())
  elif urls.len == 1:
    result = newStoreClient(urls[0])
  else:
    let combined = newFuture[ErisStore]("connectStores")
    let clientsFut = urls.map(newStoreClient).all
    clientsFut.addCallback do ():
      combined.complete(newMultiStore(clientsFut.read))
    result = combined

type
  Args {.preservesDictionary.} = object
    ## Configuration dictionary expected on the root dataspace:
    ## the target dataspace plus the store URLs to connect.
    dataspace: Ref       # dataspace to serve blocks over
    stores: seq[string]  # URLs passed to connectStores

runActor("eris_actor") do (root: Ref; turn: var Turn):
  # Entry point: speak the syndicate protocol over stdio, then react to
  # Args dictionaries asserted at the root dataspace.
  connectStdio(root, turn)
  during(turn, root, ?Args) do (ds: Ref, urls: seq[string]):
    var store: ErisStore
    connectStores(urls).addCallback(turn) do (turn: var Turn; es: ErisStore):
      store = es
      # spawnMemorizer(turn, ds, store)
    # NOTE(review): `store` is declared in the assertion continuation but
    # read in the retraction continuation below — verify the `during`
    # macro shares one scope across both blocks; otherwise this is a
    # compile error.
  do:
    if not store.isNil: close(store)

A  => src/eris_actor/memorize_recall.nim +96 -0
@@ 1,96 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

import std/[algorithm, asyncdispatch, sequtils, streams, tables, uri]
import preserves, syndicate, syndicate/[actors, bags]
import eris, eris/[composite_stores, url_stores]
import ./recall

type Crystalizer {.final.} = ref object of Entity
  ## Syndicate entity that records assertions published to it so they
  ## can later be serialized into an ERIS capability during `sync`.
  store: ErisStore                    # destination store for serialized assertions
  handleMap: Table[Handle, Assertion] # live assertions keyed by publish handle

method publish(cry: Crystalizer; turn: var Turn; a: AssertionRef; h: Handle) =
  ## Record an incoming assertion under its handle; `Observe` records
  ## are replaced by an empty placeholder value.
  # TODO: attenuate out the observations
  let stored =
    if a.value.isRecord("Observe", 2): Assertion()
    else: a.value
  cry.handleMap[h] = stored

method retract(cry: Crystalizer; turn: var Turn; h: Handle) =
  ## Forget the assertion previously recorded under `h`.
  del(cry.handleMap, h)

method sync(cry: Crystalizer; turn: var Turn; peer: Ref) =
  ## Serialize the current (sorted, deduplicated) assertion set into the
  ## ERIS store and message the resulting capability to `peer` as an
  ## `<eris …>` record.
  let
    ingester = newErisIngest(cry.store, chunk1k, convergentMode)
    stream = newStringStream()
  var assertions = newSeqOfCap[Assertion](cry.handleMap.len+1)
  assertions.setLen(1) # put a false value at the front
  for a in cry.handleMap.values:
    if not a.isFalse:
      add(assertions, a)
  sort(assertions)
  var i = 1
  while i < assertions.len:
    if assertions[i] != assertions[i-1]: # dedup
      write(stream, assertions[i])
      waitFor append(ingester, stream)
      stream.setPosition(0)
      stream.data.setLen(0)
    inc i # BUG FIX: the index was never advanced, so this looped forever
  let cap = waitFor cap(ingester)
  message(turn, peer, initRecord("eris", cap.bytes.toPreserve(Ref)))

proc newCrystalizer(store: ErisStore): Crystalizer =
  ## Allocate a crystalizer that ingests into `store`.
  result = Crystalizer(store: store)

proc stop(cry: Crystalizer) =
  ## Release the crystalizer's resources; a nil `cry` is a no-op.
  if cry.isNil: return
  cry.store.close()
  cry.handleMap.clear()

proc connectStores(strings: seq[string]): Future[ErisStore] =
  ## Turn store URLs into one `ErisStore`: none → discard store,
  ## one → direct client, many → multi-store over every client.
  let parsed = strings.map(parseUri)
  if parsed.len == 0:
    result = newFuture[ErisStore]("connectStores")
    result.complete(newDiscardStore())
  elif parsed.len == 1:
    result = newStoreClient(parsed[0])
  else:
    let merged = newFuture[ErisStore]("connectStores")
    let clientsFut = parsed.map(newStoreClient).all
    clientsFut.addCallback do ():
      merged.complete(newMultiStore(clientsFut.read))
    result = merged

proc recall(facet: Facet; ds: Ref; stream: ErisStream) {.async.} =
  ## Read `stream` chunk by chunk, decode the preserves values it
  ## contains, and publish each decoded value to the dataspace `ds`.
  var
    blk = newSeq[byte](stream.cap.chunkSize.int)
    dec = newBufferedDecoder(0)
  while not stream.atEnd:
    let n = await readBuffer(stream, addr blk[0], blk.len)
    # BUG FIX: a full-buffer read returns exactly blk.len bytes, which the
    # previous strict `n < blk.len` assertion rejected.
    assert n <= blk.len
    feed(dec, addr blk[0], n)
    run(facet) do (turn: var Turn):
      # Drain every complete value currently buffered in the decoder.
      while true:
        var (ok, pr) = decode(dec, Ref)
        if ok: discard publish(turn, ds, pr)
        else: break

proc spawnMemorizer*(turn: var Turn; ds: Ref; store: ErisStore) =
  ## React to memorize/recall assertions on `ds`: a `Memorize` assertion
  ## attaches a `Crystalizer` observer to the embedded capability, and a
  ## `Recall` assertion streams stored content back to the subject.
  let pat = ?recall.Memorize
  during(turn, ds, pat) do (cap: Ref):
    let cry = newCrystalizer(store)
    discard publish(turn, cap, Observe(pattern: grab(), observer: newRef(turn, cry)))
  do:
    # NOTE(review): `cry` is bound in the assertion continuation above;
    # confirm the `during` macro makes it visible in this retraction
    # continuation — otherwise this does not compile.
    stop(cry)

  during(turn, ds, ?Recall[Ref]) do (erisBytes: seq[byte], subject: Ref):
    let
      cap = parseCap erisBytes
      stream = newErisStream(store, cap)
      fut = recall(turn.facet, subject, stream)
    addCallback(fut) do ():
      close(stream)
      read(fut) # re-raises any failure captured by the recall future

A  => src/eris_actor/recall.nim +21 -0
@@ 1,21 @@

import
  preserves

type
  Recall* {.preservesRecord: "recall".} = object
    ## `<recall <eris …> subject>` — request to stream the content named
    ## by an ERIS capability to the embedded subject entity.
    `field0`*: ErisBytes
    `subject`* {.preservesEmbedded.}: Preserve[void]

  ErisBytes* {.preservesRecord: "eris".} = object
    ## `<eris bytes>` — a binary ERIS capability.
    `field0`*: seq[byte]

  Memorize* {.preservesRecord: "memorize".} = object
    ## `<memorize eris object>` — request to crystalize assertions
    ## observed at the embedded object into an ERIS capability.
    `field0`* {.preservesLiteral: "eris".}: tuple[]
    `object`* {.preservesEmbedded.}: Preserve[void]

proc `$`*(x: Recall | ErisBytes | Memorize): string =
  ## Render as preserves text.
  result = $toPreserve(x)

proc encode*(x: Recall | ErisBytes | Memorize): seq[byte] =
  ## Serialize as binary preserves.
  result = toPreserve(x).encode

A  => src/eris_actor/store_protocol.nim +26 -0
@@ 1,26 @@

import
  preserves

type
  ErisCapability* {.preservesRecord: "eris".} = object
    ## `<eris bytes>` — a binary ERIS capability.
    `bytes`*: seq[byte]

  # Symbolic chunk size; syndicate_stores maps `a` ↔ chunk1k and
  # `f` ↔ chunk32k.
  `ChunkSize`* {.preservesOr, pure.} = enum
    `a`, `f`
  ErisCache* {.preservesRecord: "eris-cache".} = object
    ## `<eris-cache chunkSize reference>` — asserts that the referenced
    ## chunk is held by some store on the dataspace.
    `chunkSize`*: ChunkSize
    `reference`*: seq[byte]

  # Convergence secret mode for ingestion.
  `SecretMode`* {.preservesOr, pure.} = enum
    `convergent`, `unique`
  ErisChunk* {.preservesRecord: "eris-chunk".} = object
    ## `<eris-chunk chunkSize reference content>` — carries the bytes of
    ## a single chunk.
    `chunkSize`*: ChunkSize
    `reference`*: seq[byte]
    `content`*: seq[byte]

proc `$`*(x: ErisCapability | ErisCache | ErisChunk): string =
  ## Render as preserves text.
  result = $toPreserve(x)

proc encode*(x: ErisCapability | ErisCache | ErisChunk): seq[byte] =
  ## Serialize as binary preserves.
  result = toPreserve(x).encode

A  => src/eris_actor/syndicate_stores.nim +133 -0
@@ 1,133 @@
# SPDX-FileCopyrightText: ☭ Emery Hemingway
# SPDX-License-Identifier: Unlicense

import std/asyncfutures
from std/sequtils import toSeq
import eris
import preserves, syndicate, syndicate/patterns
from syndicate/actors import preventInertCheck
from syndicate/protocols/dataspace import Observe

type
  ErisCache {.preservesRecord: "eris-cache".} = object
    ## `<eris-cache chunkSize reference>` — asserts that the referenced
    ## chunk is held by some store on the dataspace.
    chunkSize: ChunkSize
    reference: Reference

  ErisChunk {.preservesRecord: "eris-chunk".} = object
    ## `<eris-chunk chunkSize reference content>` — carries the bytes of
    ## a single chunk.
    chunkSize: ChunkSize
    reference: Reference
    content: seq[byte]

proc toPreserveHook*(bs: eris.ChunkSize; E: typedesc): Preserve[E] =
  ## Encode a chunk size as its protocol symbol: chunk1k → `a`,
  ## chunk32k → `f`.
  if bs == chunk1k:
    result = "a".toSymbol(E)
  else:
    result = "f".toSymbol(E)

proc fromPreserveHook*[E](bs: var eris.ChunkSize; pr: Preserve[E]): bool =
  ## Decode a chunk-size symbol (`a` → chunk1k, `f` → chunk32k).
  ## Returns false — and asserts in debug builds — for anything else.
  result = true
  if pr.isSymbol("a"):
    bs = chunk1k
  elif pr.isSymbol("f"):
    bs = chunk32k
  else:
    result = false
  assert result, $pr

proc fromPreserveHook*[E](v: var Operations; pr: Preserve[E]): bool =
  ## Decode a preserves set of operation symbols into `v`.
  ## Symbols other than `Get`/`Put` are silently ignored.
  result = pr.isSet
  if result:
    for element in pr.set:
      if element.isSymbol("Get"): incl(v, Get)
      elif element.isSymbol("Put"): incl(v, Put)

proc toPreserveHook*(r: Reference; E: typedesc): Preserve[E] =
  ## Preserve a block reference as its raw byte string.
  toPreserve(r.bytes, E)

proc fromPreserveHook*[E](v: var Reference; pr: Preserve[E]): bool =
  ## Copy a byte-string preserve into `v` when the length matches the
  ## reference size exactly; otherwise leave `v` untouched.
  result = pr.kind == pkByteString and pr.bytes.len == v.bytes.len
  if result:
    copyMem(addr v.bytes[0], unsafeAddr pr.bytes[0], v.bytes.len)

type SyndicateStore* {.final.} = ref object of ErisStoreObj
  ## ErisStore implementation that performs get/put by asserting and
  ## observing eris-chunk/eris-cache records on a dataspace.
  facet: Facet              # facet in which all store operations run
  ds: Ref                   # dataspace carrying the chunk assertions
  disarm: proc() {.gcsafe.} # releases the inert-check guard on close

proc run(store: SyndicateStore, action: TurnAction) =
  ## Run an action in a new facet.
  run(store.facet) do (turn: var Turn):
    discard turn.inFacet(action)

method get(store: SyndicateStore; futGet: FutureGet) =
  ## Satisfy a block read by observing the dataspace for a matching
  ## eris-chunk assertion and completing the future with its content.
  store.run do (turn: var Turn):
    onPublish(turn, store.ds, ErisChunk ? { 0: ?futGet.chunkSize, 1: ?futGet.`ref`, 2: grab() }) do (blk: seq[byte]):
      complete(futGet, blk)
      #stop(turn)
      # TODO: stop watching for this block

method hasBlock(store: SyndicateStore; blkRef: Reference; bs: ChunkSize): Future[bool] =
  ## Ask the dataspace whether a chunk is cached; the future completes
  ## with true once a matching eris-cache assertion is observed.
  ## NOTE(review): the future never completes if no peer asserts the
  ## cache record — the timeout TODO below is real.
  let fut = newFuture[bool]("SyndicateStore.hasBlock")
  store.run do (turn: var Turn):
    # TODO: return false on timeout
    onPublish(turn, store.ds, ErisCache ? { 0: ?bs, 1: ?blkRef }) do:
      fut.complete(true)
      #stop(turn)
      # TODO: stop after observation
  fut

method put(store: SyndicateStore; futPut: FuturePut) =
  ## Publish the chunk to the dataspace, then retract it and complete
  ## the future once a peer confirms storage by asserting the matching
  ## eris-cache record.
  store.run do (turn: var Turn):
    let chunkHandle = publish(turn, store.ds,
        ErisChunk(chunkSize: futPut.chunkSize, reference: futPut.`ref`, content: futPut.toBytes))
    onPublish(turn, store.ds, ?ErisCache(chunkSize: futPut.chunkSize, reference: futPut.`ref`)):
      retract(turn, chunkHandle)
      complete(futPut)
      # stop(turn) # TODO: stop observing

method close(store: SyndicateStore) =
  ## Release the inert-check guard so the store's facet may shut down.
  store.disarm()

proc newSyndicateStore*(turn: var Turn; ds: Ref; ops: Operations): SyndicateStore =
  ## Create a store backed by assertions over the dataspace `ds`.
  ## NOTE(review): `ops` is accepted but currently unused — nothing here
  ## restricts the store to the requested operations; confirm intent.
  let store = SyndicateStore(ds: ds) # `let` suffices: only ref fields mutate
  store.facet = turn.inFacet do (turn: var Turn):
    # Keep the facet alive until close() disarms this guard.
    store.disarm = turn.facet.preventInertCheck()
  store

proc addCallback*(fut: FutureBlock; turn: var Turn; act: TurnAction) =
  ## Schedule `act` on this turn's facet once `fut` completes.
  let owner = turn.facet
  fut.addCallback do:
    owner.run(act)

proc spawnErisStore*(turn: var Turn; ds: Ref; store: ErisStore; ops = {Get,Put}): Actor {.discardable.} =
  ## Serve `store` over the dataspace `ds`: answer eris-chunk
  ## observations with chunk content (Get) and ingest published chunks
  ## when an eris-cache observation requests them (Put).
  spawn("ErisStore", turn) do (turn: var Turn):
    let
      # Patterns matching observers of chunk and cache records.
      chunkRequest = (Observe ? { 0: ErisChunk ? { 0: grabLit(), 1: grabLit() } })
      cacheRequest = (Observe ? { 0: ErisCache ? { 0: grabLit(), 1: grabLit() } })
    # Sanity-check the pattern DSL produced the expected shape.
    doAssert $chunkRequest == "<rec Observe [<rec eris-chunk [<rec lit [<bind <_>>]> <rec lit [<bind <_>>]> <_>]> <_>]>", $chunkRequest

    if Get in ops:
      # Someone is observing a chunk: fetch it from the store and
      # publish its content back to the dataspace.
      during(turn, ds, chunkRequest) do (bs: ChunkSize; blkRef: Reference):
        var futGet = newFutureGet(blkRef, bs)
        addCallback(futGet, turn) do (turn: var Turn):
          if not futGet.failed:
            discard publish(turn, ds,
              ErisChunk(chunkSize: futGet.chunkSize, reference: futGet.`ref`, content: futGet.moveBytes))
        get(store, futGet)

    if Put in ops:
      # Someone is asking whether a chunk is cached: either confirm
      # immediately, or watch for the chunk and ingest it on arrival.
      during(turn, ds, cacheRequest) do (bs: ChunkSize; blkRef: Reference):
        let fut = store.hasBlock(blkRef, bs)
        addCallback(fut, turn) do (turn: var Turn):
          let hasBlock = fut.read
          if hasBlock:
            discard publish(turn, ds, ErisCache(chunkSize: bs, reference: blkRef))
          else:
            var pat = ErisChunk ? { 0: ?bs, 1: ?blkRef, 2: grab() }
            onPublish(turn, ds, pat) do (blkBuf: seq[byte]):
              var futPut = newFuturePut(blkBuf)
              # Verify the published content actually hashes to the
              # requested reference before storing it.
              if futPut.`ref` == blkRef:
                addCallback(futPut, turn) do (turn: var Turn):
                  discard publish(turn, ds, ErisCache(chunkSize: futPut.chunkSize, reference: futPut.`ref`))
                put(store, futPut)