@@ 6,6 6,7 @@ import (
"math/rand"
"net/http"
"net/url"
+ "strings"
"sync"
"github.com/gorilla/websocket"
@@ 27,7 28,7 @@ type Client struct {
headers http.Header
ws *websocket.Conn
mu sync.Mutex // Used to avoid concurrent writes to socket
- requests map[string]chan<- []byte
+ requests map[string]chan<- string
subscriptions map[string][]chan<- string
}
@@ 47,7 48,7 @@ func NewClient(endpoint string, options ClientOptions) (*Client, error) {
headers: options.Headers,
ws: nil,
mu: sync.Mutex{},
- requests: make(map[string]chan<- []byte),
+ requests: make(map[string]chan<- string),
subscriptions: make(map[string][]chan<- string),
}
@@ 90,38 91,41 @@ func (s *Client) ConnectToWebsocket() error {
continue
}
- var response kv.Response
- err = jsoniter.ConfigFastest.Unmarshal(message, &response)
- if err != nil {
- s.Logger.WithError(err).Error("websocket deserialize error")
- return
- }
- // Check message
- if response.RequestID != "" {
- // We have a request ID, send byte chunk over to channel
- if chn, ok := s.requests[response.RequestID]; ok {
- chn <- message
- } else {
- s.Logger.WithField("rid", response.RequestID).Error("received response for unknown RID")
+ submessages := strings.Split(string(message), "\n")
+ for _, msg := range submessages {
+ var response kv.Response
+ err = jsoniter.ConfigFastest.UnmarshalFromString(msg, &response)
+ if err != nil {
+ s.Logger.WithError(err).Error("websocket deserialize error")
+ return
}
- } else {
- // Might be a push
- switch response.CmdType {
- case "push":
- var push kv.Push
- err = jsoniter.ConfigFastest.Unmarshal(message, &push)
- if err != nil {
- s.Logger.WithError(err).Error("websocket deserialize error")
- continue
+ // Check message
+ if response.RequestID != "" {
+ // We have a request ID, send byte chunk over to channel
+ if chn, ok := s.requests[response.RequestID]; ok {
+ chn <- msg
+ } else {
+ s.Logger.WithField("rid", response.RequestID).Error("received response for unknown RID")
}
- // Deliver to subscriptions
- for sub, chans := range s.subscriptions {
- if push.Key != sub {
+ } else {
+ // Might be a push
+ switch response.CmdType {
+ case "push":
+ var push kv.Push
+ err = jsoniter.ConfigFastest.UnmarshalFromString(msg, &push)
+ if err != nil {
+ s.Logger.WithError(err).Error("websocket deserialize error")
continue
}
-
- for _, chann := range chans {
- chann <- push.NewValue
+ // Deliver to subscriptions
+ for sub, chans := range s.subscriptions {
+ if push.Key != sub {
+ continue
+ }
+
+ for _, chann := range chans {
+ chann <- push.NewValue
+ }
}
}
}
@@ 251,7 255,7 @@ func (s *Client) makeRequest(request kv.Request) (kv.Response, error) {
break
}
- responseChannel := make(chan []byte)
+ responseChannel := make(chan string)
s.requests[rid] = responseChannel
request.RequestID = rid
@@ 264,11 268,11 @@ func (s *Client) makeRequest(request kv.Request) (kv.Response, error) {
message := <-responseChannel
var response kv.Response
- err = jsoniter.ConfigFastest.Unmarshal(message, &response)
+ err = jsoniter.ConfigFastest.UnmarshalFromString(message, &response)
if !response.Ok {
var resperror kv.Error
- err = jsoniter.ConfigFastest.Unmarshal(message, &resperror)
+ err = jsoniter.ConfigFastest.UnmarshalFromString(message, &resperror)
if err != nil {
return kv.Response{}, err
}