~ashkeel/strimertul

05959dbad682b9aa3830e31f973debe07889546b — Ash Keel a month ago d1cd25e master
refactor: switch websocket library
4 files changed, 191 insertions(+), 61 deletions(-)

M database/database_test.go
M go.mod
M go.sum
M twitch/eventsub/client.go
M database/database_test.go => database/database_test.go +168 -34
@@ 1,6 1,7 @@
package database

import (
	"context"
	"encoding/json"
	"errors"
	"testing"


@@ 144,14 145,24 @@ func TestLocalDBClientGetKey(t *testing.T) {
		t.Fatal(err)
	}

	// Retrieve the key using the local client
	stored, err := client.GetKey(key)
	if err != nil {
		t.Fatal(err)
	}
	if stored != value {
		t.Fatalf("expected %s, got %s", value, stored)
	}
	t.Run("get stored value", func(t *testing.T) {
		// Retrieve the key using the local client
		stored, getErr := client.GetKey(key)
		if getErr != nil {
			t.Fatal(getErr)
		}
		if stored != value {
			t.Fatalf("expected %s, got %s", value, stored)
		}
	})

	t.Run("get non-existent key", func(t *testing.T) {
		val, _ := client.GetKey("non-existent")

		if val != "" {
			t.Fatalf("expected empty string for non-existent key, got %s", val)
		}
	})
}

func TestLocalDBClientGetJSON(t *testing.T) {


@@ 162,36 173,51 @@ func TestLocalDBClientGetJSON(t *testing.T) {
		A string
		B int
	}
	testStruct := test{
		A: "test",
		B: 42,
	}

	// Store a key directly in the store
	key := "test"
	byt, err := json.Marshal(testStruct)
	if err != nil {
		t.Fatal(err)
	}
	t.Run("get JSON key", func(t *testing.T) {
		testStruct := test{
			A: "test",
			B: 42,
		}

	err = store.Set(key, string(byt))
	if err != nil {
		t.Fatal(err)
	}
		// Store a key directly in the store
		key := "test"
		byt, err := json.Marshal(testStruct)
		if err != nil {
			t.Fatal(err)
		}

	// Retrieve the key using the local client
	var stored test
	err = client.GetJSON(key, &stored)
	if err != nil {
		t.Fatal(err)
	}
		err = store.Set(key, string(byt))
		if err != nil {
			t.Fatal(err)
		}

	if stored.A != testStruct.A {
		t.Fatalf("expected A to be %s, got %s", testStruct.A, stored.A)
	}
	if stored.B != testStruct.B {
		t.Fatalf("expected B to be %d, got %d", testStruct.B, stored.B)
	}
		// Retrieve the key using the local client
		var stored test
		err = client.GetJSON(key, &stored)
		if err != nil {
			t.Fatal(err)
		}

		if stored.A != testStruct.A {
			t.Fatalf("expected A to be %s, got %s", testStruct.A, stored.A)
		}
		if stored.B != testStruct.B {
			t.Fatalf("expected B to be %d, got %d", testStruct.B, stored.B)
		}
	})

	t.Run("get non-existent key", func(t *testing.T) {
		var stored test
		err := client.GetJSON("my random key", &stored)
		if err == nil {
			t.Fatal("expected error for non-existent key")
		}

		if !errors.Is(err, ErrEmptyKey) {
			t.Fatalf("expected ErrEmptyKey, got %s", err)
		}
	})
}

func TestLocalDBClientGetAll(t *testing.T) {


@@ 317,3 343,111 @@ func TestLocalDBClientSubscribePrefix(t *testing.T) {
		t.Fatal("expected value to be received")
	}
}

func TestLocalDBClientSubscribeKeyContext(t *testing.T) {
	client, _ := CreateInMemoryLocalClient(t)
	defer CleanupLocalClient(client)

	// Subscribe to a key using the local client
	key := "test"
	ch := make(chan string, 1)
	ctx, cancel := context.WithCancel(context.Background())
	err := client.SubscribeKeyContext(ctx, key, func(newValue string) {
		ch <- newValue
	})
	if err != nil {
		t.Fatal(err)
	}
	defer cancel()

	// Store a key
	err = client.PutKey(key, "value")
	if err != nil {
		t.Fatal(err)
	}

	select {
	case newValue := <-ch:
		if newValue != "value" {
			t.Fatalf("expected value to be %s, got %s", "value", newValue)
		}
	default:
		t.Fatal("expected value to be received")
	}

	// Cancel the context
	cancel()

	// Store another key
	err = client.PutKey(key, "value2")
	if err != nil {
		t.Fatal(err)
	}

	// Should not receive a value (channel should be closed)
	select {
	case newValue := <-ch:
		t.Fatalf("expected no value to be received, got %s", newValue)
	case <-time.After(time.Second):
		// Expected
	}
}

func TestLocalDBClientSubscribePrefixContext(t *testing.T) {
	client, _ := CreateInMemoryLocalClient(t)
	defer CleanupLocalClient(client)

	// Subscribe to a prefix using the local client
	prefix := "test"
	ch := make(chan string, 1)
	ctx, cancel := context.WithCancel(context.Background())
	err := client.SubscribePrefixContext(ctx, func(_, newValue string) {
		ch <- newValue
	}, prefix)
	if err != nil {
		t.Fatal(err)
	}
	defer cancel()

	// Write a key
	err = client.PutKey("testWithStuff", "value")
	if err != nil {
		t.Fatal(err)
	}

	select {
	case newValue := <-ch:
		if newValue != "value" {
			t.Fatalf("expected value to be %s, got %s", "value", newValue)
		}
	case <-time.After(time.Second * 2):
		t.Fatal("expected value to be received")
	}

	// Cancel the context
	cancel()

	// Write another key
	err = client.PutKey("testWithStuff2", "value2")
	if err != nil {
		t.Fatal(err)
	}

	// Should not receive a value (channel should be closed)
	select {
	case newValue := <-ch:
		t.Fatalf("expected no value to be received, got %s", newValue)
	case <-time.After(time.Second):
		// Expected
	}
}

func TestLocalDBClientHub(t *testing.T) {
	client, _ := CreateInMemoryLocalClient(t)
	defer CleanupLocalClient(client)

	hub := client.Hub() // Should not panic
	if hub == nil {
		t.Fatal("expected hub to be returned")
	}
}

M go.mod => go.mod +5 -6
@@ 4,16 4,16 @@ go 1.22

require (
	git.sr.ht/~ashkeel/containers v0.3.6
	git.sr.ht/~ashkeel/kilovolt-driver-pebble v1.3.5
	git.sr.ht/~ashkeel/kilovolt/v12 v12.0.4
	git.sr.ht/~ashkeel/kilovolt-driver-pebble v1.3.6
	git.sr.ht/~ashkeel/kilovolt/v12 v12.0.5
	github.com/Masterminds/sprig/v3 v3.3.0
	github.com/apenwarr/fixconsole v0.0.0-20191012055117-5a9f6489cc29
	github.com/cockroachdb/pebble v1.1.2
	github.com/gorilla/websocket v1.5.1
	github.com/coder/websocket v1.8.12
	github.com/hashicorp/golang-lru/v2 v2.0.7
	github.com/nicklaw5/helix/v2 v2.30.0
	github.com/samber/slog-multi v1.2.3
	github.com/urfave/cli/v2 v2.27.4
	github.com/urfave/cli/v2 v2.27.5
	github.com/wailsapp/wails/v2 v2.9.2
	gopkg.in/natefinch/lumberjack.v2 v2.2.1
)


@@ 32,7 32,7 @@ require (
	github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
	github.com/cockroachdb/redact v1.1.5 // indirect
	github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
	github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
	github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
	github.com/getsentry/sentry-go v0.27.0 // indirect
	github.com/go-ole/go-ole v1.2.6 // indirect
	github.com/godbus/dbus/v5 v5.1.0 // indirect


@@ 81,5 81,4 @@ require (
	golang.org/x/sys v0.23.0 // indirect
	golang.org/x/text v0.18.0 // indirect
	google.golang.org/protobuf v1.33.0 // indirect
	nhooyr.io/websocket v1.8.10 // indirect
)

M go.sum => go.sum +10 -12
@@ 35,10 35,10 @@ dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.sr.ht/~ashkeel/containers v0.3.6 h1:+umWlQGKhLxGQlaEUt/F6rBZGpeBd1T01fM3wro+qTY=
git.sr.ht/~ashkeel/containers v0.3.6/go.mod h1:i2KocnJfRH0FwfgPi4nw7/ehYLEoLlP3iwdDoBeVdME=
git.sr.ht/~ashkeel/kilovolt-driver-pebble v1.3.5 h1:BokBYWw1Qxdr7yveaMjmMG41ozlgKbnemTeEG/KbQTw=
git.sr.ht/~ashkeel/kilovolt-driver-pebble v1.3.5/go.mod h1:c/op4TWAvsz76ve1gptP8Gmn+z4Sgb/PKHq6RtaqDds=
git.sr.ht/~ashkeel/kilovolt/v12 v12.0.4 h1:6G5RJaUu6GyrQtyXV03J0WrNPV/GxxrXowYq2p1vewc=
git.sr.ht/~ashkeel/kilovolt/v12 v12.0.4/go.mod h1:dRSJpl6ZXNoTAF3pTMC4AO7MLkRgzQqaZYR8S5a46TI=
git.sr.ht/~ashkeel/kilovolt-driver-pebble v1.3.6 h1:v4YdGoY0KPYRwUdDcpvR54TbL+GBwZOZeLMGrbDrP0E=
git.sr.ht/~ashkeel/kilovolt-driver-pebble v1.3.6/go.mod h1:DCGu8RCF/K2JAZV8Ulh8h/zoEI1l+7wgyQI+8HlIT0M=
git.sr.ht/~ashkeel/kilovolt/v12 v12.0.5 h1:5E0sTM/ftnMq59al8sQRN0QgtkvMwgGsYYsdLlDuJcM=
git.sr.ht/~ashkeel/kilovolt/v12 v12.0.5/go.mod h1:3LILZMHw0q8mpngOYGSpo/XBG0R9rmNK/CALZsrc0U0=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=


@@ 88,8 88,10 @@ github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwP
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=


@@ 182,8 184,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=


@@ 319,8 319,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tkrajina/go-reflector v0.5.6 h1:hKQ0gyocG7vgMD2M3dRlYN6WBBOmdoOzJ6njQSepKdE=
github.com/tkrajina/go-reflector v0.5.6/go.mod h1:ECbqLgccecY5kPmPmXg1MrHW585yMcDkVl6IvJe64T4=
github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8=
github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=


@@ 641,8 641,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

M twitch/eventsub/client.go => twitch/eventsub/client.go +8 -9
@@ 10,8 10,7 @@ import (
	"git.sr.ht/~ashkeel/strimertul/log"

	"git.sr.ht/~ashkeel/strimertul/database"
	"git.sr.ht/~ashkeel/strimertul/utils"
	"github.com/gorilla/websocket"
	"github.com/coder/websocket"
	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/nicklaw5/helix/v2"
)


@@ 64,20 63,20 @@ func (c *Client) eventSubLoop() {
		}
	}
	if connection != nil {
		utils.Close(connection)
		_ = connection.CloseNow()
	}
}

func readLoop(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) {
func readLoop(ctx context.Context, connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) {
	for {
		messageType, messageData, err := connection.ReadMessage()
		messageType, messageData, err := connection.Read(ctx)
		if err != nil {
			wsErr <- err
			close(recv)
			close(wsErr)
			return
		}
		if messageType != websocket.TextMessage {
		if messageType != websocket.MessageText {
			continue
		}



@@ 86,7 85,7 @@ func readLoop(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error
}

func (c *Client) connectWebsocket(url string, oldConnection *websocket.Conn) (string, *websocket.Conn, error) {
	connection, _, err := websocket.DefaultDialer.Dial(url, nil)
	connection, _, err := websocket.Dial(c.ctx, url, nil)
	if err != nil {
		c.logger.Error("Could not establish a connection to the EventSub websocket", log.Error(err))
		return "", nil, err


@@ 95,7 94,7 @@ func (c *Client) connectWebsocket(url string, oldConnection *websocket.Conn) (st
	received := make(chan []byte, 10)
	wsErr := make(chan error, 1)

	go readLoop(connection, received, wsErr)
	go readLoop(c.ctx, connection, received, wsErr)

	for {
		// Wait for next message or closing/error


@@ 137,7 136,7 @@ func (c *Client) processMessage(wsMessage WebsocketMessage, oldConnection *webso

		// We can only close the old connection once the new one has been established
		if oldConnection != nil {
			utils.Close(oldConnection)
			_ = oldConnection.CloseNow()
		}

		// Add subscription to websocket session