~ashkeel/kilovolt-client-go

edc8d462d42b80cde52e53eb398ebeb029bf8df8 — Ash Keel 3 years ago 8c6f8f0
Add support for subscription to prefixes
Add tests
5 files changed, 308 insertions(+), 34 deletions(-)

M README.md
M client.go
A client_test.go
M go.mod
M go.sum
M README.md => README.md +7 -4
@@ 1,9 1,12 @@
# Kilovolt client

Go client for Kilovolt websocket servers, supports Kilovolt Protocol v3
Go client for Kilovolt websocket servers, supports Kilovolt Protocol v3/v4

## Getting started

```
go get github.com/strimertul/kilovolt-client-go
```
Depending on what version of kilovolt you need to interface with, you'll need to pick a corresponding version of kilovolt-client-go:

| Protocol | Go URL                                               |
| -------- | ---------------------------------------------------- |
| `v3`     | `go get github.com/strimertul/kilovolt-client-go`    |
| `v4`     | `go get github.com/strimertul/kilovolt-client-go/v2` |

M client.go => client.go +106 -27
@@ 14,7 14,7 @@ import (
	cmap "github.com/orcaman/concurrent-map"
	"github.com/sirupsen/logrus"

	kv "github.com/strimertul/kilovolt/v3"
	kv "github.com/strimertul/kilovolt/v4"
)

var (


@@ 23,15 23,21 @@ var (
	ErrEmptyKey             = errors.New("key empty or unset")
)

type KeyValuePair struct {
	Key   string
	Value string
}

type Client struct {
	Endpoint string
	Logger   logrus.FieldLogger

	headers       http.Header
	ws            *websocket.Conn
	mu            sync.Mutex         // Used to avoid concurrent writes to socket
	requests      cmap.ConcurrentMap // map[string]chan<- string
	subscriptions cmap.ConcurrentMap // map[string][]chan<- string
	headers    http.Header
	ws         *websocket.Conn
	mu         sync.Mutex         // Used to avoid concurrent writes to socket
	requests   cmap.ConcurrentMap // map[string]chan<- string
	keysubs    cmap.ConcurrentMap // map[string][]chan<- KeyValuePair
	prefixsubs cmap.ConcurrentMap // map[string][]chan<- KeyValuePair
}

type ClientOptions struct {


@@ 45,13 51,14 @@ func NewClient(endpoint string, options ClientOptions) (*Client, error) {
	}

	client := &Client{
		Endpoint:      endpoint,
		Logger:        options.Logger,
		headers:       options.Headers,
		ws:            nil,
		mu:            sync.Mutex{},
		requests:      cmap.New(), // make(map[string]chan<- string),
		subscriptions: cmap.New(), // make(map[string][]chan<- string),
		Endpoint:   endpoint,
		Logger:     options.Logger,
		headers:    options.Headers,
		ws:         nil,
		mu:         sync.Mutex{},
		requests:   cmap.New(), // make(map[string]chan<- string),
		keysubs:    cmap.New(), // make(map[string][]chan<- string),
		prefixsubs: cmap.New(), // make(map[string][]chan<- string),
	}

	err := client.ConnectToWebsocket()


@@ 122,10 129,18 @@ func (s *Client) ConnectToWebsocket() error {
							s.Logger.WithError(err).Error("websocket deserialize error")
							continue
						}
						// Deliver to subscriptions
						if subs, ok := s.subscriptions.Get(push.Key); ok {
							for _, chann := range subs.([]chan string) {
								chann <- push.NewValue
						// Deliver to key subscriptions
						if subs, ok := s.keysubs.Get(push.Key); ok {
							for _, chann := range subs.([]chan KeyValuePair) {
								chann <- KeyValuePair{push.Key, push.NewValue}
							}
						}
						// Deliver to prefix subscritpions
						for kv := range s.prefixsubs.IterBuffered() {
							if strings.HasPrefix(push.Key, kv.Key) {
								for _, chann := range kv.Val.([]chan KeyValuePair) {
									chann <- KeyValuePair{push.Key, push.NewValue}
								}
							}
						}



@@ 188,7 203,7 @@ func (s *Client) SetJSON(key string, data interface{}) error {
	}

	_, err = s.makeRequest(kv.Request{
		CmdName: kv.CmdReadKey,
		CmdName: kv.CmdWriteKey,
		Data: map[string]interface{}{
			"key":  key,
			"data": serialized,


@@ 198,14 213,17 @@ func (s *Client) SetJSON(key string, data interface{}) error {
	return err
}

func (s *Client) Subscribe(key string) (chan string, error) {
	chn := make(chan string)
func (s *Client) SubscribeKey(key string) (chan KeyValuePair, error) {
	chn := make(chan KeyValuePair, 10)

	data, ok := s.subscriptions.Get(key)
	subs := data.([]chan string)
	var subs []chan KeyValuePair
	data, ok := s.keysubs.Get(key)
	if ok {
		subs = data.([]chan KeyValuePair)
	}

	needsAPISubscription := !ok || len(subs) < 1
	s.subscriptions.Set(key, append(subs, chn))
	s.keysubs.Set(key, append(subs, chn))

	var err error
	// If this is the first time we subscribe to this key, ask server to push updates


@@ 221,17 239,18 @@ func (s *Client) Subscribe(key string) (chan string, error) {
	return chn, err
}

func (s *Client) Unsubscribe(key string, chn chan string) error {
	data, ok := s.subscriptions.Get(key)
func (s *Client) UnsubscribeKey(key string, chn chan KeyValuePair) error {
	data, ok := s.keysubs.Get(key)
	if !ok {
		return nil
	}
	chans := data.([]chan string)
	chans := data.([]chan KeyValuePair)

	found := false
	for idx, sub := range chans {
		if sub == chn {
			s.subscriptions.Set(key, append(chans[:idx], chans[idx+1:]...))
			chans = append(chans[:idx], chans[idx+1:]...)
			s.keysubs.Set(key, chans)
			found = true
		}
	}


@@ 254,6 273,66 @@ func (s *Client) Unsubscribe(key string, chn chan string) error {
	return nil
}

func (s *Client) SubscribePrefix(prefix string) (chan KeyValuePair, error) {
	chn := make(chan KeyValuePair, 10)

	var subs []chan KeyValuePair
	data, ok := s.prefixsubs.Get(prefix)
	if ok {
		subs = data.([]chan KeyValuePair)
	}

	needsAPISubscription := !ok || len(subs) < 1
	s.prefixsubs.Set(prefix, append(subs, chn))

	var err error
	// If this is the first time we subscribe to this key, ask server to push updates
	if needsAPISubscription {
		_, err = s.makeRequest(kv.Request{
			CmdName: kv.CmdSubscribePrefix,
			Data: map[string]interface{}{
				"prefix": prefix,
			},
		})
	}

	return chn, err
}

func (s *Client) UnsubscribePrefix(prefix string, chn chan KeyValuePair) error {
	data, ok := s.prefixsubs.Get(prefix)
	if !ok {
		return nil
	}
	chans := data.([]chan KeyValuePair)

	found := false
	for idx, sub := range chans {
		if sub == chn {
			chans = append(chans[:idx], chans[idx+1:]...)
			s.prefixsubs.Set(prefix, chans)
			found = true
		}
	}

	if !found {
		return ErrSubscriptionNotFound
	}

	// If we removed all subscribers, ask server to not push updates to us anymore
	if len(chans) < 1 {
		_, err := s.makeRequest(kv.Request{
			CmdName: kv.CmdUnsubscribePrefix,
			Data: map[string]interface{}{
				"prefix": prefix,
			},
		})
		return err
	}

	return nil
}

func (s *Client) makeRequest(request kv.Request) (kv.Response, error) {
	rid := ""
	for {

A client_test.go => client_test.go +187 -0
@@ 0,0 1,187 @@
package kvclient

import (
	"net/http"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/dgraph-io/badger/v3"
	"github.com/sirupsen/logrus"

	kv "github.com/strimertul/kilovolt/v4"
)

func TestCommands(t *testing.T) {
	log := logrus.New()
	log.Level = logrus.TraceLevel

	server, _ := createInMemoryKV(t, log)

	client, err := NewClient(server.URL, ClientOptions{
		Logger: log,
	})
	if err != nil {
		t.Fatal("error creating kv client", err.Error())
	}

	t.Run("SetKey", func(t *testing.T) {
		if err := client.SetKey("test", "test1234"); err != nil {
			t.Fatal("error setting key to string value", err.Error())
		}
	})
	t.Run("GetKey", func(t *testing.T) {
		val, err := client.GetKey("test")
		if err != nil {
			t.Fatal("error getting string key", err.Error())
		}
		if val != "test1234" {
			t.Fatalf("returned value is different than expected, expected=%s got=%s", "test1234", val)
		}
	})

	type RandomStruct struct {
		Value int64
		Other string
	}
	t.Run("SetJSON", func(t *testing.T) {
		if err := client.SetJSON("test", RandomStruct{
			Value: 1234,
			Other: "wow!",
		}); err != nil {
			t.Fatal("error setting key to JSON value", err.Error())
		}
	})
	t.Run("GetJSON", func(t *testing.T) {
		var rnd RandomStruct
		if err := client.GetJSON("test", &rnd); err != nil {
			t.Fatal("error getting JSON key", err.Error())
		}
		if rnd.Value != 1234 || rnd.Other != "wow!" {
			t.Fatal("deserialized JSON has different values than expected")
		}
	})

	var chn chan KeyValuePair
	t.Run("SubscribeKey", func(t *testing.T) {
		var err error
		chn, err = client.SubscribeKey("test")
		if err != nil {
			t.Fatal("error subscribing to key", err.Error())
		}
	})
	t.Run("UnsubscribeKey", func(t *testing.T) {
		err := client.UnsubscribeKey("test", chn)
		if err != nil {
			t.Fatal("error unsubscribing from key", err.Error())
		}
	})

	t.Run("SubscribePrefix", func(t *testing.T) {
		var err error
		chn, err = client.SubscribePrefix("test")
		if err != nil {
			t.Fatal("error subscribing to prefix", err.Error())
		}
	})
	t.Run("UnsubscribePrefix", func(t *testing.T) {
		err := client.UnsubscribePrefix("test", chn)
		if err != nil {
			t.Fatal("error unsubscribing from prefix", err.Error())
		}
	})
}

func TestKeySubscription(t *testing.T) {
	log := logrus.New()
	log.Level = logrus.TraceLevel

	server, _ := createInMemoryKV(t, log)

	client, err := NewClient(server.URL, ClientOptions{
		Logger: log,
	})
	if err != nil {
		t.Fatal("error creating kv client", err.Error())
	}

	chn, err := client.SubscribeKey("subtest")
	if err != nil {
		t.Fatal("error subscribing to key", err.Error())
	}

	if err = client.SetKey("subtest", "testvalue1234"); err != nil {
		t.Fatal("error modifying key", err.Error())
	}
	// Check for pushes
	select {
	case <-time.After(20 * time.Second):
		t.Fatal("push took too long to arrive")
	case push := <-chn:
		if push.Key != "subtest" || push.Value != "testvalue1234" {
			t.Fatal("wrong value received", push)
		}
	}

	if err = client.UnsubscribeKey("subtest", chn); err != nil {
		t.Fatal("error unsubscribing from key", err.Error())
	}
}

func TestPrefixSubscription(t *testing.T) {
	log := logrus.New()
	log.Level = logrus.TraceLevel

	server, _ := createInMemoryKV(t, log)

	client, err := NewClient(server.URL, ClientOptions{
		Logger: log,
	})
	if err != nil {
		t.Fatal("error creating kv client", err.Error())
	}

	chn, err := client.SubscribePrefix("sub")
	if err != nil {
		t.Fatal("error subscribing to prefix", err.Error())
	}

	if err = client.SetKey("subAAAA", "testvalue56709"); err != nil {
		t.Fatal("error modifying key", err.Error())
	}
	// Check for pushes
	select {
	case <-time.After(20 * time.Second):
		t.Fatal("push took too long to arrive")
	case push := <-chn:
		if push.Key != "subAAAA" || push.Value != "testvalue56709" {
			t.Fatal("wrong value received", push)
		}
	}

	if err = client.UnsubscribePrefix("sub", chn); err != nil {
		t.Fatal("error unsubscribing from prefix", err.Error())
	}
}

func createInMemoryKV(t *testing.T, log logrus.FieldLogger) (*httptest.Server, *kv.Hub) {
	// Open in-memory DB
	options := badger.DefaultOptions("").WithInMemory(true).WithLogger(log)
	db, err := badger.Open(options)
	if err != nil {
		t.Fatal("db initialization failed", err.Error())
	}

	// Create hub with in-mem DB
	hub, err := kv.NewHub(db, log)
	if err != nil {
		t.Fatal("hub initialization failed", err.Error())
	}
	go hub.Run()

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		kv.ServeWs(hub, w, r)
	}))

	return ts, hub
}

M go.mod => go.mod +4 -3
@@ 1,11 1,12 @@
module github.com/strimertul/kilovolt-client-go
module github.com/strimertul/kilovolt-client-go/v2

go 1.16

require (
	github.com/dgraph-io/badger/v3 v3.2011.1
	github.com/gorilla/websocket v1.4.2
	github.com/json-iterator/go v1.1.11
	github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc // indirect
	github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc
	github.com/sirupsen/logrus v1.8.1
	github.com/strimertul/kilovolt/v3 v3.0.0
	github.com/strimertul/kilovolt/v4 v4.0.1
)

M go.sum => go.sum +4 -0
@@ 90,8 90,12 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/strimertul/kilovolt-client-go v1.1.1 h1:uy6y/WKJyubAPHb+wPJz5We+fVwzWIplHiclSAhEY2E=
github.com/strimertul/kilovolt-client-go v1.1.1/go.mod h1:jXbd37kXDdDeKnOWao/JSNMdSYmuhBBHG+LWIBzuXr8=
github.com/strimertul/kilovolt/v3 v3.0.0 h1:3gE0FdH3fL5UgMocR6Z7lI9hk3jD8ds8yL47D4Z7758=
github.com/strimertul/kilovolt/v3 v3.0.0/go.mod h1:AgfPYRp+kffN64tcqCcQUZdpL/Dm5DGHIYRDm9t3E0Y=
github.com/strimertul/kilovolt/v4 v4.0.1 h1:81isohdSixVURO2+dZKKZBPw97HJmNN4/BXn6ADFoWM=
github.com/strimertul/kilovolt/v4 v4.0.1/go.mod h1:AO2ZFQtSB+AcjCw0RTkXjbM6XBAjhsXsrRq10BX95kw=
github.com/twitchyliquid64/golang-asm v0.15.0/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=