~fnux/yggdrasil-go-coap

cf6d606c94ee01e5efc1031ea57f1d56037bf3c4 — Jozef Kralik 2 years ago 9662f3f
add ClientCommander as interface for users
M README.md => README.md +22 -98
@@ 7,6 7,7 @@
Features supported:
* CoAP over UDP [RFC 7252][coap].
* CoAP over TCP/TLS [RFC 8232][coap-tcp]
* Observe resources in CoAP [RFC 7641][coap-observe]
* Block-wise transfers in COAP [RFC 7959][coap-block-wise-transfers]
* request multiplexer
* multicast


@@ 19,6 20,7 @@ Fork of https://github.com/dustin/go-coap
[coap]: http://tools.ietf.org/html/rfc7252
[coap-tcp]: https://tools.ietf.org/html/rfc8323
[coap-block-wise-transfers]: https://tools.ietf.org/html/rfc7959
[coap-observe]: https://tools.ietf.org/html/rfc7641

## Samples



@@ 27,23 29,23 @@ Fork of https://github.com/dustin/go-coap
#### Server UDP/TCP
```go
	// Server
	// See /examples/simpler/server/main.go
	func handleA(w coap.SessionNet, req coap.Message) {
		log.Printf("Got message in handleA: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
		if req.IsConfirmable() {
			res := w.NewMessage(coap.MessageParams{
	// See /examples/simple/server/main.go
	func handleA(w coap.ResponseWriter, req *coap.Request) {
		log.Printf("Got message in handleA: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
		if req.Msg.IsConfirmable() {
			res := req.Client.NewMessage(coap.MessageParams{
				Type:      coap.Acknowledgement,
				Code:      coap.Content,
				MessageID: req.MessageID(),
				Token:     req.Token(),
				Payload:   []byte("server: hello world!"),
				MessageID: req.Msg.MessageID(),
				Token:     req.Msg.Token(),
				Payload:   []byte("hello to you!"),
			})
			res.SetOption(coap.ContentFormat, coap.TextPlain)

			log.Printf("Transmitting from B %#v", res)
			w.WriteMsg(res, time.Hour)
			log.Printf("Transmitting from A %#v", res)
			w.Write(res)
		}
	}
	}	

	func main() {
		mux := coap.NewServeMux()


@@ 75,23 77,13 @@ Fork of https://github.com/dustin/go-coap
			log.Fatalf("Error dialing: %v", err)
		}

		req := co.NewMessage(coap.MessageParams{
			Type:      coap.Confirmable,
			Code:      coap.GET,
			MessageID: 12345,
			Payload:   []byte("client: hello, world!"),
			Token:	   []byte("TOKEN"),
		})
		req.SetPathString("/a")
		resp, err := co.Get(path)

		rv, _, err := co.Exchange(req, 1 * time.Second)
		if err != nil {
			log.Fatalf("Error sending request: %v", err)
		}

		if rv != nil {
			log.Printf("Response payload: %v", rv.Payload())
		}
		log.Printf("Response payload: %v", resp.Payload())
	}
```



@@ 99,87 91,19 @@ Fork of https://github.com/dustin/go-coap
### Observe / Notify

#### Server
```go
	// Server
	// See /examples/observe/server/main.go

	func periodicTransmitter(w coap.SessionNet, req coap.Message) {
		subded := time.Now()

		for {
			msg := w.NewMessage(coap.MessageParams{
				Type:      coap.Acknowledgement,
				Code:      coap.Content,
				MessageID: req.MessageID(),
				Payload:   []byte(fmt.Sprintf("Been running for %v", time.Since(subded))),
			})

			msg.SetOption(coap.ContentFormat, coap.TextPlain)
			msg.SetOption(coap.LocationPath, req.Path())

			log.Printf("Transmitting %v", msg)
			err := w.WriteMsg(msg, time.Hour)
			if err != nil {
				log.Printf("Error on transmitter, stopping: %v", err)
				return
			}

			time.Sleep(time.Second)
		}
	}

	func main() {
		log.Fatal(coap.ListenAndServe(":5688", "udp",
			coap.HandlerFunc(func(w coap.SessionNet, req coap.Message) {
				log.Printf("Got message path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
				if req.Code() == coap.GET && req.Option(coap.Observe) != nil {
					value := req.Option(coap.Observe)
					if value.(uint32) > 0 {
						go periodicTransmitter(w, req)
					} else {
						log.Printf("coap.Observe value=%v", value)
					}
				}
			})))
	}
}
```
Look to examples/observe/server/main.go

#### Client
```go
	// Client
	// See /examples/observe/client/main.go
	func observe(s coap.SessionNet, m coap.Message) {
		log.Printf("Got %s", m.Payload())
	}
Look to examples/observe/client/main.go

	func main() {
		client := &coap.Client{ObserverFunc: observe}

		conn, err := client.Dial("localhost:5688")
		if err != nil {
			log.Fatalf("Error dialing: %v", err)
		}

		req := conn.NewMessage(coap.MessageParams{
			Type:      coap.NonConfirmable,
			Code:      coap.GET,
			MessageID: 12345,
		})
### Multicast

		req.AddOption(coap.Observe, 1)
		req.SetPathString("/some/path")

		err = conn.WriteMsg(req, time.Hour)
		if err != nil {
			log.Fatalf("Error sending request: %v", err)
		}
#### Server
Look to examples/mcast/server/main.go

		// waiting for messages that arrives in 20seconds
		time.Sleep(20 * time.Second)
		log.Printf("Done...\n")
	}
```
#### Client
Look to examples/mcast/client/main.go

## License
MIT

M blockwise.go => blockwise.go +29 -29
@@ 65,7 65,7 @@ func UnmarshalBlockOption(blockVal uint32) (szx BlockSzx, blockNumber uint, more
	return
}

func exchangeDrivedByPeer(session SessionNet, req Message, blockType OptionID) (Message, error) {
func exchangeDrivedByPeer(session networkSession, req Message, blockType OptionID) (Message, error) {
	if block, ok := req.Option(blockType).(uint32); ok {
		_, _, more, err := UnmarshalBlockOption(block)
		if err != nil {


@@ 82,14 82,14 @@ func exchangeDrivedByPeer(session SessionNet, req Message, blockType OptionID) (
	}

	pair := make(chan *Request, 1)
	session.sessionHandler().add(req.Token(), func(w ResponseWriter, r *Request, next HandlerFunc) {
	session.TokenHandler().Add(req.Token(), func(w ResponseWriter, r *Request) {
		select {
		case pair <- r:
		default:
			next(w, r)
			return
		}
	})
	defer session.sessionHandler().remove(req.Token())
	defer session.TokenHandler().Remove(req.Token())
	err := session.Write(req)
	if err != nil {
		return nil, err


@@ 138,7 138,7 @@ func newSender(peerDrive bool, blockType OptionID, suggestedSzx BlockSzx, expect
}

func (s *blockWiseSender) createReq(b *blockWiseSession) (Message, error) {
	req := b.SessionNet.NewMessage(MessageParams{
	req := b.networkSession.NewMessage(MessageParams{
		Code:      s.origin.Code(),
		Type:      s.coapType(),
		MessageID: s.origin.MessageID(),


@@ 178,9 178,9 @@ func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, e
		log.Printf("sendPayload %p req=%v\n", b, req)
	}
	if s.peerDrive {
		resp, err = exchangeDrivedByPeer(b.SessionNet, req, s.blockType)
		resp, err = exchangeDrivedByPeer(b.networkSession, req, s.blockType)
	} else {
		resp, err = b.SessionNet.Exchange(req)
		resp, err = b.networkSession.Exchange(req)
	}
	if err != nil {
		return nil, err


@@ 289,21 289,21 @@ func (b *blockWiseSession) sendPayload(peerDrive bool, blockType OptionID, sugge
}

type blockWiseSession struct {
	SessionNet
	networkSession
}

func (b *blockWiseSession) Exchange(msg Message) (Message, error) {
	switch msg.Code() {
	//these methods doesn't need to be handled by blockwise
	case CSM, Ping, Pong, Release, Abort, Empty:
		return b.SessionNet.Exchange(msg)
		return b.networkSession.Exchange(msg)
	case GET, DELETE:
		return b.receivePayload(false, msg, nil, Block2, msg.Code())
	case POST, PUT:
		return b.sendPayload(false, Block1, b.SessionNet.blockWiseSzx(), Continue, msg)
		return b.sendPayload(false, Block1, b.networkSession.blockWiseSzx(), Continue, msg)
	// for response code
	default:
		return b.sendPayload(true, Block2, b.SessionNet.blockWiseSzx(), Continue, msg)
		return b.sendPayload(true, Block2, b.networkSession.blockWiseSzx(), Continue, msg)
	}

}


@@ 311,7 311,7 @@ func (b *blockWiseSession) Exchange(msg Message) (Message, error) {
func (b *blockWiseSession) Write(msg Message) error {
	switch msg.Code() {
	case CSM, Ping, Pong, Release, Abort, Empty, GET:
		return b.SessionNet.Write(msg)
		return b.networkSession.Write(msg)
	default:
		_, err := b.Exchange(msg)
		return err


@@ 337,7 337,7 @@ func (b *blockWiseSession) sendErrorMsg(code COAPCode, typ COAPType, token []byt
		MessageID: MessageID,
		Token:     token,
	})
	b.SessionNet.Write(req)
	b.networkSession.Write(req)
}

type blockWiseReceiver struct {


@@ 370,7 370,7 @@ func (r *blockWiseReceiver) coapType() COAPType {
}

func (r *blockWiseReceiver) createReq(b *blockWiseSession, resp Message) (Message, error) {
	req := b.SessionNet.NewMessage(MessageParams{
	req := b.networkSession.NewMessage(MessageParams{
		Code:      r.code,
		Type:      r.typ,
		MessageID: r.origin.MessageID(),


@@ 405,7 405,7 @@ func newReceiver(b *blockWiseSession, peerDrive bool, origin Message, resp Messa
		code:       code,
		origin:     origin,
		blockType:  blockType,
		currentSzx: b.SessionNet.blockWiseSzx(),
		currentSzx: b.networkSession.blockWiseSzx(),
		payload:    []byte{},
	}



@@ 476,9 476,9 @@ func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message,
	var resp Message
	var err error
	if r.peerDrive {
		resp, err = exchangeDrivedByPeer(b.SessionNet, req, r.blockType)
		resp, err = exchangeDrivedByPeer(b.networkSession, req, r.blockType)
	} else {
		resp, err = b.SessionNet.Exchange(req)
		resp, err = b.networkSession.Exchange(req)
	}

	if blockWiseDebug {


@@ 629,25 629,25 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
	if r.Msg.Token() != nil {
		switch r.Msg.Code() {
		case PUT, POST:
			if b, ok := r.SessionNet.(*blockWiseSession); ok {
			if b, ok := r.Client.networkSession.(*blockWiseSession); ok {
				msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue)

				if err != nil {
					return
				}
				next(w, &Request{SessionNet: r.SessionNet, Msg: msg})
				next(w, &Request{Client: r.Client, Msg: msg})
				return
			}
			/*
				//observe data
				case Content, Valid:
					if r.Msg.Option(Observe) != nil && r.Msg.Option(ETag) != nil {
						if b, ok := r.SessionNet.(*blockWiseSession); ok {
						if b, ok := r.networkSession.(*blockWiseSession); ok {
							token, err := GenerateToken(8)
							if err != nil {
								return
							}
							req := r.SessionNet.NewMessage(MessageParams{
							req := r.networkSession.NewMessage(MessageParams{
								Code:      GET,
								Type:      Confirmable,
								MessageID: GenerateMessageID(),


@@ 660,7 660,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
							if err != nil {
								return
							}
							next(w, &Request{SessionNet: r.SessionNet, Msg: msg})
							next(w, &Request{networkSession: r.networkSession, Msg: msg})
							return
						}
					}*/


@@ 675,14 675,14 @@ type blockWiseResponseWriter struct {
}

func (w *blockWiseResponseWriter) Write(msg Message) error {
	suggestedSzx := w.req.SessionNet.blockWiseSzx()
	suggestedSzx := w.req.Client.networkSession.blockWiseSzx()
	if respBlock2, ok := w.req.Msg.Option(Block2).(uint32); ok {
		szx, _, _, err := UnmarshalBlockOption(respBlock2)
		if err != nil {
			return err
		}
		//BERT is supported only via TCP
		if szx == BlockSzxBERT && !w.req.SessionNet.IsTCP() {
		if szx == BlockSzxBERT && !w.req.Client.networkSession.IsTCP() {
			return ErrInvalidBlockSzx
		}
		suggestedSzx = szx


@@ 693,7 693,7 @@ func (w *blockWiseResponseWriter) Write(msg Message) error {
		return w.responseWriter.Write(msg)
	}

	if b, ok := w.req.SessionNet.(*blockWiseSession); ok {
	if b, ok := w.req.Client.networkSession.(*blockWiseSession); ok {
		_, err := b.sendPayload(true, Block2, suggestedSzx, w.req.Msg.Code(), msg)
		return err
	}


@@ 706,14 706,14 @@ type blockWiseNoticeWriter struct {
}

func (w *blockWiseNoticeWriter) Write(msg Message) error {
	suggestedSzx := w.req.SessionNet.blockWiseSzx()
	suggestedSzx := w.req.Client.networkSession.blockWiseSzx()
	if respBlock2, ok := w.req.Msg.Option(Block2).(uint32); ok {
		szx, _, _, err := UnmarshalBlockOption(respBlock2)
		if err != nil {
			return err
		}
		//BERT is supported only via TCP
		if szx == BlockSzxBERT && !w.req.SessionNet.IsTCP() {
		if szx == BlockSzxBERT && !w.req.Client.networkSession.IsTCP() {
			return ErrInvalidBlockSzx
		}
		suggestedSzx = szx


@@ 724,13 724,13 @@ func (w *blockWiseNoticeWriter) Write(msg Message) error {
		return w.responseWriter.Write(msg)
	}

	if b, ok := w.req.SessionNet.(*blockWiseSession); ok {
	if b, ok := w.req.Client.networkSession.(*blockWiseSession); ok {
		s := newSender(false, Block2, suggestedSzx, w.req.Msg.Code(), msg)
		req, err := s.createReq(b)
		if err != nil {
			return err
		}
		return b.SessionNet.Write(req)
		return b.networkSession.Write(req)
	}
	return ErrNotSupported
}

M client.go => client.go +42 -193
@@ 3,10 3,8 @@ package coap
// A client implementation.

import (
	"bytes"
	"crypto/tls"
	"io"
	"io/ioutil"
	"log"
	"net"
	"strings"


@@ 17,7 15,7 @@ import (
type ClientConn struct {
	srv          *Server
	client       *Client
	session      SessionNet
	commander    *ClientCommander
	shutdownSync chan error
	multicast    bool
}


@@ 145,29 143,29 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
					log.Fatal("Client cannot send start: Timeout")
				}
			},
			NotifySessionEndFunc: func(s SessionNet, err error) {
			NotifySessionEndFunc: func(s *ClientCommander, err error) {
				if c.NotifySessionEndFunc != nil {
					c.NotifySessionEndFunc(err)
				}
			},
			createSessionTCPFunc: func(connection Conn, srv *Server) (SessionNet, error) {
				return clientConn.session, nil
			createSessionTCPFunc: func(connection Conn, srv *Server) (networkSession, error) {
				return clientConn.commander.networkSession, nil
			},
			createSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
				if sessionUDPData.RemoteAddr().String() == clientConn.session.RemoteAddr().String() {
					if s, ok := clientConn.session.(*blockWiseSession); ok {
						s.SessionNet.(*sessionUDP).sessionUDPData = sessionUDPData
			createSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error) {
				if sessionUDPData.RemoteAddr().String() == clientConn.commander.networkSession.RemoteAddr().String() {
					if s, ok := clientConn.commander.networkSession.(*blockWiseSession); ok {
						s.networkSession.(*sessionUDP).sessionUDPData = sessionUDPData
					} else {
						clientConn.session.(*sessionUDP).sessionUDPData = sessionUDPData
						clientConn.commander.networkSession.(*sessionUDP).sessionUDPData = sessionUDPData
					}
					return clientConn.session, nil
					return clientConn.commander.networkSession, nil
				}
				session, err := newSessionUDP(connection, srv, sessionUDPData)
				if err != nil {
					return nil, err
				}
				if session.blockWiseEnabled() {
					return &blockWiseSession{SessionNet: session}, nil
					return &blockWiseSession{networkSession: session}, nil
				}
				return session, nil
			},


@@ 175,6 173,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		},
		shutdownSync: make(chan error),
		multicast:    multicast,
		commander:    &ClientCommander{},
	}

	switch clientConn.srv.Conn.(type) {


@@ 184,9 183,9 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
			return nil, err
		}
		if session.blockWiseEnabled() {
			clientConn.session = &blockWiseSession{SessionNet: session}
			clientConn.commander.networkSession = &blockWiseSession{networkSession: session}
		} else {
			clientConn.session = session
			clientConn.commander.networkSession = session
		}
	case *net.UDPConn:
		// WriteMsgUDP returns error when addr is filled in SessionUDPData for connected socket


@@ 196,14 195,14 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
			return nil, err
		}
		if session.blockWiseEnabled() {
			clientConn.session = &blockWiseSession{SessionNet: session}
			clientConn.commander.networkSession = &blockWiseSession{networkSession: session}
		} else {
			clientConn.session = session
			clientConn.commander.networkSession = session
		}
	}

	clientConn.session.SetReadDeadline(c.readTimeout())
	clientConn.session.SetWriteDeadline(c.writeTimeout())
	clientConn.commander.networkSession.SetReadDeadline(c.readTimeout())
	clientConn.commander.networkSession.SetWriteDeadline(c.writeTimeout())

	go func() {
		timeout := c.syncTimeout()


@@ 226,14 225,14 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
	return clientConn, nil
}

// LocalAddr implements the SessionNet.LocalAddr method.
// LocalAddr implements the networkSession.LocalAddr method.
func (co *ClientConn) LocalAddr() net.Addr {
	return co.session.LocalAddr()
	return co.commander.LocalAddr()
}

// RemoteAddr implements the SessionNet.RemoteAddr method.
// RemoteAddr implements the networkSession.RemoteAddr method.
func (co *ClientConn) RemoteAddr() net.Addr {
	return co.session.RemoteAddr()
	return co.commander.RemoteAddr()
}

// Exchange performs a synchronous query. It sends the message m to the address


@@ 247,32 246,32 @@ func (co *ClientConn) Exchange(m Message) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.session.Exchange(m)
	return co.commander.Exchange(m)
}

// NewMessage Create message for request
func (co *ClientConn) NewMessage(p MessageParams) Message {
	return co.session.NewMessage(p)
	return co.commander.NewMessage(p)
}

// WriteMsg sends a message through the connection co.
// Write sends direct a message through the connection
func (co *ClientConn) Write(m Message) error {
	return co.session.Write(m)
	return co.commander.Write(m)
}

// SetReadDeadline set read deadline for timeout for Exchange
func (co *ClientConn) SetReadDeadline(timeout time.Duration) {
	co.session.SetReadDeadline(timeout)
	co.commander.networkSession.SetReadDeadline(timeout)
}

// SetWriteDeadline set write deadline for timeout for Exchange and Write
func (co *ClientConn) SetWriteDeadline(timeout time.Duration) {
	co.session.SetWriteDeadline(timeout)
	co.commander.networkSession.SetWriteDeadline(timeout)
}

// Ping send a ping message and wait for a pong response
func (co *ClientConn) Ping(timeout time.Duration) error {
	return co.session.Ping(timeout)
	return co.commander.Ping(timeout)
}

// Get retrieve the resource identified by the request path


@@ 280,52 279,23 @@ func (co *ClientConn) Get(path string) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	return co.session.Exchange(req)
	return co.commander.Get(path)
}

func (co *ClientConn) putPostHelper(code COAPCode, path string, contentType MediaType, body io.Reader) (Message, error) {
// Post update the resource identified by the request path
func (co *ClientConn) Post(path string, contentType MediaType, body io.Reader) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      POST,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	req.SetOption(ContentFormat, contentType)
	payload, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	req.SetPayload(payload)
	return co.session.Exchange(req)
}

// Post update the resource identified by the request path
func (co *ClientConn) Post(path string, contentType MediaType, body io.Reader) (Message, error) {
	return co.putPostHelper(POST, path, contentType, body)
	return co.commander.Post(path, contentType, body)
}

// Put create the resource identified by the request path
func (co *ClientConn) Put(path string, contentType MediaType, body io.Reader) (Message, error) {
	return co.putPostHelper(PUT, path, contentType, body)
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Put(path, contentType, body)
}

// Delete delete the resource identified by the request path


@@ 333,146 303,25 @@ func (co *ClientConn) Delete(path string) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      DELETE,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	return co.session.Exchange(req)
}

//Observation represents subscription to resource on the server
type Observation struct {
	token     []byte
	path      string
	obsSeqNum uint32
	s         SessionNet
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation) Cancel() error {
	req := o.s.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     o.token,
	})
	req.SetPathString(o.path)
	req.SetOption(Observe, 1)
	err1 := o.s.Write(req)
	err2 := o.s.sessionHandler().remove(o.token)
	if err1 != nil {
		return err1
	}
	return err2
	return co.commander.Delete(path)
}

// Observe subscribe to severon path. After subscription and every change on path,
// server sends immediately response
func (co *ClientConn) Observe(path string, observeFunc func(req Message)) (*Observation, error) {
func (co *ClientConn) Observe(path string, observeFunc func(req *Request)) (*Observation, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	req.SetOption(Observe, 0)
	/*
		IoTivity doesn't support Block2 in first request for GET
		block, err := MarshalBlockOption(co.session.blockWiseSzx(), 0, false)
		if err != nil {
			return nil, err
		}
		req.SetOption(Block2, block)
	*/
	o := &Observation{
		token:     token,
		path:      path,
		obsSeqNum: 0,
		s:         co.session,
	}
	err = co.session.sessionHandler().add(token, func(w ResponseWriter, r *Request, next HandlerFunc) {
		needGet := false
		resp := r.Msg
		if r.Msg.Option(Size2) != nil {
			if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
				needGet = true
			}
		}
		if !needGet {
			if block, ok := r.Msg.Option(Block2).(uint32); ok {
				_, _, more, err := UnmarshalBlockOption(block)
				if err != nil {
					return
				}
				needGet = more
			}
		}

		if needGet {
			resp, err = co.Get(path)
			if err != nil {
				return
			}
		}
		setObsSeqNum := func() {
			if r.Msg.Option(Observe) != nil {
				obsSeqNum := r.Msg.Option(Observe).(uint32)
				//obs starts with 0, after that check obsSeqNum
				if obsSeqNum != 0 && o.obsSeqNum > obsSeqNum {
					return
				}
				o.obsSeqNum = obsSeqNum
			}
		}

		switch {
		case r.Msg.Option(ETag) != nil && resp.Option(ETag) != nil:
			//during processing observation, check if notification is still valid
			if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
				setObsSeqNum()
				observeFunc(resp)
			}
		default:
			setObsSeqNum()
			observeFunc(resp)
		}
		return
	})
	if err != nil {
		return nil, err
	}
	err = co.session.Write(req)
	if err != nil {
		co.session.sessionHandler().remove(o.token)
		return nil, err
	}

	return o, nil
	return co.commander.Observe(path, observeFunc)
}

// Close close connection
func (co *ClientConn) Close() {
func (co *ClientConn) Close() error {
	co.srv.Shutdown()
	select {
	case <-co.shutdownSync:
	case <-time.After(co.client.syncTimeout()):
		log.Fatal("Client cannot recv shutdown: Timeout")
	}
	return nil
}

// Dial connects to the address on the named network.

M client_test.go => client_test.go +6 -6
@@ 13,7 13,7 @@ import (
)

func periodicTransmitter(w ResponseWriter, r *Request) {
	msg := r.SessionNet.NewMessage(MessageParams{
	msg := r.Client.NewMessage(MessageParams{
		Type:      Acknowledgement,
		Code:      Content,
		MessageID: r.Msg.MessageID(),


@@ 112,10 112,10 @@ func testServingMCastByClient(t *testing.T, lnet, laddr string, BlockWiseTransfe
		Net: lnet,
		Handler: func(w ResponseWriter, r *Request) {
			if bytes.Equal(r.Msg.Payload(), payload) {
				log.Printf("mcast %v -> %v", r.SessionNet.RemoteAddr(), r.SessionNet.LocalAddr())
				log.Printf("mcast %v -> %v", r.Client.RemoteAddr(), r.Client.LocalAddr())
				ansArrived <- true
			} else {
				t.Fatalf("unknown payload %v arrived from %v", r.Msg.Payload(), r.SessionNet.RemoteAddr())
				t.Fatalf("unknown payload %v arrived from %v", r.Msg.Payload(), r.Client.RemoteAddr())
			}
		},
		BlockWiseTransfer:    &BlockWiseTransfer,


@@ 185,7 185,7 @@ func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {

func setupServer(t *testing.T) (string, error) {
	_, addr, _, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockSzx1024, func(w ResponseWriter, r *Request) {
		msg := r.SessionNet.NewMessage(MessageParams{
		msg := r.Client.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),


@@ 286,7 286,7 @@ func TestServingUDPDelete(t *testing.T) {

func TestServingUDPObserve(t *testing.T) {
	_, addr, _, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockSzx16, func(w ResponseWriter, r *Request) {
		msg := r.SessionNet.NewMessage(MessageParams{
		msg := r.Client.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),


@@ 316,7 316,7 @@ func TestServingUDPObserve(t *testing.T) {
		t.Fatalf("Unexpected error '%v'", err)
	}
	sync := make(chan bool)
	_, err = con.Observe("/tmp/test", func(req Message) {
	_, err = con.Observe("/tmp/test", func(req *Request) {
		sync <- true
	})
	if err != nil {

A clientcommander.go => clientcommander.go +230 -0
@@ 0,0 1,230 @@
package coap

import (
	"bytes"
	"io"
	"io/ioutil"
	"net"
	"time"
)

// ClientCommander provides commands Get,Post,Put,Delete,Observe
// For compare use ClientCommander.Equal
type ClientCommander struct {
	networkSession networkSession
}

// NewMessage Create message for request
func (cc *ClientCommander) NewMessage(p MessageParams) Message {
	return cc.networkSession.NewMessage(p)
}

// LocalAddr implements the networkSession.LocalAddr method.
func (cc *ClientCommander) LocalAddr() net.Addr {
	return cc.networkSession.LocalAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (cc *ClientCommander) RemoteAddr() net.Addr {
	return cc.networkSession.RemoteAddr()
}

// Equal compare two ClientCommanders
func (cc *ClientCommander) Equal(cc1 *ClientCommander) bool {
	return cc.RemoteAddr().String() == cc1.RemoteAddr().String() && cc.LocalAddr().String() == cc1.LocalAddr().String()
}

// Exchange performs a synchronous query. It sends the message m to the address
// contained in a and waits for a reply.
//
// Exchange does not retry a failed query, nor will it fall back to TCP in
// case of truncation.
// To specify a local address or a timeout, the caller has to set the `Client.Dialer`
// attribute appropriately
func (cc *ClientCommander) Exchange(m Message) (Message, error) {
	return cc.networkSession.Exchange(m)
}

// Write sends direct a message through the connection
func (cc *ClientCommander) Write(m Message) error {
	return cc.networkSession.Write(m)
}

// Ping send a ping message and wait for a pong response
func (cc *ClientCommander) Ping(timeout time.Duration) error {
	return cc.networkSession.Ping(timeout)
}

// Get retrieve the resource identified by the request path
func (cc *ClientCommander) Get(path string) (Message, error) {
	req, err := createGetReq(cc, path)
	if err != nil {
		return nil, err
	}
	return cc.networkSession.Exchange(req)
}

func (cc *ClientCommander) putPostHelper(code COAPCode, path string, contentType MediaType, body io.Reader) (Message, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := cc.networkSession.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      POST,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	req.SetOption(ContentFormat, contentType)
	payload, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	req.SetPayload(payload)
	return cc.networkSession.Exchange(req)
}

// Post update the resource identified by the request path
func (cc *ClientCommander) Post(path string, contentType MediaType, body io.Reader) (Message, error) {
	return cc.putPostHelper(POST, path, contentType, body)
}

// Put create the resource identified by the request path
func (cc *ClientCommander) Put(path string, contentType MediaType, body io.Reader) (Message, error) {
	return cc.putPostHelper(PUT, path, contentType, body)
}

// Delete delete the resource identified by the request path
func (cc *ClientCommander) Delete(path string) (Message, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := cc.networkSession.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      DELETE,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	return cc.networkSession.Exchange(req)
}

//Observation represents subscription to resource on the server
type Observation struct {
	token     []byte
	path      string
	obsSeqNum uint32
	client    *ClientCommander
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation) Cancel() error {
	req := o.client.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     o.token,
	})
	req.SetPathString(o.path)
	req.SetOption(Observe, 1)
	err1 := o.client.Write(req)
	err2 := o.client.networkSession.TokenHandler().Remove(o.token)
	if err1 != nil {
		return err1
	}
	return err2
}

// Observe subscribe to severon path. After subscription and every change on path,
// server sends immediately response
func (cc *ClientCommander) Observe(path string, observeFunc func(req *Request)) (*Observation, error) {
	req, err := createGetReq(cc, path)
	if err != nil {
		return nil, err
	}

	req.SetOption(Observe, 0)
	/*
		IoTivity doesn't support Block2 in first request for GET
		block, err := MarshalBlockOption(cc.networkSession.blockWiseSzx(), 0, false)
		if err != nil {
			return nil, err
		}
		req.SetOption(Block2, block)
	*/
	o := &Observation{
		token:     req.Token(),
		path:      path,
		obsSeqNum: 0,
		client:    cc,
	}
	err = cc.networkSession.TokenHandler().Add(req.Token(), func(w ResponseWriter, r *Request) {
		needGet := false
		resp := r.Msg
		if r.Msg.Option(Size2) != nil {
			if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
				needGet = true
			}
		}
		if !needGet {
			if block, ok := r.Msg.Option(Block2).(uint32); ok {
				_, _, more, err := UnmarshalBlockOption(block)
				if err != nil {
					return
				}
				needGet = more

			}
		}

		if needGet {
			resp, err = cc.Get(path)
			if err != nil {
				return
			}
		}
		setObsSeqNum := func() bool {
			if r.Msg.Option(Observe) != nil {
				obsSeqNum := r.Msg.Option(Observe).(uint32)
				//obs starts with 0, after that check obsSeqNum
				if obsSeqNum != 0 && o.obsSeqNum > obsSeqNum {
					return false
				}
				o.obsSeqNum = obsSeqNum
			}
			return true
		}

		switch {
		case r.Msg.Option(ETag) != nil && resp.Option(ETag) != nil:
			//during processing observation, check if notification is still valid
			if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
				if setObsSeqNum() {
					observeFunc(&Request{Msg: resp, Client: r.Client})
				}
			}
		default:
			if setObsSeqNum() {
				observeFunc(&Request{Msg: resp, Client: r.Client})
			}
		}
		return
	})
	if err != nil {
		return nil, err
	}
	err = cc.networkSession.Write(req)
	if err != nil {
		cc.networkSession.TokenHandler().Remove(o.token)
		return nil, err
	}

	return o, nil
}

// Close close connection
func (cc *ClientCommander) Close() error {
	return cc.networkSession.Close()
}

M examples/mcast/client/main.go => examples/mcast/client/main.go +2 -2
@@ 15,8 15,8 @@ func main() {
	}

	sync := make(chan bool)
	_, err = conn.Publish("/oic/res", func(req coap.Message) {
		log.Printf("Got message: %#v from %v", req, req)
	_, err = conn.Publish("/oic/res", func(req *coap.Request) {
		log.Printf("Got message: %#v from %v", req.Msg, req.Client.RemoteAddr())
		sync <- true
	})
	if err != nil {

M examples/mcast/server/main.go => examples/mcast/server/main.go +2 -2
@@ 7,8 7,8 @@ import (
)

func handleMcast(w coap.ResponseWriter, r *coap.Request) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", r.Msg.Path(), r.Msg, r.SessionNet.RemoteAddr())
	res := r.SessionNet.NewMessage(coap.MessageParams{
	log.Printf("Got message in handleA: path=%q: %#v from %v", r.Msg.Path(), r.Msg, r.Client.RemoteAddr())
	res := r.Client.NewMessage(coap.MessageParams{
		Type:      coap.Acknowledgement,
		Code:      coap.Content,
		MessageID: r.Msg.MessageID(),

M examples/observe/client/main.go => examples/observe/client/main.go +2 -2
@@ 19,8 19,8 @@ func main() {
		log.Fatalf("Error dialing: %v", err)
	}
	num := 0
	obs, err := co.Observe("/some/path", func(req coap.Message) {
		log.Printf("Got %s", req.Payload())
	obs, err := co.Observe("/some/path", func(req *coap.Request) {
		log.Printf("Got %s", req.Msg.Payload())
		num++
		if num >= 10 {
			sync <- true

M examples/observe/server/main.go => examples/observe/server/main.go +2 -2
@@ 9,7 9,7 @@ import (
)

func sendResponse(w coap.ResponseWriter, req *coap.Request, subded time.Time) error {
	resp := req.SessionNet.NewMessage(coap.MessageParams{
	resp := req.Client.NewMessage(coap.MessageParams{
		Type:      coap.Acknowledgement,
		Code:      coap.Content,
		MessageID: req.Msg.MessageID(),


@@ 36,7 36,7 @@ func periodicTransmitter(w coap.ResponseWriter, req *coap.Request) {
func main() {
	log.Fatal(coap.ListenAndServe(":5688", "udp",
		coap.HandlerFunc(func(w coap.ResponseWriter, req *coap.Request) {
			log.Printf("Got message path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.SessionNet.RemoteAddr())
			log.Printf("Got message path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
			switch {
			case req.Msg.Code() == coap.GET && req.Msg.Option(coap.Observe) != nil && req.Msg.Option(coap.Observe).(uint32) == 0:
				go periodicTransmitter(w, req)

M examples/simple/server/main.go => examples/simple/server/main.go +4 -4
@@ 7,9 7,9 @@ import (
)

func handleA(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.SessionNet.RemoteAddr())
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
	if req.Msg.IsConfirmable() {
		res := req.SessionNet.NewMessage(coap.MessageParams{
		res := req.Client.NewMessage(coap.MessageParams{
			Type:      coap.Acknowledgement,
			Code:      coap.Content,
			MessageID: req.Msg.MessageID(),


@@ 24,9 24,9 @@ func handleA(w coap.ResponseWriter, req *coap.Request) {
}

func handleB(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got message in handleB: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.SessionNet.RemoteAddr())
	log.Printf("Got message in handleB: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
	if req.Msg.IsConfirmable() {
		res := req.SessionNet.NewMessage(coap.MessageParams{
		res := req.Client.NewMessage(coap.MessageParams{
			Type:      coap.Acknowledgement,
			Code:      coap.Content,
			MessageID: req.Msg.MessageID(),

M iotivity_test.go => iotivity_test.go +5 -7
@@ 1,6 1,5 @@
package coap

/*
import (
	"bytes"
	"fmt"


@@ 48,7 47,7 @@ func decodeMsg(resp Message) {
}

func observe(w ResponseWriter, req *Request) {
	fmt.Printf("OBSERVE : %v\n", req.SessionNet.RemoteAddr())
	fmt.Printf("OBSERVE : %v\n", req.Client.RemoteAddr())
	decodeMsg(req.Msg)
}



@@ 108,8 107,8 @@ func TestBlockWiseObserveBlock16(t *testing.T) {
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	_, err = co.Observe(path, func(req Message) {
		decodeMsg(req)
	_, err = co.Observe(path, func(req *Request) {
		decodeMsg(req.Msg)
		sync <- true
	})
	if err != nil {


@@ 128,8 127,8 @@ func TestBlockWiseMulticastBlock16(t *testing.T) {
		t.Fatalf("Error dialing: %v", err)
	}
	sync := make(chan bool)
	_, err = co.Publish("/oic/res", func(req Message) {
		decodeMsg(req)
	_, err = co.Publish("/oic/res", func(req *Request) {
		decodeMsg(req.Msg)
		sync <- true
	})
	if err != nil {


@@ 153,4 152,3 @@ func TestGetBlock16(t *testing.T) {
	}
	decodeMsg(resp)
}
*/

M multicastClient.go => multicastClient.go +11 -15
@@ 28,13 28,13 @@ type MulticastClient struct {
	BlockWiseTransfer    *bool     // Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)
	BlockWiseTransferSzx *BlockSzx // Set maximal block size of payload that will be send in fragment

	multicastHandler *sessionHandler
	multicastHandler *TokenHandler
}

// Dial connects to the address on the named network.
func (c *MulticastClient) dialNet(net, address string) (*ClientConn, error) {
	if c.multicastHandler == nil {
		c.multicastHandler = &sessionHandler{tokenHandlers: make(map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc))}
		c.multicastHandler = &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)}
	}
	client := &Client{
		Net:            net,


@@ 48,7 48,7 @@ func (c *MulticastClient) dialNet(net, address string) (*ClientConn, error) {
			if handler == nil {
				handler = HandleFailed
			}
			c.multicastHandler.handle(w, r, handler)
			c.multicastHandler.Handle(w, r, handler)
		},
		NotifySessionEndFunc: c.NotifySessionEndFunc,
		BlockWiseTransfer:    c.BlockWiseTransfer,


@@ 79,12 79,12 @@ func (c *MulticastClient) Dial(address string) (*MulticastClientConn, error) {
	}, nil
}

// LocalAddr implements the SessionNet.LocalAddr method.
// LocalAddr implements the networkSession.LocalAddr method.
func (mconn *MulticastClientConn) LocalAddr() net.Addr {
	return mconn.conn.LocalAddr()
}

// RemoteAddr implements the SessionNet.RemoteAddr method.
// RemoteAddr implements the networkSession.RemoteAddr method.
func (mconn *MulticastClientConn) RemoteAddr() net.Addr {
	return mconn.conn.RemoteAddr()
}


@@ 122,7 122,7 @@ type ResponseWaiter struct {

// Cancel remove observation from server. For recreate observation use Observe.
func (r *ResponseWaiter) Cancel() error {
	return r.conn.client.multicastHandler.remove(r.token)
	return r.conn.client.multicastHandler.Remove(r.token)
}

type messageNewer interface {


@@ 146,7 146,7 @@ func createGetReq(m messageNewer, path string) (Message, error) {

// Publish subscribe to sever on path. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) Publish(path string, responseHandler func(resp Message)) (*ResponseWaiter, error) {
func (mconn *MulticastClientConn) Publish(path string, responseHandler func(req *Request)) (*ResponseWaiter, error) {
	req, err := createGetReq(mconn, path)
	if err != nil {
		return nil, err


@@ 156,7 156,7 @@ func (mconn *MulticastClientConn) Publish(path string, responseHandler func(resp
		path:  path,
		conn:  mconn,
	}
	err = mconn.client.multicastHandler.add(req.Token(), func(w ResponseWriter, r *Request, next HandlerFunc) {
	err = mconn.client.multicastHandler.Add(req.Token(), func(w ResponseWriter, r *Request) {
		needGet := false
		resp := r.Msg
		if r.Msg.Option(Size2) != nil {


@@ 175,16 175,12 @@ func (mconn *MulticastClientConn) Publish(path string, responseHandler func(resp
		}

		if needGet {
			getReq, err := createGetReq(mconn, path)
			if err != nil {
				return
			}
			resp, err = r.SessionNet.Exchange(getReq)
			resp, err = r.Client.Get(path)
			if err != nil {
				return
			}
		}
		responseHandler(resp)
		responseHandler(&Request{Msg: resp, Client: r.Client})
	})
	if err != nil {
		return nil, err


@@ 192,7 188,7 @@ func (mconn *MulticastClientConn) Publish(path string, responseHandler func(resp

	err = mconn.Write(req)
	if err != nil {
		mconn.client.multicastHandler.remove(r.token)
		mconn.client.multicastHandler.Remove(r.token)
		return nil, err
	}


M request.go => request.go +2 -2
@@ 1,6 1,6 @@
package coap

type Request struct {
	Msg        Message
	SessionNet SessionNet
	Msg    Message
	Client *ClientCommander
}

M responsewriter.go => responsewriter.go +1 -1
@@ 14,5 14,5 @@ func (r *responseWriter) Write(msg Message) error {
	case GET, POST, PUT, DELETE:
		return ErrInvalidReponseCode
	}
	return r.req.SessionNet.Write(msg)
	return r.req.Client.Write(msg)
}

M server.go => server.go +23 -23
@@ 49,7 49,7 @@ func (f HandlerFunc) ServeCOAP(w ResponseWriter, r *Request) {

// HandleFailed returns a HandlerFunc that returns NotFound for every request it gets.
func HandleFailed(w ResponseWriter, req *Request) {
	msg := req.SessionNet.NewMessage(MessageParams{
	msg := req.Client.NewMessage(MessageParams{
		Type:      Acknowledgement,
		Code:      NotFound,
		MessageID: req.Msg.MessageID(),


@@ 124,13 124,13 @@ type Server struct {
	// The maximum of time for synchronization go-routines, defaults to 30 seconds, if it occurs, then it call log.Fatal
	SyncTimeout time.Duration
	// If createSessionUDPFunc is set it is called when session UDP want to be created
	createSessionUDPFunc func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error)
	createSessionUDPFunc func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error)
	// If createSessionUDPFunc is set it is called when session TCP want to be created
	createSessionTCPFunc func(connection Conn, srv *Server) (SessionNet, error)
	createSessionTCPFunc func(connection Conn, srv *Server) (networkSession, error)
	// If NotifyNewSession is set it is called when new TCP/UDP session was created.
	NotifySessionNewFunc func(w SessionNet)
	NotifySessionNewFunc func(w *ClientCommander)
	// If NotifyNewSession is set it is called when TCP/UDP session was ended.
	NotifySessionEndFunc func(w SessionNet, err error)
	NotifySessionEndFunc func(w *ClientCommander, err error)
	// The interfaces that will be used for udp-mcast (default uses the system assigned for multicast)
	UDPMcastInterfaces []net.Interface
	// Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)


@@ 152,7 152,7 @@ type Server struct {
	started bool

	sessionUDPMapLock sync.Mutex
	sessionUDPMap     map[string]SessionNet
	sessionUDPMap     map[string]networkSession
}

func (srv *Server) workerChannelHandler(inUse bool, timeout *time.Timer) bool {


@@ 307,43 307,43 @@ func (srv *Server) ActivateAndServe() error {
	pConn := srv.Conn
	l := srv.Listener

	srv.sessionUDPMap = make(map[string]SessionNet)
	srv.sessionUDPMap = make(map[string]networkSession)

	srv.queue = make(chan *Request)
	defer close(srv.queue)

	if srv.createSessionTCPFunc == nil {
		srv.createSessionTCPFunc = func(connection Conn, srv *Server) (SessionNet, error) {
		srv.createSessionTCPFunc = func(connection Conn, srv *Server) (networkSession, error) {
			session, err := newSessionTCP(connection, srv)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{SessionNet: session}, nil
				return &blockWiseSession{networkSession: session}, nil
			}
			return session, nil
		}
	}

	if srv.createSessionUDPFunc == nil {
		srv.createSessionUDPFunc = func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
		srv.createSessionUDPFunc = func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error) {
			session, err := newSessionUDP(connection, srv, sessionUDPData)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{SessionNet: session}, nil
				return &blockWiseSession{networkSession: session}, nil
			}
			return session, nil
		}
	}

	if srv.NotifySessionNewFunc == nil {
		srv.NotifySessionNewFunc = func(w SessionNet) {}
		srv.NotifySessionNewFunc = func(w *ClientCommander) {}
	}

	if srv.NotifySessionEndFunc == nil {
		srv.NotifySessionEndFunc = func(w SessionNet, err error) {}
		srv.NotifySessionEndFunc = func(w *ClientCommander, err error) {}
	}

	if pConn != nil {


@@ 414,7 414,7 @@ func (srv *Server) serveTCPconnection(conn net.Conn) error {
	if err != nil {
		return err
	}
	srv.NotifySessionNewFunc(session)
	srv.NotifySessionNewFunc(&ClientCommander{session})
	br := srv.acquireReader(conn)
	defer srv.releaseReader(br)
	for {


@@ 439,7 439,7 @@ func (srv *Server) serveTCPconnection(conn net.Conn) error {

		// We will block poller wait loop when
		// all pool workers are busy.
		srv.spawnWorker(&Request{SessionNet: session, Msg: msg})
		srv.spawnWorker(&Request{Client: &ClientCommander{session}, Msg: msg})
	}
}



@@ 498,10 498,10 @@ func (srv *Server) serveTCP(l net.Listener) error {
func (srv *Server) closeSessions(err error) {
	srv.sessionUDPMapLock.Lock()
	tmp := srv.sessionUDPMap
	srv.sessionUDPMap = make(map[string]SessionNet)
	srv.sessionUDPMap = make(map[string]networkSession)
	srv.sessionUDPMapLock.Unlock()
	for _, v := range tmp {
		srv.NotifySessionEndFunc(v, err)
		srv.NotifySessionEndFunc(&ClientCommander{v}, err)
	}
}



@@ 550,7 550,7 @@ func (srv *Server) serveUDP(conn *net.UDPConn) error {
			if err != nil {
				return err
			}
			srv.NotifySessionNewFunc(session)
			srv.NotifySessionNewFunc(&ClientCommander{session})
			srv.sessionUDPMap[s.Key()] = session
			srv.sessionUDPMapLock.Unlock()
		} else {


@@ 561,7 561,7 @@ func (srv *Server) serveUDP(conn *net.UDPConn) error {
		if err != nil {
			continue
		}
		srv.spawnWorker(&Request{Msg: msg, SessionNet: session})
		srv.spawnWorker(&Request{Msg: msg, Client: &ClientCommander{session}})
	}
}



@@ 571,20 571,20 @@ func (srv *Server) serve(r *Request) {
	case r.Msg.Code() == GET:
		switch {
		// set blockwise notice writer for observe
		case r.SessionNet.blockWiseEnabled() && r.Msg.Option(Observe) != nil:
		case r.Client.networkSession.blockWiseEnabled() && r.Msg.Option(Observe) != nil:
			w = &blockWiseNoticeWriter{responseWriter: w.(*responseWriter)}
		// set blockwise if it is enabled
		case r.SessionNet.blockWiseEnabled():
		case r.Client.networkSession.blockWiseEnabled():
			w = &blockWiseResponseWriter{responseWriter: w.(*responseWriter)}
		}
		w = &getResponseWriter{w}
	case r.SessionNet.blockWiseEnabled():
	case r.Client.networkSession.blockWiseEnabled():
		w = &blockWiseResponseWriter{responseWriter: w.(*responseWriter)}
	}

	handlePairMsg(w, r, func(w ResponseWriter, r *Request) {
		handleSignalMsg(w, r, func(w ResponseWriter, r *Request) {
			handleBySessionHandler(w, r, func(w ResponseWriter, r *Request) {
			handleBySessionTokenHandler(w, r, func(w ResponseWriter, r *Request) {
				handleBlockWiseMsg(w, r, srv.serveCOAP)
			})
		})

M server_test.go => server_test.go +26 -26
@@ 44,7 44,7 @@ func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {

func EchoServer(w ResponseWriter, r *Request) {
	if r.Msg.IsConfirmable() {
		err := w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), Valid, r.Msg))
		err := w.Write(CreateRespMessageByReq(r.Client.networkSession.IsTCP(), Valid, r.Msg))
		if err != nil {
			log.Printf("Cannot write echo %v", err)
		}


@@ 53,7 53,7 @@ func EchoServer(w ResponseWriter, r *Request) {

func EchoServerBadID(w ResponseWriter, r *Request) {
	if r.Msg.IsConfirmable() {
		w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), BadRequest, r.Msg))
		w.Write(CreateRespMessageByReq(r.Client.networkSession.IsTCP(), BadRequest, r.Msg))
	}
}



@@ 74,11 74,11 @@ func RunLocalServerUDPWithHandler(lnet, laddr string, BlockWiseTransfer bool, Bl
		return nil, "", nil, err
	}
	server := &Server{Conn: pc, ReadTimeout: time.Hour, WriteTimeout: time.Hour,
		NotifySessionNewFunc: func(s SessionNet) {
			fmt.Printf("SessionNet start %v\n", s.RemoteAddr())
		NotifySessionNewFunc: func(s *ClientCommander) {
			fmt.Printf("networkSession start %v\n", s.RemoteAddr())
		},
		NotifySessionEndFunc: func(w SessionNet, err error) {
			fmt.Printf("SessionNet end %v: %v\n", w.RemoteAddr(), err)
		NotifySessionEndFunc: func(w *ClientCommander, err error) {
			fmt.Printf("networkSession end %v: %v\n", w.RemoteAddr(), err)
		},
		Handler:              handler,
		BlockWiseTransfer:    &BlockWiseTransfer,


@@ 114,10 114,10 @@ func RunLocalServerTCPWithHandler(laddr string, BlockWiseTransfer bool, BlockWis
	}

	server := &Server{Listener: l, ReadTimeout: time.Second * 3600, WriteTimeout: time.Second * 3600,
		NotifySessionNewFunc: func(s SessionNet) {
			fmt.Printf("SessionNet start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w SessionNet, err error) {
			fmt.Printf("SessionNet end %v: %v\n", w.RemoteAddr(), err)
		NotifySessionNewFunc: func(s *ClientCommander) {
			fmt.Printf("networkSession start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w *ClientCommander, err error) {
			fmt.Printf("networkSession end %v: %v\n", w.RemoteAddr(), err)
		}, Handler: handler,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,


@@ 151,10 151,10 @@ func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan 
	}

	server := &Server{Listener: l, ReadTimeout: time.Hour, WriteTimeout: time.Hour,
		NotifySessionNewFunc: func(s SessionNet) {
			fmt.Printf("SessionNet start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w SessionNet, err error) {
			fmt.Printf("SessionNet end %v: %v\n", w.RemoteAddr(), err)
		NotifySessionNewFunc: func(s *ClientCommander) {
			fmt.Printf("networkSession start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w *ClientCommander, err error) {
			fmt.Printf("networkSession end %v: %v\n", w.RemoteAddr(), err)
		}}

	waitLock := sync.Mutex{}


@@ 240,7 240,7 @@ func simpleMsg(t *testing.T, payload []byte, co *ClientConn) {
	req.SetOption(ContentFormat, TextPlain)
	req.SetPathString("/test")

	res := CreateRespMessageByReq(co.session.IsTCP(), Valid, req)
	res := CreateRespMessageByReq(co.commander.networkSession.IsTCP(), Valid, req)

	m, err := co.Exchange(req)
	if err != nil {


@@ 297,23 297,23 @@ func TestServingTLSBigMsg(t *testing.T) {
}

func ChallegingServer(w ResponseWriter, r *Request) {
	req := r.SessionNet.NewMessage(MessageParams{
	req := r.Client.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: 12345,
		Payload:   []byte("hello, world!"),
		Token:     []byte("abcd"),
	})
	_, err := r.SessionNet.Exchange(req)
	_, err := r.Client.Exchange(req)
	if err != nil {
		panic(err.Error())
	}

	w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), Valid, r.Msg))
	w.Write(CreateRespMessageByReq(r.Client.networkSession.IsTCP(), Valid, r.Msg))
}

func ChallegingServerTimeout(w ResponseWriter, r *Request) {
	req := r.SessionNet.NewMessage(MessageParams{
	req := r.Client.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: 12345,


@@ 321,12 321,12 @@ func ChallegingServerTimeout(w ResponseWriter, r *Request) {
		Token:     []byte("abcd"),
	})
	req.SetOption(ContentFormat, TextPlain)
	_, err := r.SessionNet.exchangeTimeout(req, time.Second, time.Second)
	_, err := r.Client.networkSession.exchangeTimeout(req, time.Second, time.Second)
	if err == nil {
		panic("Error: expected timeout")
	}

	w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), Valid, r.Msg))
	w.Write(CreateRespMessageByReq(r.Client.networkSession.IsTCP(), Valid, r.Msg))
}

func simpleChallengingMsg(t *testing.T, payload []byte, co *ClientConn) {


@@ 354,7 354,7 @@ func simpleChallengingPathMsg(t *testing.T, payload []byte, co *ClientConn, path
		t.Fatalf("unable to read msg from server: %v", err)
	}

	res := CreateRespMessageByReq(co.session.IsTCP(), Valid, req0)
	res := CreateRespMessageByReq(co.commander.networkSession.IsTCP(), Valid, req0)
	assertEqualMessages(t, res, resp0)
}



@@ 414,7 414,7 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
		Handler: func(w ResponseWriter, r *Request) {
			resp := r.SessionNet.NewMessage(MessageParams{
			resp := r.Client.NewMessage(MessageParams{
				Type:      Acknowledgement,
				Code:      Content,
				MessageID: r.Msg.MessageID(),


@@ 429,14 429,14 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
	}

	s, _, fin, err := RunLocalServerUDPWithHandler(lnet, addrMcast, BlockWiseTransfer, BlockWiseTransferSzx, func(w ResponseWriter, r *Request) {
		resp := r.SessionNet.NewMessage(MessageParams{
		resp := r.Client.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),
			Payload:   make([]byte, payloadLen),
			Token:     r.Msg.Token(),
		})
		conn, err := responseServer.Dial(r.SessionNet.RemoteAddr().String())
		conn, err := responseServer.Dial(r.Client.RemoteAddr().String())
		if err != nil {
			t.Fatalf("cannot create connection %v", err)
		}


@@ 473,7 473,7 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
	}
	defer co.Close()

	rp, err := co.Publish("/test", func(Msg Message) {
	rp, err := co.Publish("/test", func(req *Request) {
		ansArrived <- true
	})
	if err != nil {

M sessionnet.go => sessionnet.go +26 -75
@@ 8,9 8,9 @@ import (
	"time"
)

// A SessionNet interface is used by an COAP handler to
// A networkSession interface is used by an COAP handler to
// server data in session.
type SessionNet interface {
type networkSession interface {
	// LocalAddr returns the net.Addr of the server
	LocalAddr() net.Addr
	// RemoteAddr returns the net.Addr of the client that sent the current request.


@@ 51,7 51,7 @@ type SessionNet interface {

	exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error)

	sessionHandler() *sessionHandler
	TokenHandler() *TokenHandler

	// BlockWiseTransferEnabled
	blockWiseEnabled() bool


@@ 64,7 64,7 @@ type SessionNet interface {
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error) {

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx1024


@@ 85,7 85,7 @@ func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData)
			connection:           connection,
			readDeadline:         30 * time.Second,
			writeDeadline:        30 * time.Second,
			handler:              &sessionHandler{tokenHandlers: make(map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc))},
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: BlockWiseTransferSzx,
		},


@@ 96,7 96,7 @@ func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData)
}

// newSessionTCP create new session for TCP connection
func newSessionTCP(connection Conn, srv *Server) (SessionNet, error) {
func newSessionTCP(connection Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockSzxBERT
	if srv.BlockWiseTransfer != nil {


@@ 113,7 113,7 @@ func newSessionTCP(connection Conn, srv *Server) (SessionNet, error) {
			connection:           connection,
			readDeadline:         30 * time.Second,
			writeDeadline:        30 * time.Second,
			handler:              &sessionHandler{tokenHandlers: make(map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc))},
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: BlockWiseTransferSzx,
		},


@@ 136,7 136,7 @@ type sessionBase struct {
	connection    Conn
	readDeadline  time.Duration
	writeDeadline time.Duration
	handler       *sessionHandler
	handler       *TokenHandler

	blockWiseTransfer    bool
	blockWiseTransferSzx BlockSzx


@@ 159,22 159,22 @@ type sessionTCP struct {
	peerMaxMessageSize    uint32
}

// LocalAddr implements the SessionNet.LocalAddr method.
// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionUDP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// LocalAddr implements the SessionNet.LocalAddr method.
// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionTCP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the SessionNet.RemoteAddr method.
// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionUDP) RemoteAddr() net.Addr {
	return s.sessionUDPData.RemoteAddr()
}

// RemoteAddr implements the SessionNet.RemoteAddr method.
// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionTCP) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}


@@ 235,7 235,7 @@ func (s *sessionTCP) blockWiseIsValid(szx BlockSzx) bool {
	return true
}

func (s *sessionBase) sessionHandler() *sessionHandler {
func (s *sessionBase) TokenHandler() *TokenHandler {
	return s.handler
}



@@ 244,7 244,7 @@ func (s *sessionUDP) closeWithError(err error) error {
	delete(s.srv.sessionUDPMap, s.sessionUDPData.Key())
	s.srv.sessionUDPMapLock.Unlock()

	s.srv.NotifySessionEndFunc(s, err)
	s.srv.NotifySessionEndFunc(&ClientCommander{s}, err)

	return err
}


@@ 291,14 291,14 @@ func (s *sessionTCP) Ping(timeout time.Duration) error {
	return ErrInvalidResponse
}

// Close implements the SessionNet.Close method
// Close implements the networkSession.Close method
func (s *sessionUDP) Close() error {
	return s.closeWithError(nil)
}

func (s *sessionTCP) closeWithError(err error) error {
	if s.connection != nil {
		s.srv.NotifySessionEndFunc(s, err)
		s.srv.NotifySessionEndFunc(&ClientCommander{s}, err)
		e := s.connection.Close()
		//s.connection = nil
		if e == nil {


@@ 309,7 309,7 @@ func (s *sessionTCP) closeWithError(err error) error {
	return err
}

// Close implements the SessionNet.Close method
// Close implements the networkSession.Close method
func (s *sessionTCP) Close() error {
	return s.closeWithError(nil)
}


@@ 324,12 324,12 @@ func (s *sessionTCP) NewMessage(p MessageParams) Message {
	return NewTcpMessage(p)
}

// Close implements the SessionNet.Close method
// Close implements the networkSession.Close method
func (s *sessionUDP) IsTCP() bool {
	return false
}

// Close implements the SessionNet.Close method
// Close implements the networkSession.Close method
func (s *sessionTCP) IsTCP() bool {
	return true
}


@@ 349,12 349,12 @@ func (s *sessionBase) exchangeFunc(req Message, writeTimeout, readTimeout time.D
	}
}

// Write implements the SessionNet.Write method.
// Write implements the networkSession.Write method.
func (s *sessionTCP) Exchange(m Message) (Message, error) {
	return s.exchangeTimeout(m, s.writeDeadline, s.readDeadline)
}

// Write implements the SessionNet.Write method.
// Write implements the networkSession.Write method.
func (s *sessionUDP) Exchange(m Message) (Message, error) {
	return s.exchangeTimeout(m, s.writeDeadline, s.readDeadline)
}


@@ 415,7 415,7 @@ func (s *sessionUDP) exchangeTimeout(req Message, writeDeadline, readDeadline ti
	return s.exchangeFunc(req, writeDeadline, readDeadline, pairChan, s.writeTimeout)
}

// Write implements the SessionNet.Write method.
// Write implements the networkSession.Write method.
func (s *sessionTCP) Write(m Message) error {
	return s.writeTimeout(m, s.writeDeadline)
}


@@ 437,7 437,7 @@ func (s *sessionTCP) writeTimeout(m Message, timeout time.Duration) error {
	return s.connection.write(&writeReqTCP{writeReqBase{req: req, respChan: make(chan error, 1)}}, timeout)
}

// WriteMsg implements the SessionNet.WriteMsg method.
// WriteMsg implements the networkSession.WriteMsg method.
func (s *sessionUDP) writeTimeout(m Message, timeout time.Duration) error {
	req, err := m.MarshalBinary()
	if err != nil {


@@ 518,7 518,7 @@ func (s *sessionTCP) setPeerBlockWiseTransfer(val bool) {
}

func (s *sessionUDP) sendPong(w ResponseWriter, r *Request) error {
	resp := r.SessionNet.NewMessage(MessageParams{
	resp := r.Client.NewMessage(MessageParams{
		Type:      Reset,
		Code:      Empty,
		MessageID: r.Msg.MessageID(),


@@ 599,62 599,13 @@ func (s *sessionUDP) handleSignals(w ResponseWriter, r *Request) bool {
}

func handleSignalMsg(w ResponseWriter, r *Request, next HandlerFunc) {
	if !r.SessionNet.handleSignals(w, r) {
	if !r.Client.networkSession.handleSignals(w, r) {
		next(w, r)
	}
}

func handlePairMsg(w ResponseWriter, r *Request, next HandlerFunc) {
	if !r.SessionNet.handlePairMsg(w, r) {
	if !r.Client.networkSession.handlePairMsg(w, r) {
		next(w, r)
	}
}

func handleBySessionHandler(w ResponseWriter, r *Request, next HandlerFunc) {
	r.SessionNet.sessionHandler().handle(w, r, next)
}

type sessionHandler struct {
	tokenHandlers     map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc)
	tokenHandlersLock sync.Mutex
}

func (s *sessionHandler) handle(w ResponseWriter, r *Request, next HandlerFunc) {
	//validate token
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())
	s.tokenHandlersLock.Lock()
	h := s.tokenHandlers[token]
	s.tokenHandlersLock.Unlock()
	if h != nil {
		h(w, r, next)
		return
	}
	if next != nil {
		next(w, r)
	}
}

func (s *sessionHandler) add(t []byte, h func(w ResponseWriter, r *Request, next HandlerFunc)) error {
	var token [MaxTokenSize]byte
	copy(token[:], t)
	s.tokenHandlersLock.Lock()
	defer s.tokenHandlersLock.Unlock()
	if s.tokenHandlers[token] != nil {
		return ErrTokenAlreadyExist
	}
	s.tokenHandlers[token] = h
	return nil
}

func (s *sessionHandler) remove(t []byte) error {
	var token [MaxTokenSize]byte
	copy(token[:], t)
	s.tokenHandlersLock.Lock()
	defer s.tokenHandlersLock.Unlock()
	if s.tokenHandlers[token] == nil {
		return ErrTokenNotExist
	}
	delete(s.tokenHandlers, token)
	return nil
}

A tokenhandler.go => tokenhandler.go +56 -0
@@ 0,0 1,56 @@
package coap

import "sync"

func handleBySessionTokenHandler(w ResponseWriter, r *Request, next HandlerFunc) {
	r.Client.networkSession.TokenHandler().Handle(w, r, next)
}

//TokenHandler container for regirstration handlers by token
type TokenHandler struct {
	tokenHandlers     map[[MaxTokenSize]byte]HandlerFunc
	tokenHandlersLock sync.Mutex
}

//Handle call handler for request if exist otherwise use next
func (s *TokenHandler) Handle(w ResponseWriter, r *Request, next HandlerFunc) {
	//validate token
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())
	s.tokenHandlersLock.Lock()
	h := s.tokenHandlers[token]
	s.tokenHandlersLock.Unlock()
	if h != nil {
		h(w, r)
		return
	}
	if next != nil {
		next(w, r)
	}
}

//Add register handler for token
func (s *TokenHandler) Add(token []byte, handler func(w ResponseWriter, r *Request)) error {
	var t [MaxTokenSize]byte
	copy(t[:], token)
	s.tokenHandlersLock.Lock()
	defer s.tokenHandlersLock.Unlock()
	if s.tokenHandlers[t] != nil {
		return ErrTokenAlreadyExist
	}
	s.tokenHandlers[t] = handler
	return nil
}

//Remove unregister handler for token
func (s *TokenHandler) Remove(token []byte) error {
	var t [MaxTokenSize]byte
	copy(t[:], token)
	s.tokenHandlersLock.Lock()
	defer s.tokenHandlersLock.Unlock()
	if s.tokenHandlers[t] == nil {
		return ErrTokenNotExist
	}
	delete(s.tokenHandlers, t)
	return nil
}