~fnux/yggdrasil-go-coap

5109c0eb3d083b7b1c011c4480bcc8145a5fe271 — Jozef Kralik 2 years ago b607a86
provides SeqNum for each request from connection
5 files changed, 11 insertions(+), 6 deletions(-)

M blockwise.go
M clientcommander.go
M multicastClient.go
M request.go
M server.go
M blockwise.go => blockwise.go +1 -1
@@ 684,7 684,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
				// We need to be careful to create a new response writer for the
				// new request, otherwise the server may attempt to respond to
				// the wrong request.
				newReq := &Request{Client: r.Client, Msg: msg, Ctx: r.Ctx}
				newReq := &Request{Client: r.Client, Msg: msg, Ctx: r.Ctx, SeqNum: r.SeqNum}
				newWriter := responseWriterFromRequest(newReq)
				next(newWriter, newReq)
				return

M clientcommander.go => clientcommander.go +2 -2
@@ 282,12 282,12 @@ func (cc *ClientCommander) ObserveWithContext(ctx context.Context, path string, 
			//during processing observation, check if notification is still valid
			if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
				if setObsSeqNum() {
					observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx})
					observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx, SeqNum: r.SeqNum})
				}
			}
		default:
			if setObsSeqNum() {
				observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx})
				observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx, SeqNum: r.SeqNum})
			}
		}
		return

M multicastClient.go => multicastClient.go +1 -1
@@ 197,7 197,7 @@ func (mconn *MulticastClientConn) PublishMsgWithContext(ctx context.Context, req
				return
			}
		}
		responseHandler(&Request{Msg: resp, Client: r.Client, Ctx: ctx})
		responseHandler(&Request{Msg: resp, Client: r.Client, Ctx: ctx, SeqNum: r.SeqNum})
	})
	if err != nil {
		return nil, err

M request.go => request.go +1 -0
@@ 6,4 6,5 @@ type Request struct {
	Msg    Message
	Client *ClientConn
	Ctx    context.Context
	SeqNum uint64 // discontinuously growing number for every request from connection starts from 0
}

M server.go => server.go +6 -2
@@ 431,6 431,7 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) er

	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seqNum := uint64(0)

	for {
		mti, err := readTcpMsgInfo(ctx, conn)


@@ 459,7 460,8 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) er
		// We will block poller wait loop when
		// all pool workers are busy.
		c := ClientConn{commander: &ClientCommander{session}}
		srv.spawnWorker(&Request{Client: &c, Msg: msg, Ctx: sessCtx})
		srv.spawnWorker(&Request{Client: &c, Msg: msg, Ctx: sessCtx, SeqNum: seqNum})
		seqNum++
	}
}



@@ 510,6 512,7 @@ func (srv *Server) serveUDP(ctx *shutdownContext, conn *net.UDPConn) error {
	connUDP := kitNet.NewConnUDP(conn, srv.heartBeat())
	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seqNum := uint64(0)

	for {
		m := make([]byte, ^uint16(0))


@@ 541,7 544,8 @@ func (srv *Server) serveUDP(ctx *shutdownContext, conn *net.UDPConn) error {
			continue
		}
		c := ClientConn{commander: &ClientCommander{session}}
		srv.spawnWorker(&Request{Msg: msg, Client: &c, Ctx: sessCtx})
		srv.spawnWorker(&Request{Msg: msg, Client: &c, Ctx: sessCtx, SeqNum: seqNum})
		seqNum++
	}
}