~fnux/yggdrasil-go-coap

37d34c95d777589f12368fd30ad9e22b74e6d575 — Jozef Kralik 2 years ago 5109c0e
implement Sequence to connection
7 files changed, 35 insertions(+), 22 deletions(-)

M blockwise.go
M client.go
M clientcommander.go
M multicastClient.go
M networksession.go
M request.go
M server.go
M blockwise.go => blockwise.go +1 -1
@@ 684,7 684,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
				// We need to be careful to create a new response writer for the
				// new request, otherwise the server may attempt to respond to
				// the wrong request.
				newReq := &Request{Client: r.Client, Msg: msg, Ctx: r.Ctx, SeqNum: r.SeqNum}
				newReq := &Request{Client: r.Client, Msg: msg, Ctx: r.Ctx, Sequence: r.Client.Sequence()}
				newWriter := responseWriterFromRequest(newReq)
				next(newWriter, newReq)
				return

M client.go => client.go +5 -0
@@ 369,6 369,11 @@ func (co *ClientConn) Close() error {
	return err
}

// Sequence discontinuously unique growing number for connection.
func (co *ClientConn) Sequence() uint64 {
	return co.commander.Sequence()
}

// Dial connects to the address on the named network.
func Dial(network, address string) (*ClientConn, error) {
	client := Client{Net: network}

M clientcommander.go => clientcommander.go +16 -11
@@ 187,7 187,7 @@ func (cc *ClientCommander) DeleteWithContext(ctx context.Context, path string) (
type Observation struct {
	token     []byte
	path      string
	obsSeqNum uint32
	obsSequence uint32
	client    *ClientCommander
}



@@ 237,7 237,7 @@ func (cc *ClientCommander) ObserveWithContext(ctx context.Context, path string, 
	o := &Observation{
		token:     req.Token(),
		path:      path,
		obsSeqNum: 0,
		obsSequence: 0,
		client:    cc,
	}
	err = cc.networkSession.TokenHandler().Add(req.Token(), func(w ResponseWriter, r *Request) {


@@ 265,14 265,14 @@ func (cc *ClientCommander) ObserveWithContext(ctx context.Context, path string, 
				return
			}
		}
		setObsSeqNum := func() bool {
		setObsSequence := func() bool {
			if r.Msg.Option(Observe) != nil {
				obsSeqNum := r.Msg.Option(Observe).(uint32)
				//obs starts with 0, after that check obsSeqNum
				if obsSeqNum != 0 && o.obsSeqNum > obsSeqNum {
				obsSequence := r.Msg.Option(Observe).(uint32)
				//obs starts with 0, after that check obsSequence
				if obsSequence != 0 && o.obsSequence > obsSequence {
					return false
				}
				o.obsSeqNum = obsSeqNum
				o.obsSequence = obsSequence
			}
			return true
		}


@@ 281,13 281,13 @@ func (cc *ClientCommander) ObserveWithContext(ctx context.Context, path string, 
		case r.Msg.Option(ETag) != nil && resp.Option(ETag) != nil:
			//during processing observation, check if notification is still valid
			if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
				if setObsSeqNum() {
					observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx, SeqNum: r.SeqNum})
				if setObsSequence() {
					observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx, Sequence: r.Sequence})
				}
			}
		default:
			if setObsSeqNum() {
				observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx, SeqNum: r.SeqNum})
			if setObsSequence() {
				observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx, Sequence: r.Sequence})
			}
		}
		return


@@ 308,3 308,8 @@ func (cc *ClientCommander) ObserveWithContext(ctx context.Context, path string, 
func (cc *ClientCommander) Close() error {
	return cc.networkSession.Close()
}

// Sequence discontinuously unique growing number for connection.
func (cc *ClientCommander) Sequence() uint64 {
	return cc.networkSession.Sequence()
}

M multicastClient.go => multicastClient.go +1 -1
@@ 197,7 197,7 @@ func (mconn *MulticastClientConn) PublishMsgWithContext(ctx context.Context, req
				return
			}
		}
		responseHandler(&Request{Msg: resp, Client: r.Client, Ctx: ctx, SeqNum: r.SeqNum})
		responseHandler(&Request{Msg: resp, Client: r.Client, Ctx: ctx, Sequence: r.Client.Sequence()})
	})
	if err != nil {
		return nil, err

M networksession.go => networksession.go +9 -2
@@ 33,6 33,8 @@ type networkSession interface {
	ExchangeWithContext(ctx context.Context, req Message) (Message, error)
	// Send ping to peer and wait for pong
	PingWithContext(ctx context.Context) error
	// Sequence discontinuously unique growing number for connection.
	Sequence() uint64

	// handlePairMsg Message was handled by pair
	handlePairMsg(w ResponseWriter, r *Request) bool


@@ 123,8 125,9 @@ type sessionResp struct {
}

type sessionBase struct {
	srv     *Server
	handler *TokenHandler
	srv      *Server
	handler  *TokenHandler
	sequence uint64

	blockWiseTransfer    bool
	blockWiseTransferSzx uint32 //BlockWiseSzx


@@ 185,6 188,10 @@ func (s *sessionBase) setBlockWiseSzx(szx BlockWiseSzx) {
	atomic.StoreUint32(&s.blockWiseTransferSzx, uint32(szx))
}

func (s *sessionBase) Sequence() uint64 {
	return atomic.AddUint64(&s.sequence, 1)
}

func (s *sessionBase) blockWiseMaxPayloadSize(peer BlockWiseSzx) (int, BlockWiseSzx) {
	szx := s.blockWiseSzx()
	if peer < szx {

M request.go => request.go +1 -1
@@ 6,5 6,5 @@ type Request struct {
	Msg    Message
	Client *ClientConn
	Ctx    context.Context
	SeqNum uint64 // discontinuously growing number for every request from connection starts from 0
	Sequence uint64 // discontinuously growing number for every request from connection starts from 0
}

M server.go => server.go +2 -6
@@ 431,7 431,6 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) er

	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seqNum := uint64(0)

	for {
		mti, err := readTcpMsgInfo(ctx, conn)


@@ 460,8 459,7 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) er
		// We will block poller wait loop when
		// all pool workers are busy.
		c := ClientConn{commander: &ClientCommander{session}}
		srv.spawnWorker(&Request{Client: &c, Msg: msg, Ctx: sessCtx, SeqNum: seqNum})
		seqNum++
		srv.spawnWorker(&Request{Client: &c, Msg: msg, Ctx: sessCtx, Sequence: c.Sequence()})
	}
}



@@ 512,7 510,6 @@ func (srv *Server) serveUDP(ctx *shutdownContext, conn *net.UDPConn) error {
	connUDP := kitNet.NewConnUDP(conn, srv.heartBeat())
	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seqNum := uint64(0)

	for {
		m := make([]byte, ^uint16(0))


@@ 544,8 541,7 @@ func (srv *Server) serveUDP(ctx *shutdownContext, conn *net.UDPConn) error {
			continue
		}
		c := ClientConn{commander: &ClientCommander{session}}
		srv.spawnWorker(&Request{Msg: msg, Client: &c, Ctx: sessCtx, SeqNum: seqNum})
		seqNum++
		srv.spawnWorker(&Request{Msg: msg, Client: &c, Ctx: sessCtx, Sequence: c.Sequence()})
	}
}