~gabe/mortar unlisted

f1c6e8e6c5706348b438f77e3edd19f6334cb8ce — Gabe Fierro 1 year, 6 months ago 3a5d0ce
Improve client context handling to be robust to disconnections + handle

cleanup a lot more consistently. Also should address a problem where we
wouldn't consume the whole BTrDB response channel if the client left
M go.mod => go.mod +12 -10
@@ 5,18 5,20 @@ require (
	dmitri.shuralyov.com/app/changes v0.0.0-20181114035150-5af16e21babb // indirect
	dmitri.shuralyov.com/service/change v0.0.0-20190203163610-217368fe4577 // indirect
	git.apache.org/thrift.git v0.12.0 // indirect
	git.sr.ht/~gabe/hod v0.6.7
	git.sr.ht/~gabe/hod v0.6.10
	github.com/Shopify/sarama v1.20.1 // indirect
	github.com/aws/aws-sdk-go v1.16.19
	github.com/chzyer/logex v1.1.10 // indirect
	github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
	github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 // indirect
	github.com/coreos/etcd v3.3.12+incompatible // indirect
	github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect
	github.com/dgrijalva/jwt-go v3.2.0+incompatible
	github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
	github.com/glycerine/goconvey v0.0.0-20190315024820-982ee783a72e // indirect
	github.com/go-logfmt/logfmt v0.4.0 // indirect
	github.com/gogo/protobuf v1.2.1 // indirect
	github.com/golang/lint v0.0.0-20181217174547-8f45f776aaf1 // indirect
	github.com/golang/protobuf v1.3.1
	github.com/golang/snappy v0.0.0-20190218232222-2a8bb927dd31 // indirect
	github.com/gomodule/redigo v2.0.0+incompatible // indirect
	github.com/google/pprof v0.0.0-20190208070709-b421f19a5c07 // indirect
	github.com/googleapis/gax-go v2.0.2+incompatible // indirect


@@ 31,11 33,12 @@ require (
	github.com/influxdata/influxdb v1.7.5
	github.com/influxdata/platform v0.0.0-20190117200541-d500d3cf5589 // indirect
	github.com/kisielk/errcheck v1.2.0 // indirect
	github.com/kisielk/gotool v1.0.0 // indirect
	github.com/microcosm-cc/bluemonday v1.0.2 // indirect
	github.com/openzipkin/zipkin-go v0.1.5 // indirect
	github.com/pborman/uuid v1.2.0
	github.com/pkg/errors v0.8.1
	github.com/pkg/profile v1.2.1
	github.com/pkg/profile v1.3.0
	github.com/prometheus/client_golang v0.9.2
	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
	github.com/prometheus/common v0.2.0 // indirect


@@ 55,11 58,9 @@ require (
	github.com/shurcooL/reactions v0.0.0-20181222204718-145cd5e7f3d1 // indirect
	github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
	github.com/shurcooL/webdavfs v0.0.0-20181215192745-5988b2d638f6 // indirect
	github.com/sirupsen/logrus v1.3.0
	github.com/sirupsen/logrus v1.4.1
	github.com/spf13/afero v1.2.1 // indirect
	github.com/spf13/viper v1.3.1
	github.com/stretchr/testify v1.3.0 // indirect
	github.com/tinylib/msgp v1.1.0 // indirect
	github.com/spf13/viper v1.3.2
	github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43 // indirect
	go4.org v0.0.0-20190218023631-ce4c26f7be8e // indirect
	golang.org/x/build v0.0.0-20190215225244-0261b66eb045 // indirect


@@ 68,8 69,9 @@ require (
	golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
	golang.org/x/oauth2 v0.0.0-20190212230446-3e8b2be13635 // indirect
	golang.org/x/perf v0.0.0-20190124201629-844a5f5b46f4 // indirect
	google.golang.org/genproto v0.0.0-20190215211957-bd968387e4aa // indirect
	google.golang.org/grpc v1.19.0
	golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67 // indirect
	golang.org/x/tools v0.0.0-20190407030857-0fdf0c73855b // indirect
	google.golang.org/grpc v1.20.1
	gopkg.in/BTrDB/btrdb.v4 v4.15.3 // indirect
	gopkg.in/btrdb.v4 v4.15.3
	honnef.co/go/tools v0.0.0-20190215041234-466a0476246c // indirect

M go.sum => go.sum +27 -0
@@ 18,8 18,12 @@ git.sr.ht/~gabe/hod v0.6.6 h1:Ysi74NW3MahGYMs7085FAwJCrZ5rQUqZhZR3nux9BxQ=
git.sr.ht/~gabe/hod v0.6.6/go.mod h1:+SvVr7UPywUK0lH80Gf/uYnSwY3U6GI33J7Gqd9lfDE=
git.sr.ht/~gabe/hod v0.6.7 h1:I1kOUhGpF1jKp4msATsj6qxOXYxxhsH7kepitnpzjAg=
git.sr.ht/~gabe/hod v0.6.7/go.mod h1:+SvVr7UPywUK0lH80Gf/uYnSwY3U6GI33J7Gqd9lfDE=
git.sr.ht/~gabe/hod v0.6.10 h1:zbcsUcgT5Jl7fjNS2V2zTNYyJ4ZmBlWboiWCEpchK/0=
git.sr.ht/~gabe/hod v0.6.10/go.mod h1:RWvhEC6WCTlPZnsOt74bqHIcM1qOHf9dCJ5OoIZpjM4=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7 h1:PqzgE6kAMi81xWQA2QIVxjWkFHptGgC547vchpUbtFo=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=


@@ 32,6 36,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.4.16 h1:NholfewybRLOwACgfqfzn/N5xa6keKNs4fP00t0cwLo=
github.com/RoaringBitmap/roaring v0.4.16/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
github.com/RoaringBitmap/roaring v0.4.17 h1:oCYFIFEMSQZrLHpywH7919esI1VSrQZ0pJXkZPGIJ78=
github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI=
github.com/SAP/go-hdb v0.13.1/go.mod h1:etBT+FAi1t5k3K3tf5vQTnosgYmhDkRi8jEnQqCnxF0=
github.com/SermoDigital/jose v0.9.1/go.mod h1:ARgCUhI1MHQH+ONky/PAtmVHQrP5JlGY0F3poXOp/fA=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=


@@ 101,6 107,8 @@ github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102 h1:afESQBXJEnj3fu+
github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f h1:dDxpBYafY/GYpcl+LS4Bn3ziLPuEdGRkRjYAbSlWxSA=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/distribution v2.6.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v0.0.0-20180422163414-57142e89befe/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=


@@ 155,6 163,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pO
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.0-20190218232222-2a8bb927dd31 h1:L7s4Kab5p6uNwbGGZl4w9VkSTmzHSLkP8w/xCbYEWOo=
github.com/golang/snappy v0.0.0-20190218232222-2a8bb927dd31/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf/go.mod h1:P32wAyui1PQ58Oce/KYkOqQv8cVw1zAapXOl+dRFGbc=
github.com/gonum/diff v0.0.0-20180125090814-f0137a19aa16/go.mod h1:22dM4PLscQl+Nzf64qNBurVJvfyvZELT0iRW2l/NN70=


@@ 196,6 206,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.7.0 h1:tPFY/SM+d656aSgLWO2Eckc3Exwpwwy
github.com/grpc-ecosystem/grpc-gateway v1.7.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.8.6 h1:XvND7+MPP7Jp+JpqSZ7naSl5nVZf6k0LbL1V3EKh0zc=
github.com/grpc-ecosystem/grpc-gateway v1.8.6/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/gtfierro/xboswave v0.0.0-20190331221909-bf5c52bb0c47 h1:u6vpMi1pPrU1mfScr46ri7lZsZq3eJagsCJkVlIs15k=
github.com/gtfierro/xboswave v0.0.0-20190331221909-bf5c52bb0c47/go.mod h1:UJWRZZCe4AT4/JfWsL0y4Oq07a1dX+v6JyQTL6cADBk=
github.com/gtfierro/xboswave v0.0.0-20190401050410-57c591a772ff h1:bOILVA4D18bIz0+4UNlB2iYZUJzHz1/9FAglTgJYfF0=


@@ 363,6 375,8 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1 h1:F++O52m40owAmADcojzM+9gyjmMOY/T4oYJkgFDH8RE=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/profile v1.3.0 h1:OQIvuDgm00gWVWGTf4m4mCt6W1/0YqU7Ntg0mySWgaI=
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=


@@ 437,6 451,8 @@ github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs=


@@ 444,6 460,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.1 h1:qgMbHoJbPbw579P+1zVY+6n4nIFuIchaIjzZ/I/Yq8M=
github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=


@@ 459,6 477,8 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stevvooe/resumable v0.0.0-20180830230917-22b14a53ba50/go.mod h1:1pdIZTAHUz+HDKDVZ++5xg/duPlhKAIzw9qy42CWYp4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=


@@ 522,6 542,7 @@ golang.org/x/exp v0.0.0-20190212162250-21964bba6549/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=


@@ 556,6 577,7 @@ golang.org/x/perf v0.0.0-20190124201629-844a5f5b46f4/go.mod h1:JLpeXjPJfIyPr5Tlb
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180903190138-2b024373dcd9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=


@@ 598,6 620,7 @@ golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190214204934-8dcb7bc8c7fe/go.mod h1:E6PF97AdD6v0s+fPshSmumCW1S1Ne85RbPQxELkKa44=
golang.org/x/tools v0.0.0-20190219035721-0a7d439b5fa8/go.mod h1:E6PF97AdD6v0s+fPshSmumCW1S1Ne85RbPQxELkKa44=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190315214010-f0bfdbff1f9c h1:KQ2sRfnx/Xk0E4v13yE9v3gCXAn6qieU1aiQOsbmpQg=
golang.org/x/tools v0.0.0-20190315214010-f0bfdbff1f9c/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=


@@ 625,6 648,8 @@ google.golang.org/genproto v0.0.0-20181219182458-5a97ab628bfb/go.mod h1:7Ep/1NZk
google.golang.org/genproto v0.0.0-20190201180003-4b09977fb922/go.mod h1:L3J43x8/uS+qIUoksaLKe6OS3nUKxOKuIFz1sl2/jx4=
google.golang.org/genproto v0.0.0-20190215211957-bd968387e4aa h1:FVL+/MjP2dzG4PxLpCJR7B6esIia88UAbsfYUrCc8U4=
google.golang.org/genproto v0.0.0-20190215211957-bd968387e4aa/go.mod h1:L3J43x8/uS+qIUoksaLKe6OS3nUKxOKuIFz1sl2/jx4=
google.golang.org/genproto v0.0.0-20190508193815-b515fa19cec8 h1:x913Lq/RebkvUmRSdQ8MNb0GZKn+SR1ESfoetcQSeak=
google.golang.org/genproto v0.0.0-20190508193815-b515fa19cec8/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=


@@ 634,6 659,8 @@ google.golang.org/grpc v1.18.0 h1:IZl7mfBGfbhYx2p2rKRtYgDFw6SBz+kclmxYrCksPPA=
google.golang.org/grpc v1.18.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/BTrDB/btrdb.v4 v4.0.0-20180821183339-596e21fb2a6f/go.mod h1:NFCvvX7MRnwlaC1Z+OMR8UuRCZ0JevTdCgU4u/1xxXQ=
gopkg.in/BTrDB/btrdb.v4 v4.15.3 h1:bTB+xKfPn5mwDpRh6LQoICalbnrvkDaZj5JRR8NAS3M=
gopkg.in/BTrDB/btrdb.v4 v4.15.3/go.mod h1:NFCvvX7MRnwlaC1Z+OMR8UuRCZ0JevTdCgU4u/1xxXQ=

M main.go => main.go +0 -2
@@ 110,8 110,6 @@ func main() {
		end = end.GetUpstream()
	}

	stages.Showtime(ts_stage.GetQueue())

	select {}
	cancel()
}

M stages/brick.go => stages/brick.go +40 -108
@@ 25,7 25,7 @@ func init() {
type BrickQueryStage struct {
	upstream Stage
	ctx      context.Context
	output   chan Context
	output   chan *Request

	db            *hod.HodDB
	highwatermark int64


@@ 45,7 45,7 @@ func NewBrickQueryStage(cfg *BrickQueryStageConfig) (*BrickQueryStage, error) {
	}
	stage := &BrickQueryStage{
		upstream: cfg.Upstream,
		output:   make(chan Context),
		output:   make(chan *Request),
		ctx:      cfg.StageContext,
	}



@@ 78,39 78,28 @@ func NewBrickQueryStage(cfg *BrickQueryStageConfig) (*BrickQueryStage, error) {
			input := stage.upstream.GetQueue()
			for {
				select {
				case ctx := <-input:
					// new API
					if len(ctx.request.Sites) > 0 && len(ctx.request.Views) > 0 {
						if err := stage.processQuery2(ctx); err != nil {
							log.Println(err)
							ctx.response = nil
							ctx.addError(err)
							stage.output <- ctx
				case req := <-input:
					if req.fetch_request != nil {
						// handle metadata stage of fetch request
						if len(req.fetch_request.Sites) > 0 && len(req.fetch_request.Views) > 0 {
							if err := stage.processQuery(req); err != nil {
								log.Println(err)
								req.addError(err)
							}
						}
						// old API
					} else if len(ctx.request.Sites) > 0 && len(ctx.request.Streams) > 0 {
						ctx.addError(errors.New("Need to upgrade to pymortar>=0.3.2"))
						log.Error("Old client")
						ctx.response = nil
						stage.output <- ctx
						//if err := stage.processQuery(ctx); err != nil {
						//	log.Println(err)
						//	ctx.response = nil
						//	ctx.addError(err)
						//	stage.output <- ctx
						//}
					} else if len(ctx.qualify_request.Required) > 0 {
						if err := stage.processQualify(ctx); err != nil {
							log.Warning(ctx.errors)
							ctx.qualify_done <- &mortarpb.QualifyResponse{
								Error: err.Error(),
						stage.output <- req
					} else if req.qualify_request != nil {
						// handle qualify request
						if len(req.qualify_request.Required) > 0 {
							if err := stage.processQualify(req); err != nil {
								req.addError(err)
								req.qualify_responses <- &mortarpb.QualifyResponse{
									Error: err.Error(),
								}
							}
							ctx.response = nil
							stage.output <- ctx
						}
					} else {
						stage.output <- ctx // if no sites/views, pass it along anyway?
					}

				case <-stage.ctx.Done():
					// case that breaks the stage and releases resources
					fmt.Println("Ending Brick Queue")


@@ 141,14 130,14 @@ func (stage *BrickQueryStage) SetUpstream(upstream Stage) {
}

// blocks on internal channel until next "Context" is ready
func (stage *BrickQueryStage) GetQueue() chan Context {
func (stage *BrickQueryStage) GetQueue() chan *Request {
	return stage.output
}
func (stage *BrickQueryStage) String() string {
	return "<| brick stage |>"
}

func (stage *BrickQueryStage) processQualify(ctx Context) error {
func (stage *BrickQueryStage) processQualify(req *Request) error {
	brickresp := &mortarpb.QualifyResponse{}

	sites := make(map[string]struct{})


@@ 158,16 147,16 @@ func (stage *BrickQueryStage) processQualify(ctx Context) error {
		Filter:    logpb.TimeFilter_At,
		Timestamp: time.Now().UnixNano(),
	}
	version_response, err := stage.db.Versions(ctx.ctx, version_query)
	version_response, err := stage.db.Versions(req.ctx, version_query)
	if err != nil {
		ctx.addError(err)
		req.addError(err)
		log.Error(err)
		return err
	}
	if version_response.Error != "" {
		err = errors.New(version_response.Error)
		log.Error(err)
		ctx.addError(err)
		req.addError(err)
		return err
	}



@@ 175,20 164,20 @@ func (stage *BrickQueryStage) processQualify(ctx Context) error {
		sites[row.Values[0].Value] = struct{}{}
	}

	for _, querystring := range ctx.qualify_request.Required {
	for _, querystring := range req.qualify_request.Required {
		query, err := stage.db.ParseQuery(querystring, 0)
		if err != nil {
			ctx.addError(err)
			req.addError(err)
			log.Error(err)
			return err
		}

		for site := range sites {
			query.Graphs = []string{site}
			res, err := stage.db.Select(ctx.ctx, query)
			res, err := stage.db.Select(req.ctx, query)
			if err != nil {
				log.Error(err)
				ctx.addError(err)
				req.addError(err)
				//return err
			} else if len(res.Rows) == 0 {
				delete(sites, site)


@@ 198,19 187,19 @@ func (stage *BrickQueryStage) processQualify(ctx Context) error {
	for site := range sites {
		brickresp.Sites = append(brickresp.Sites, site)
	}
	ctx.qualify_done <- brickresp
	req.qualify_responses <- brickresp

	return nil
}

// We need to rethink how the Brick stage handles the view + dataFrame processing

func (stage *BrickQueryStage) processQuery2(ctx Context) error {
func (stage *BrickQueryStage) processQuery(req *Request) error {
	// store view name -> list of dataVars
	var viewDataVars = make(map[string][]string)
	// store view name -> list of indexes to dependent dataFrames
	var viewDataFrames = make(map[string][]int)
	for idx, dataFrame := range ctx.request.DataFrames {
	for idx, dataFrame := range req.fetch_request.DataFrames {
		idx := idx
	tsLoop:
		for _, timeseries := range dataFrame.Timeseries {


@@ 230,10 219,10 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
	log.Info("DataVars: ", viewDataVars)
	log.Info("DataFrames: ", viewDataFrames)

	for _, view := range ctx.request.Views {
	for _, view := range req.fetch_request.Views {
		query, err := stage.db.ParseQuery(view.Definition, stage.highwatermark)
		if err != nil {
			ctx.addError(err)
			req.addError(err)
			return err
		}



@@ 242,13 231,12 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
		// property is how to relate the points to the timeseries database. However, it also introduces the complexity
		// of dealing with whether or not the variables *do* have associated timeseries or not.
		mapping, _ := rewriteQuery(viewDataVars[view.Name], query)
		for _, sitename := range ctx.request.Sites {
		for _, sitename := range req.fetch_request.Sites {
			query.Graphs = []string{sitename}
			log.Warning("rewrote: ", query)
			res, err := stage.db.Select(ctx.ctx, query)
			res, err := stage.db.Select(req.ctx, query)
			if err != nil {
				log.Error(err)
				ctx.addError(err)
				req.addError(err)
				continue
				//return err
			}


@@ 268,13 256,11 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
			brickresp.View = view.Name
			brickresp.Variables = res.Variables

			log.Info("Got rows: ", len(res.Rows))

			for _, row := range res.Rows {
				//	// for each dependent dataFrame
				for _, selIdx := range viewDataFrames[view.Name] {
					// for each timeseries
					dataFrame := ctx.request.DataFrames[selIdx]
					dataFrame := req.fetch_request.DataFrames[selIdx]
					// add uuids to the list on the DataFrame for each datavar from the view we're currently working with
					for _, ts := range dataFrame.Timeseries {
						if ts.View == view.Name {


@@ 285,7 271,7 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
						}
					}
					// TODO: do we need to update the dataFrame?
					ctx.request.DataFrames[selIdx] = dataFrame
					req.fetch_request.DataFrames[selIdx] = dataFrame
				}
				//}
				// we also add the query results to the output


@@ 293,64 279,10 @@ func (stage *BrickQueryStage) processQuery2(ctx Context) error {
			}

			// send the query results to the client
			ctx.done <- brickresp
		}

	}
	// signal that we are done processing this stage (1x)
	stage.output <- ctx
	return nil
}

func (stage *BrickQueryStage) processQuery(ctx Context) error {
	for idx, reqstream := range ctx.request.Streams {
		query, err := stage.db.ParseQuery(reqstream.Definition, stage.highwatermark)
		if err != nil {
			ctx.addError(err)
			return err
		}

		// this rewrites the incoming query so that it extracts the UUIDs (bf:uuid property) for each of the
		// variables in the SELECT clause of the query. This removes the need for the user to know that the bf:uuid
		// property is how to relate the points to the timeseries database. However, it also introduces the complexity
		// of dealing with whether or not the variables *do* have associated timeseries or not.
		_, startIdx := rewriteQuery(reqstream.DataVars, query)
		for _, sitename := range ctx.request.Sites {
			query.Graphs = []string{sitename}
			res, err := stage.db.Select(ctx.ctx, query)
			if err != nil {
				ctx.addError(err)
				//return err
			}

			// TODO: if we have no results from anywhere, need to notify the user and terminate early

			// collate the UUIDs from query results and push into context.
			// Because the rewritten query puts all of the new variables corresponding to the possible UUIDs at the end,
			// the rewriteQuery method has to return the index that we start with when iterating through the variables in
			// each row to make sure we get the actual queries.
			stream := ctx.request.Streams[idx]

			brickresp := &mortarpb.FetchResponse{}

			brickresp.Variable = reqstream.Name
			brickresp.Variables = res.Variables
			brickresp.Site = sitename
			for _, row := range res.Rows {
				for uuidx := startIdx; uuidx < len(query.Vars); uuidx++ {
					stream.Uuids = append(stream.Uuids, row.Values[uuidx].Value)
				}
				// we also add the query results to the output
				brickresp.Rows = append(brickresp.Rows, transformRow(row))
			}
			// send the query results to the client
			// TODO: make this streaming?
			ctx.done <- brickresp
			req.fetch_responses <- brickresp
		}

	}
	// signal that we are done processing this stage (1x)
	stage.output <- ctx
	return nil
}


M stages/frontend.go => stages/frontend.go +20 -31
@@ 25,7 25,7 @@ var (

type ApiFrontendBasicStage struct {
	ctx    context.Context
	output chan Context
	output chan *Request
	auth   *CognitoAuth
	sem    chan struct{}
	sync.Mutex


@@ 42,7 42,7 @@ type ApiFrontendBasicStageConfig struct {

func NewApiFrontendBasicStage(cfg *ApiFrontendBasicStageConfig) (*ApiFrontendBasicStage, error) {
	stage := &ApiFrontendBasicStage{
		output: make(chan Context),
		output: make(chan *Request),
		ctx:    cfg.StageContext,
		sem:    make(chan struct{}, 20),
	}


@@ 106,7 106,7 @@ func (stage *ApiFrontendBasicStage) SetUpstream(upstream Stage) {
	//has no upstream
}

func (stage *ApiFrontendBasicStage) GetQueue() chan Context {
func (stage *ApiFrontendBasicStage) GetQueue() chan *Request {
	return stage.output
}
func (stage *ApiFrontendBasicStage) String() string {


@@ 146,29 146,22 @@ func (stage *ApiFrontendBasicStage) Qualify(ctx context.Context, request *mortar

	qualifyQueriesProcessed.Inc()

	ctx, cancel := context.WithTimeout(ctx, requestTimeout)
	defer cancel()

	// prepare context for the execution
	responseChan := make(chan *mortarpb.QualifyResponse)
	queryCtx := Context{
		ctx:             ctx,
		qualify_request: *request,
		qualify_done:    responseChan,
	}
	req := NewQualifyRequest(ctx, request)

	// send the request to the output of this stage so it
	// can be handled by the next stage
	select {
	case stage.output <- queryCtx:
	case stage.output <- req:
	case <-ctx.Done():
		return nil, errors.Wrap(ctx.Err(), "qualify timeout on dispatching query")
	}

	select {
	case resp := <-responseChan:
		//close(responseChan)
	case resp := <-req.qualify_responses:
		if resp.Error != "" {
			log.Warning(resp.Error)
		}
		close(req.qualify_responses)
		return resp, nil
	case <-ctx.Done():
		return nil, errors.Wrap(ctx.Err(), "qualify timeout on getting query response")


@@ 191,10 184,9 @@ func (stage *ApiFrontendBasicStage) Fetch(request *mortarpb.FetchRequest, client
	activeQueries.Inc()
	defer activeQueries.Dec()

	ctx, cancel := context.WithTimeout(client.Context(), requestTimeout)
	defer cancel()
	ctx := client.Context()

	headers, ok := metadata.FromIncomingContext(client.Context())
	headers, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return unauthorizedErr
	}


@@ 223,12 215,8 @@ func (stage *ApiFrontendBasicStage) Fetch(request *mortarpb.FetchRequest, client
		return errors.Wrap(ctx.Err(), "fetch timeout on getting semaphore")
	}

	responseChan := make(chan *mortarpb.FetchResponse)
	queryCtx := Context{
		ctx:     ctx,
		request: *request,
		done:    responseChan,
	}
	req := NewFetchRequest(ctx, request)

	ret := make(chan error)
	go func() {
		var err error


@@ 236,16 224,14 @@ func (stage *ApiFrontendBasicStage) Fetch(request *mortarpb.FetchRequest, client
	sendloop:
		for {
			select {
			case resp := <-responseChan:
			case resp := <-req.fetch_responses:
				if resp == nil {
					// if this is nil then we are done, but there's no error (yet)
					break sendloop
				} else if err = client.Send(resp); err != nil {
					// we have an error on sending, so we tear it all down
					log.Error(errors.Wrap(err, "Error on sending"))
					// have to remember to call cancel() here
					finishResponse(resp)
					cancel()
					break sendloop
				} else {
					// happy path


@@ 254,22 240,25 @@ func (stage *ApiFrontendBasicStage) Fetch(request *mortarpb.FetchRequest, client
				}
			case <-ctx.Done():
				// this branch gets triggered because context gets cancelled
				err = errors.Wrapf(ctx.Err(), "fetch timeout on response %v", queryCtx.errors)
				err = errors.Wrapf(ctx.Err(), "fetch timeout on response %v", req.err)
				break sendloop
			}
		}
		ret <- err
		close(req.fetch_responses)
	}()

	select {
	case stage.output <- queryCtx:
	case stage.output <- req:
	case <-ctx.Done():
		return errors.New("timeout")
	}

	select {
	case e := <-ret:
		log.Error("Got Error in ret ", e)
		if e != nil {
			log.Error("Got Error in ret ", e)
		}
		return e
	case <-ctx.Done():
		log.Error("timing out on waiting for result in fetch")

M stages/grpc_wave_frontend.go => stages/grpc_wave_frontend.go +10 -22
@@ 17,7 17,7 @@ import (

type ApiFrontendWAVEAuthStage struct {
	ctx    context.Context
	output chan Context
	output chan *Request
	sem    chan struct{}
	sync.Mutex
}


@@ 35,7 35,7 @@ type ApiFrontendWAVEAuthStageConfig struct {
func NewApiFrontendWAVEAuthStage(cfg *ApiFrontendWAVEAuthStageConfig) (*ApiFrontendWAVEAuthStage, error) {

	stage := &ApiFrontendWAVEAuthStage{
		output: make(chan Context),
		output: make(chan *Request),
		ctx:    cfg.StageContext,
		sem:    make(chan struct{}, 20),
	}


@@ 96,7 96,7 @@ func (stage *ApiFrontendWAVEAuthStage) SetUpstream(upstream Stage) {
	//has no upstream
}

func (stage *ApiFrontendWAVEAuthStage) GetQueue() chan Context {
func (stage *ApiFrontendWAVEAuthStage) GetQueue() chan *Request {
	return stage.output
}
func (stage *ApiFrontendWAVEAuthStage) String() string {


@@ 126,22 126,16 @@ func (stage *ApiFrontendWAVEAuthStage) Qualify(ctx context.Context, request *mor
	defer cancel()

	// prepare context for the execution
	responseChan := make(chan *mortarpb.QualifyResponse)
	queryCtx := Context{
		ctx:             ctx,
		qualify_request: *request,
		qualify_done:    responseChan,
	}
	req := NewQualifyRequest(ctx, request)

	select {
	case stage.output <- queryCtx:
	case stage.output <- req:
	case <-ctx.Done():
		return nil, errors.Wrap(ctx.Err(), "qualify timeout on dispatching query")
	}

	select {
	case resp := <-responseChan:
		//close(responseChan)
	case resp := <-req.qualify_responses:
		if resp.Error != "" {
			log.Warning(resp.Error)
		}


@@ 183,12 177,7 @@ func (stage *ApiFrontendWAVEAuthStage) Fetch(request *mortarpb.FetchRequest, cli
		return errors.Wrap(ctx.Err(), "fetch timeout on getting semaphore")
	}

	responseChan := make(chan *mortarpb.FetchResponse)
	queryCtx := Context{
		ctx:     ctx,
		request: *request,
		done:    responseChan,
	}
	req := NewFetchRequest(ctx, request)
	ret := make(chan error)
	go func() {
		var err error


@@ 196,7 185,7 @@ func (stage *ApiFrontendWAVEAuthStage) Fetch(request *mortarpb.FetchRequest, cli
	sendloop:
		for {
			select {
			case resp := <-responseChan:
			case resp := <-req.fetch_responses:
				if resp == nil {
					// if this is nil then we are done, but there's no error (yet)
					break sendloop


@@ 205,7 194,6 @@ func (stage *ApiFrontendWAVEAuthStage) Fetch(request *mortarpb.FetchRequest, cli
					log.Error(errors.Wrap(err, "Error on sending"))
					// have to remember to call cancel() here
					finishResponse(resp)
					cancel()
					break sendloop
				} else {
					// happy path


@@ 214,7 202,7 @@ func (stage *ApiFrontendWAVEAuthStage) Fetch(request *mortarpb.FetchRequest, cli
				}
			case <-ctx.Done():
				// this branch gets triggered because context gets cancelled
				err = errors.Wrapf(ctx.Err(), "fetch timeout on response %v", queryCtx.errors)
				err = errors.Wrapf(ctx.Err(), "fetch timeout on response %v", req.err)
				break sendloop
			}
		}


@@ 222,7 210,7 @@ func (stage *ApiFrontendWAVEAuthStage) Fetch(request *mortarpb.FetchRequest, cli
	}()

	select {
	case stage.output <- queryCtx:
	case stage.output <- req:
	case <-ctx.Done():
		return errors.New("timeout")
	}

M stages/influxdb.go => stages/influxdb.go +20 -22
@@ 17,7 17,7 @@ import (
type InfluxDBTimeseriesQueryStage struct {
	upstream Stage
	ctx      context.Context
	output   chan Context
	output   chan *Request

	conn influx.Client
	// timeseries database stuff


@@ 49,7 49,7 @@ func NewInfluxDBTimeseriesQueryStage(cfg *InfluxDBTimeseriesStageConfig) (*Influ

	stage := &InfluxDBTimeseriesQueryStage{
		upstream: cfg.Upstream,
		output:   make(chan Context),
		output:   make(chan *Request),
		ctx:      cfg.StageContext,
		conn:     conn,
	}


@@ 62,15 62,13 @@ func NewInfluxDBTimeseriesQueryStage(cfg *InfluxDBTimeseriesStageConfig) (*Influ
			input := stage.upstream.GetQueue()
			for {
				select {
				case ctx := <-input:
					if len(ctx.request.Sites) > 0 && len(ctx.request.DataFrames) > 0 {
						if err := stage.processQuery(ctx); err != nil {
				case req := <-input:
					if len(req.fetch_request.Sites) > 0 && len(req.fetch_request.DataFrames) > 0 {
						if err := stage.processQuery(req); err != nil {
							log.Println(err)
						}
					}
					ctx.response = nil
					stage.output <- ctx
					//ctx.done <- nil
					stage.output <- req
				case <-stage.ctx.Done():
					// case that breaks the stage and releases resources
					fmt.Println("Ending Timeseries Queue")


@@ 99,7 97,7 @@ func (stage *InfluxDBTimeseriesQueryStage) SetUpstream(upstream Stage) {
	fmt.Println("Updated stage to ", upstream)
}

func (stage *InfluxDBTimeseriesQueryStage) GetQueue() chan Context {
func (stage *InfluxDBTimeseriesQueryStage) GetQueue() chan *Request {
	return stage.output
}



@@ 107,18 105,18 @@ func (stage *InfluxDBTimeseriesQueryStage) String() string {
	return "<|influx ts stage|>"
}

func (stage *InfluxDBTimeseriesQueryStage) processQuery(ctx Context) error {
func (stage *InfluxDBTimeseriesQueryStage) processQuery(req *Request) error {
	// parse timestamps for the query
	start_time, err := time.Parse(time.RFC3339, ctx.request.Time.Start)
	start_time, err := time.Parse(time.RFC3339, req.fetch_request.Time.Start)
	if err != nil {
		err = errors.Wrapf(err, "Could not parse Start time (%s)", ctx.request.Time.Start)
		ctx.addError(err)
		err = errors.Wrapf(err, "Could not parse Start time (%s)", req.fetch_request.Time.Start)
		req.addError(err)
		return err
	}
	end_time, err := time.Parse(time.RFC3339, ctx.request.Time.End)
	end_time, err := time.Parse(time.RFC3339, req.fetch_request.Time.End)
	if err != nil {
		err = errors.Wrapf(err, "Could not parse End time (%s)", ctx.request.Time.End)
		ctx.addError(err)
		err = errors.Wrapf(err, "Could not parse End time (%s)", req.fetch_request.Time.End)
		req.addError(err)
		return err
	}



@@ 126,7 124,7 @@ func (stage *InfluxDBTimeseriesQueryStage) processQuery(ctx Context) error {
	// we put each collection from a plugin in its own "measurement". But, we want to query
	// by UUID, and we don't know the measurement. Either need to build some index of UUID
	// to measurement, or just insert into a single measurement that we know from the beginning
	for _, dataFrame := range ctx.request.DataFrames {
	for _, dataFrame := range req.fetch_request.DataFrames {
		for _, uuStr := range dataFrame.Uuids {

			// default for RAW


@@ 135,7 133,7 @@ func (stage *InfluxDBTimeseriesQueryStage) processQuery(ctx Context) error {
			if dataFrame.Aggregation != mortarpb.AggFunc_AGG_FUNC_RAW {
				window, err := ParseDuration(dataFrame.Window)
				if err != nil {
					ctx.addError(err)
					req.addError(err)
					return err
				}
				groupby = fmt.Sprintf("GROUP BY time(%s)", window)


@@ 208,8 206,8 @@ func (stage *InfluxDBTimeseriesQueryStage) processQuery(ctx Context) error {
					if pcount == TS_BATCH_SIZE {
						tsresp.DataFrame = dataFrame.Name
						tsresp.Identifier = uuStr
						ctx.response = tsresp
						stage.output <- ctx
						req.fetch_responses <- tsresp
						//stage.output <- ctx
						tsresp = &mortarpb.FetchResponse{}
						pcount = 0
					}


@@ 219,8 217,8 @@ func (stage *InfluxDBTimeseriesQueryStage) processQuery(ctx Context) error {
			if len(tsresp.Times) > 0 {
				tsresp.DataFrame = dataFrame.Name
				tsresp.Identifier = uuStr
				ctx.response = tsresp
				stage.output <- ctx
				req.fetch_responses <- tsresp
				//stage.output <- ctx
			}

		}

M stages/loadgen.go => stages/loadgen.go +6 -6
@@ 7,17 7,17 @@ import (
)

type SimpleLoadGenStage struct {
	output chan Context
	output chan *Request
	count  int64
}

func NewSimpleLoadGenStage(contexts ...func() Context) *SimpleLoadGenStage {
func NewSimpleLoadGenStage(contexts ...func() *Request) *SimpleLoadGenStage {
	stage := &SimpleLoadGenStage{
		output: make(chan Context),
		output: make(chan *Request),
	}

	if len(contexts) == 0 {
		contexts = append(contexts, func() Context { return Context{} })
		contexts = append(contexts, func() *Request { return &Request{} })
	}

	var notifier sync.Once


@@ 52,8 52,8 @@ func (stage *SimpleLoadGenStage) GetUpstream() Stage {
func (stage *SimpleLoadGenStage) SetUpstream(upstream Stage) {
}

// blocks on internal channel until next "Context" is ready
func (stage *SimpleLoadGenStage) GetQueue() chan Context {
// blocks on internal channel until next "*Request" is ready
func (stage *SimpleLoadGenStage) GetQueue() chan *Request {
	return stage.output
}


A stages/request.go => stages/request.go +67 -0
@@ 0,0 1,67 @@
package stages

import (
	"context"
	"sync"

	mortarpb "git.sr.ht/~gabe/mortar/proto"
)

// Request carries a single client query (either a Qualify or a Fetch)
// through the stage pipeline, along with the channels that stages use to
// stream responses back to the frontend that created it.
//
// Exactly one of qualify_request / fetch_request is non-nil, matching the
// constructor that built the Request; the corresponding *_responses
// channel is the one the frontend reads from.
type Request struct {
	// ctx bounds the lifetime of the request; cancel releases its timer.
	// ctx may be nil for a zero-value Request (see the load generator).
	ctx    context.Context
	cancel func()

	// mu guards err. Stages may call addError concurrently, so writes
	// must be serialized (the old Context type locked here too).
	// NOTE(review): some callers still read req.err directly without the
	// lock (e.g. when formatting a timeout message) — confirm acceptable.
	mu  sync.Mutex
	err error

	qualify_request *mortarpb.QualifyRequest
	fetch_request   *mortarpb.FetchRequest

	// Unbuffered: a stage's send blocks until the frontend consumes the
	// response (or the request context is cancelled).
	fetch_responses   chan *mortarpb.FetchResponse
	qualify_responses chan *mortarpb.QualifyResponse
}

// NewQualifyRequest wraps a QualifyRequest for the pipeline, bounding its
// execution with requestTimeout. The caller is responsible for invoking
// req.cancel() once the request is finished to release the timer.
func NewQualifyRequest(ctx context.Context, qualify *mortarpb.QualifyRequest) *Request {
	ctx, cancel := context.WithTimeout(ctx, requestTimeout)

	return &Request{
		ctx:               ctx,
		cancel:            cancel,
		qualify_request:   qualify,
		qualify_responses: make(chan *mortarpb.QualifyResponse),
	}
}

// NewFetchRequest wraps a FetchRequest for the pipeline, bounding its
// execution with requestTimeout. The caller is responsible for invoking
// req.cancel() once the request is finished to release the timer.
func NewFetchRequest(ctx context.Context, fetch *mortarpb.FetchRequest) *Request {
	ctx, cancel := context.WithTimeout(ctx, requestTimeout)

	return &Request{
		ctx:             ctx,
		cancel:          cancel,
		fetch_request:   fetch,
		fetch_responses: make(chan *mortarpb.FetchResponse),
	}
}

// addError records err as the request's error. Only the first error is
// kept; subsequent calls are no-ops. Safe for concurrent use.
func (request *Request) addError(err error) {
	request.mu.Lock()
	defer request.mu.Unlock()
	if request.err == nil {
		request.err = err
	}
}

// Done exposes the request context's cancellation channel so stages can
// abandon work when the client disconnects or the timeout fires.
// A zero-value Request has no context (the load generator builds these);
// returning a nil channel in that case simply blocks forever in a select,
// i.e. the request is never "done", instead of panicking.
func (request *Request) Done() <-chan struct{} {
	if request.ctx == nil {
		return nil
	}
	return request.ctx.Done()
}

//func (request *Request) handle() {
//	go func() {
//		if request.qualify_responses != nil {
//			for outmsg := range request.qualify_responses {
//				log.Println(outmsg)
//			}
//		}
//		//for out := range queue
//	}()
//}

M stages/stage.go => stages/stage.go +2 -67
@@ 1,10 1,7 @@
package stages

import (
	"context"
	mortarpb "git.sr.ht/~gabe/mortar/proto"
	"github.com/pkg/errors"
	"sync"
	"time"
)



@@ 12,74 9,12 @@ var MAX_TIMEOUT = time.Second * 300
var TS_BATCH_SIZE = 500
var errStreamNotExist = errors.New("Stream does not exist")

type Context struct {
	ctx             context.Context
	qualify_request mortarpb.QualifyRequest
	request         mortarpb.FetchRequest
	response        *mortarpb.FetchResponse
	done            chan *mortarpb.FetchResponse
	qualify_done    chan *mortarpb.QualifyResponse
	errors          []error
	finished        bool
	sync.Mutex
}

func (ctx *Context) isDone() bool {
	select {
	case <-ctx.ctx.Done():
		return true
	default:
		return false
	}
}

func (ctx *Context) addError(err error) {
	ctx.Lock()
	defer ctx.Unlock()
	log.Error("Context Error", err)
	ctx.errors = append(ctx.errors, err)
}

func (ctx *Context) finish() {
	ctx.Lock()
	defer ctx.Unlock()
	ctx.finished = true
	ctx.done <- nil
}

func (ctx *Context) is_finished() bool {
	ctx.Lock()
	defer ctx.Unlock()
	return ctx.finished
}

type Stage interface {
	// get the stage we pull from
	GetUpstream() Stage
	// set the stage we pull from
	SetUpstream(upstream Stage)
	// blocks on internal channel until next "Context" is ready
	GetQueue() chan Context
	// blocks on internal channel until next Request is ready
	GetQueue() chan *Request
	String() string
}

func Showtime(queue chan Context) {
	go func() {
		log.Println("get output")
		for out := range queue {
			if out.isDone() {
				continue
			}
			if out.response == nil {
				if out.done != nil {
					close(out.done)
				}
				if out.qualify_done != nil {
					close(out.qualify_done)
				}
			} else {
				out.done <- out.response
			}
		}
	}()
}

M stages/timeseries.go => stages/timeseries.go +41 -47
@@ 17,7 17,7 @@ import (
type TimeseriesQueryStage struct {
	upstream Stage
	ctx      context.Context
	output   chan Context
	output   chan *Request

	// timeseries database stuff
	conn        *btrdb.BTrDB


@@ 38,7 38,7 @@ func NewTimeseriesQueryStage(cfg *TimeseriesStageConfig) (*TimeseriesQueryStage,
	}
	stage := &TimeseriesQueryStage{
		upstream: cfg.Upstream,
		output:   make(chan Context),
		output:   make(chan *Request),
		ctx:      cfg.StageContext,
	}



@@ 56,25 56,13 @@ func NewTimeseriesQueryStage(cfg *TimeseriesStageConfig) (*TimeseriesQueryStage,
			input := stage.upstream.GetQueue()
			for {
				select {
				case ctx := <-input:

					if ctx.isDone() {
						ctx.response = nil
						stage.output <- ctx
						continue
					}

					if len(ctx.request.Sites) > 0 && len(ctx.request.DataFrames) > 0 {
						if err := stage.processQuery(ctx); err != nil {
				case req := <-input:
					if len(req.fetch_request.Sites) > 0 && len(req.fetch_request.DataFrames) > 0 {
						if err := stage.processQuery(req); err != nil {
							log.Println(err)
						}
					} else if len(ctx.request.Sites) > 0 && len(ctx.request.Streams) > 0 {
						ctx.addError(errors.New("Need to upgrade to pymortar>=0.3.2"))
						log.Error("Old client")
					}
					ctx.response = nil
					stage.output <- ctx
					//ctx.done <- nil
					//stage.output <- req
				case <-stage.ctx.Done():
					// case that breaks the stage and releases resources
					fmt.Println("Ending Timeseries Queue")


@@ 102,7 90,7 @@ func (stage *TimeseriesQueryStage) SetUpstream(upstream Stage) {
	fmt.Println("Updated stage to ", upstream)
}

func (stage *TimeseriesQueryStage) GetQueue() chan Context {
func (stage *TimeseriesQueryStage) GetQueue() chan *Request {
	return stage.output
}



@@ 128,7 116,6 @@ func (stage *TimeseriesQueryStage) getStream(ctx context.Context, streamuuid uui
			e := btrdb.ToCodedError(existsErr)
			if e.Code != 501 {
				err = errors.Wrap(existsErr, "Could not fetch stream")
				log.Fatal("c")
				//defer cancel()
				return
			}


@@ 161,19 148,19 @@ func (stage *TimeseriesQueryStage) getStream(ctx context.Context, streamuuid uui
	return
}

func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
func (stage *TimeseriesQueryStage) processQuery(req *Request) error {
	//	defer ctx.finish()
	// parse timestamps for the query
	start_time, err := time.Parse(time.RFC3339, ctx.request.Time.Start)
	start_time, err := time.Parse(time.RFC3339, req.fetch_request.Time.Start)
	if err != nil {
		err = errors.Wrapf(err, "Could not parse Start time (%s)", ctx.request.Time.Start)
		ctx.addError(err)
		err = errors.Wrapf(err, "Could not parse Start time (%s)", req.fetch_request.Time.Start)
		req.addError(err)
		return err
	}
	end_time, err := time.Parse(time.RFC3339, ctx.request.Time.End)
	end_time, err := time.Parse(time.RFC3339, req.fetch_request.Time.End)
	if err != nil {
		err = errors.Wrapf(err, "Could not parse End time (%s)", ctx.request.Time.End)
		ctx.addError(err)
		err = errors.Wrapf(err, "Could not parse End time (%s)", req.fetch_request.Time.End)
		req.addError(err)
		return err
	}



@@ 183,23 170,23 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
	//qctx, cancel := context.WithTimeout(ctx.ctx, MAX_TIMEOUT)

	// loop over all streams, and then over all UUIDs
	for _, dataFrame := range ctx.request.DataFrames {
	for _, dataFrame := range req.fetch_request.DataFrames {
		for _, uuStr := range dataFrame.Uuids {
			uu := uuid.Parse(uuStr)
			if uu == nil {
				log.Warningf("Could not parse uuid %s", uuStr)
				continue
			}
			stream, err := stage.getStream(ctx.ctx, uu)
			stream, err := stage.getStream(req.ctx, uu)
			if err != nil {
				ctx.addError(err)
				req.addError(err)
				return err
			}

			// handle RAW streams
			if dataFrame.Aggregation == mortarpb.AggFunc_AGG_FUNC_RAW {
				// if raw data...
				rawpoints, generations, errchan := stream.RawValues(ctx.ctx, start_time.UnixNano(), end_time.UnixNano(), 0)
				rawpoints, generations, errchan := stream.RawValues(req.ctx, start_time.UnixNano(), end_time.UnixNano(), 0)
				resp := &mortarpb.FetchResponse{}
				var pcount = 0
				for p := range rawpoints {


@@ 214,9 201,10 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
					if pcount == TS_BATCH_SIZE {
						resp.DataFrame = dataFrame.Name
						resp.Identifier = uuStr
						if !ctx.isDone() {
							ctx.response = resp
							stage.output <- ctx
						select {
						case req.fetch_responses <- resp:
						case <-req.Done():
							continue
						}
						resp = &mortarpb.FetchResponse{}
						pcount = 0


@@ 225,28 213,28 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
				if len(resp.Times) > 0 {
					resp.DataFrame = dataFrame.Name
					resp.Identifier = uuStr
					if !ctx.isDone() {
						ctx.response = resp
						stage.output <- ctx
					select {
					case req.fetch_responses <- resp:
					case <-req.Done():
					}
				}

				<-generations
				if err := <-errchan; err != nil {
					ctx.addError(err)
					req.addError(err)
					log.Error(errors.Wrap(err, "got error in stream rawvalues"))
					return err
				}
			} else {
				windowSize, err := ParseDuration(dataFrame.Window)
				if err != nil {
					ctx.addError(err)
					req.addError(err)
					return err
				}
				windowDepth := math.Log2(float64(windowSize))
				suggestedAccuracy := uint8(math.Max(windowDepth-5, 30))

				statpoints, generations, errchan := stream.Windows(ctx.ctx, start_time.UnixNano(), end_time.UnixNano(), uint64(windowSize.Nanoseconds()), suggestedAccuracy, 0)
				statpoints, generations, errchan := stream.Windows(req.ctx, start_time.UnixNano(), end_time.UnixNano(), uint64(windowSize.Nanoseconds()), suggestedAccuracy, 0)

				resp := &mortarpb.FetchResponse{}
				var pcount = 0


@@ 258,10 246,14 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
					if pcount == TS_BATCH_SIZE {
						resp.DataFrame = dataFrame.Name
						resp.Identifier = uuStr
						if !ctx.isDone() {
							ctx.response = resp
							stage.output <- ctx
						//if !ctx.isDone() {
						select {
						case req.fetch_responses <- resp:
						case <-req.Done():
							continue
						}
						//stage.output <- ctx
						//}
						resp = &mortarpb.FetchResponse{}
						pcount = 0
					}


@@ 269,15 261,16 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {
				if len(resp.Times) > 0 {
					resp.DataFrame = dataFrame.Name
					resp.Identifier = uuStr
					if !ctx.isDone() {
						ctx.response = resp
						stage.output <- ctx
					select {
					case req.fetch_responses <- resp:
					case <-req.Done():
						continue
					}
				}

				<-generations
				if err := <-errchan; err != nil {
					ctx.addError(err)
					req.addError(err)
					return err
				}



@@ 285,6 278,7 @@ func (stage *TimeseriesQueryStage) processQuery(ctx Context) error {

		}
	}
	req.fetch_responses <- nil

	return nil
}

M stages/wavemq.go => stages/wavemq.go +9 -19
@@ 25,7 25,7 @@ type WAVEMQFrontendStageConfig struct {

type WAVEMQFrontendStage struct {
	client         mqpb.WAVEMQClient
	output         chan Context
	output         chan *Request
	perspective    *mqpb.Perspective
	namespaceBytes []byte
	sem            chan struct{}


@@ 59,7 59,7 @@ func NewWAVEMQFrontendStage(cfg *WAVEMQFrontendStageConfig) (*WAVEMQFrontendStag
	}

	stage := &WAVEMQFrontendStage{
		output:         make(chan Context),
		output:         make(chan *Request),
		client:         mqpb.NewWAVEMQClient(conn),
		perspective:    perspective,
		namespaceBytes: namespaceBytes,


@@ 117,7 117,7 @@ func (stage *WAVEMQFrontendStage) SetUpstream(upstream Stage) {
	//has no upstream
}

func (stage *WAVEMQFrontendStage) GetQueue() chan Context {
func (stage *WAVEMQFrontendStage) GetQueue() chan *Request {
	return stage.output
}
func (stage *WAVEMQFrontendStage) String() string {


@@ 145,21 145,16 @@ func (stage *WAVEMQFrontendStage) Qualify(ctx context.Context, request *mortarpb
	defer cancel()

	// prepare context for the execution
	responseChan := make(chan *mortarpb.QualifyResponse)
	queryCtx := Context{
		ctx:             ctx,
		qualify_request: *request,
		qualify_done:    responseChan,
	}
	req := NewQualifyRequest(ctx, request)

	select {
	case stage.output <- queryCtx:
	case stage.output <- req:
	case <-ctx.Done():
		return nil, errors.Wrap(ctx.Err(), "qualify timeout on dispatching query")
	}

	select {
	case resp := <-responseChan:
	case resp := <-req.qualify_responses:
		//close(responseChan)
		if resp.Error != "" {
			log.Warning(resp.Error)


@@ 203,12 198,7 @@ func (stage *WAVEMQFrontendStage) Fetch(request *mortarpb.FetchRequest, client m
		return errors.Wrap(ctx.Err(), "fetch timeout on getting semaphore")
	}

	responseChan := make(chan *mortarpb.FetchResponse)
	queryCtx := Context{
		ctx:     ctx,
		request: *request,
		done:    responseChan,
	}
	req := NewFetchRequest(ctx, request)
	ret := make(chan error)
	go func() {
		var err error


@@ 216,7 206,7 @@ func (stage *WAVEMQFrontendStage) Fetch(request *mortarpb.FetchRequest, client m
	sendloop:
		for {
			select {
			case resp := <-responseChan:
			case resp := <-req.fetch_responses:
				if resp == nil {
					// if this is nil then we are done, but there's no error (yet)
					break sendloop


@@ 241,7 231,7 @@ func (stage *WAVEMQFrontendStage) Fetch(request *mortarpb.FetchRequest, client m
	}()

	select {
	case stage.output <- queryCtx:
	case stage.output <- req:
	case <-ctx.Done():
		return errors.New("timeout")
	}