~fnux/yggdrasil-go-coap

6cb733c1b04ba02c412af95088c8aefcfe696272 — Jozef Kralik 1 year, 11 months ago 6251d12
integrate dtls (#53)


M Gopkg.lock => Gopkg.lock +36 -7
@@ 10,6 10,26 @@
  version = "v1.1.1"

[[projects]]
  digest = "1:00ba9fb5c14095829a3c738c8998b2e3d6019002291e41443ecb775f191e8ba2"
  name = "github.com/pion/dtls"
  packages = [
    ".",
    "internal/crypto/ccm",
    "internal/udp",
  ]
  pruneopts = "UT"
  revision = "bcc3927a53a8f925a2d30ee83cb319270a7bd667"
  version = "v1.4.0"

[[projects]]
  digest = "1:92d9305a35d7130172f46e095505cc56af49d073907ae52088c15e8c8ae3f519"
  name = "github.com/pion/logging"
  packages = ["."]
  pruneopts = "UT"
  revision = "0387f8acdeb20faf48e539e74906dd633851f3a8"
  version = "v0.2.2"

[[projects]]
  digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe"
  name = "github.com/pmezard/go-difflib"
  packages = ["difflib"]


@@ 29,16 49,24 @@
  version = "v1.3.0"

[[projects]]
  digest = "1:d0072748c62defde1ad99dde77f6ffce492a0e5aea9204077e497c7edfb86653"
  digest = "1:5a1cf4e370bc86137b58da2ae065e76526d32b11f62a7665f36dbd5f41fa95ff"
  name = "github.com/ugorji/go"
  packages = ["codec"]
  pruneopts = "UT"
  revision = "2adff0894ba3bc2eeb9f9aea45fefd49802e1a13"
  version = "v1.1.4"
  revision = "23ab95ef5dc3b70286760af84ce2327a2b64ed62"
  version = "v1.1.7"

[[projects]]
  branch = "master"
  digest = "1:c79dab329e903f5f7793c3f541645f5e4fdaeb222ccd804b0913ea796d0f0213"
  name = "golang.org/x/crypto"
  packages = ["curve25519"]
  pruneopts = "UT"
  revision = "4def268fd1a49955bfb3dda92fe3db4f924f2285"

[[projects]]
  branch = "master"
  digest = "1:34d3139bb084e2883e9823772afb88a3e9fd5812bade1e5ba455cbdffb15732a"
  digest = "1:9896e6185a5f9a8c93b4c641c96bfab46677abf2be0a332a2affde2f575c0aad"
  name = "golang.org/x/net"
  packages = [
    "bpf",


@@ 48,23 76,24 @@
    "ipv6",
  ]
  pruneopts = "UT"
  revision = "d28f0bde5980168871434b95cfc858db9f2a7a99"
  revision = "da137c7871d730100384dbcf36e6f8fa493aef5b"

[[projects]]
  branch = "master"
  digest = "1:3398992b8f4b00ce4a6a5f9770008cf66dcff8f128b51eeab8c1f657096cac8f"
  digest = "1:31d44814b45607afa77c6e8ef20ea8bacc1d1053fe084440c874f311732e7ec3"
  name = "golang.org/x/sys"
  packages = [
    "unix",
    "windows",
  ]
  pruneopts = "UT"
  revision = "15dcb6c0061f497a3f66e3ea034b629c6dd4d99e"
  revision = "fae7ac547cb717d141c433a2a173315e216b64c4"

[solve-meta]
  analyzer-name = "dep"
  analyzer-version = 1
  input-imports = [
    "github.com/pion/dtls",
    "github.com/stretchr/testify/assert",
    "github.com/stretchr/testify/require",
    "github.com/ugorji/go/codec",

M Gopkg.toml => Gopkg.toml +5 -1
@@ 26,12 26,16 @@


[[constraint]]
  name = "github.com/pion/dtls"
  version = "1.4.0"

[[constraint]]
  name = "github.com/stretchr/testify"
  version = "1.3.0"

[[constraint]]
  name = "github.com/ugorji/go"
  version = "1.1.4"
  version = "1.1.7"

[[constraint]]
  branch = "master"

M README.md => README.md +12 -7
@@ 12,15 12,14 @@ Features supported:
* request multiplexer
* multicast
* CoAP NoResponse option in CoAP [RFC 7967][coap-noresponse]

Not yet implemented:
* CoAP over DTLS
* CoAP over DTLS [pion/dtls][pion-dtls]

[coap]: http://tools.ietf.org/html/rfc7252
[coap-tcp]: https://tools.ietf.org/html/rfc8323
[coap-block-wise-transfers]: https://tools.ietf.org/html/rfc7959
[coap-observe]: https://tools.ietf.org/html/rfc7641
[coap-noresponse]: https://tools.ietf.org/html/rfc7967
[pion-dtls]: https://github.com/pion/dtls

## Samples



@@ 43,13 42,16 @@ Not yet implemented:
		mux := coap.NewServeMux()
		mux.Handle("/a", coap.HandlerFunc(handleA))

		log.Fatal(coap.ListenAndServe(":5688", "udp", mux))
		log.Fatal(coap.ListenAndServe("udp", ":5688", mux))
		
		// for tcp
		// log.Fatal(coap.ListenAndServe(":5688", "tcp", mux))
		// log.Fatal(coap.ListenAndServe("tcp", ":5688",  mux))

		// fot tcp-tls
		// log.Fatal(coap.ListenAndServeTLS(":5688", CertPEMBlock, KeyPEMBlock, mux))
		// log.Fatal(coap.ListenAndServeTLS("tcp-tls", ":5688", &tls.Config{...}, mux))

		// fot udp-dtls
		// log.Fatal(coap.ListenAndServeDTLS("udp-dtls", ":5688", &dtls.Config{...}, mux))
	}
```
#### Client


@@ 63,7 65,10 @@ Not yet implemented:
		// co, err := coap.Dial("tcp", "localhost:5688")
		
		// for tcp-tls
		// co, err := coap.DialWithTLS("localhost:5688", &tls.Config{InsecureSkipVerify: true})
		// co, err := coap.DialTLS("tcp-tls", localhost:5688", &tls.Config{...})

		// fot udp-dtls
		// co, err := coap.DialDTLS("udp-dtls", "localhost:5688", &dtls.Config{...}, mux))

		if err != nil {
			log.Fatalf("Error dialing: %v", err)

M blockwise_test.go => blockwise_test.go +18 -18
@@ 230,13 230,13 @@ func TestServingUDPBlockWiseUsingWrite(t *testing.T) {
	}

	expectedMsg := &DgramMessage{
		MessageBase{
			typ:       Acknowledgement,
			code:      Content,
			messageID: req.MessageID(),
			payload:   req.Payload(),
			token:     req.Token(),
		MessageBase: MessageBase{
			typ:     Acknowledgement,
			code:    Content,
			payload: req.Payload(),
			token:   req.Token(),
		},
		messageID: req.MessageID(),
	}
	expectedMsg.SetOption(ContentFormat, req.Option(ContentFormat))



@@ 281,13 281,13 @@ func TestServingUDPBlockWiseWithClientWithoutBlockWise(t *testing.T) {
	}

	expectedMsg := &DgramMessage{
		MessageBase{
			typ:       Acknowledgement,
			code:      Content,
			messageID: req.MessageID(),
			payload:   req.Payload(),
			token:     req.Token(),
		MessageBase: MessageBase{
			typ:     Acknowledgement,
			code:    Content,
			payload: req.Payload(),
			token:   req.Token(),
		},
		messageID: req.MessageID(),
	}

	expectedMsg.SetOption(ContentFormat, TextPlain)


@@ 306,13 306,13 @@ func TestServingUDPBlockWiseWithClientWithoutBlockWise(t *testing.T) {
		t.Fatal("failed to exchange", err)
	}
	expectedGetMsg := DgramMessage{
		MessageBase{
			typ:       Acknowledgement,
			code:      Content,
			messageID: getReq.MessageID(),
			payload:   helloWorld,
			token:     getReq.Token(),
		MessageBase: MessageBase{
			typ:     Acknowledgement,
			code:    Content,
			payload: helloWorld,
			token:   getReq.Token(),
		},
		messageID: getReq.MessageID(),
	}

	if etag, ok := getResp.Option(ETag).([]byte); ok {

M client.go => client.go +68 -4
@@ 12,6 12,7 @@ import (
	"time"

	coapNet "github.com/go-ocf/go-coap/net"
	"github.com/pion/dtls"
)

// A ClientConn represents a connection to a COAP server.


@@ 28,6 29,7 @@ type Client struct {
	Net            string        // if "tcp" or "tcp-tls" (COAP over TLS) a TCP query will be initiated, otherwise an UDP one (default is "" for UDP) or "udp-mcast" for multicast
	MaxMessageSize uint32        // Max message size that could be received from peer. If not set it defaults to 1152 B.
	TLSConfig      *tls.Config   // TLS connection configuration
	DTLSConfig     *dtls.Config  // TLS connection configuration
	DialTimeout    time.Duration // set Timeout for dialer
	ReadTimeout    time.Duration // net.ClientConn.SetReadTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	WriteTimeout   time.Duration // net.ClientConn.SetWriteTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero


@@ 117,6 119,18 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		}
		sessionUPDData = coapNet.NewConnUDPContext(conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr), nil)
		BlockWiseTransfer = true
	case "udp-dtls", "udp4-dtls", "udp6-dtls":
		network = c.Net
		Net := strings.TrimSuffix(c.Net, "-dtls")
		addr, err := net.ResolveUDPAddr(Net, address)
		if err != nil {
			return nil, fmt.Errorf("cannot resolve udp address: %v", err)
		}
		if conn, err = dtls.Dial(Net, addr, c.DTLSConfig); err != nil {
			return nil, err
		}
		conn = coapNet.NewConnDTLS(conn)
		BlockWiseTransfer = true
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		var err error
		network = strings.TrimSuffix(c.Net, "-mcast")


@@ 151,6 165,8 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		BlockWiseTransferSzx = *c.BlockWiseTransferSzx
	}

	started := make(chan struct{})

	//sync := make(chan bool)
	clientConn = &ClientConn{
		srv: &Server{


@@ 163,6 179,9 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
			BlockWiseTransfer:        &BlockWiseTransfer,
			BlockWiseTransferSzx:     &BlockWiseTransferSzx,
			DisableTCPSignalMessages: c.DisableTCPSignalMessages,
			NotifyStartedFunc: func() {
				close(started)
			},
			NotifySessionEndFunc: func(s *ClientConn, err error) {
				if c.NotifySessionEndFunc != nil {
					c.NotifySessionEndFunc(err)


@@ 171,6 190,9 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
			newSessionTCPFunc: func(connection *coapNet.Conn, srv *Server) (networkSession, error) {
				return clientConn.commander.networkSession, nil
			},
			newSessionDTLSFunc: func(connection *coapNet.Conn, srv *Server) (networkSession, error) {
				return clientConn.commander.networkSession, nil
			},
			newSessionUDPFunc: func(connection *coapNet.ConnUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
				if sessionUDPData.RemoteAddr().String() == clientConn.commander.networkSession.RemoteAddr().String() {
					if s, ok := clientConn.commander.networkSession.(*blockWiseSession); ok {


@@ 200,6 222,18 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
	case *net.TCPConn, *tls.Conn:
		session, err := newSessionTCP(coapNet.NewConn(clientConn.srv.Conn, clientConn.srv.heartBeat()), clientConn.srv)
		if err != nil {
			clientConn.srv.Conn.Close()
			return nil, err
		}
		if session.blockWiseEnabled() {
			clientConn.commander.networkSession = &blockWiseSession{networkSession: session}
		} else {
			clientConn.commander.networkSession = session
		}
	case *coapNet.ConnDTLS:
		session, err := newSessionDTLS(coapNet.NewConn(clientConn.srv.Conn, clientConn.srv.heartBeat()), clientConn.srv)
		if err != nil {
			clientConn.srv.Conn.Close()
			return nil, err
		}
		if session.blockWiseEnabled() {


@@ 212,6 246,7 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		coapNet.SetUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		session, err := newSessionUDP(coapNet.NewConnUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv.heartBeat(), c.MulticastHopLimit), clientConn.srv, sessionUPDData)
		if err != nil {
			clientConn.srv.Conn.Close()
			return nil, err
		}
		if session.blockWiseEnabled() {


@@ 219,6 254,9 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		} else {
			clientConn.commander.networkSession = session
		}
	default:
		clientConn.srv.Conn.Close()
		return nil, fmt.Errorf("unknown connection type %T", clientConn.srv.Conn)
	}

	go func() {


@@ 229,6 267,13 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
	}()
	clientConn.client = c

	select {
	case <-started:
	case err := <-clientConn.shutdownSync:
		clientConn.srv.Conn.Close()
		return nil, err
	}

	return clientConn, nil
}



@@ 415,14 460,33 @@ func fixNetTLS(network string) string {
	return network
}

// DialWithTLS connects to the address on the named network with TLS.
func DialWithTLS(network, address string, tlsConfig *tls.Config) (conn *ClientConn, err error) {
func fixNetDTLS(network string) string {
	if !strings.HasSuffix(network, "-dtls") {
		network += "-dtls"
	}
	return network
}

// DialTLS connects to the address on the named network with TLS.
func DialTLS(network, address string, tlsConfig *tls.Config) (conn *ClientConn, err error) {
	client := Client{Net: fixNetTLS(network), TLSConfig: tlsConfig}
	return client.DialWithContext(context.Background(), address)
}

// DialTimeoutWithTLS acts like DialWithTLS but takes a timeout.
func DialTimeoutWithTLS(network, address string, tlsConfig *tls.Config, timeout time.Duration) (conn *ClientConn, err error) {
// DialDTLS connects to the address on the named network with DTLS.
func DialDTLS(network, address string, config *dtls.Config) (conn *ClientConn, err error) {
	client := Client{Net: fixNetDTLS(network), DTLSConfig: config}
	return client.DialWithContext(context.Background(), address)
}

// DialTLSWithTimeout acts like DialTLS but takes a timeout.
func DialTLSWithTimeout(network, address string, tlsConfig *tls.Config, timeout time.Duration) (conn *ClientConn, err error) {
	client := Client{Net: fixNetTLS(network), DialTimeout: timeout, TLSConfig: tlsConfig}
	return client.DialWithContext(context.Background(), address)
}

// DialDTLSWithTimeout acts like DialwriteDeadlineDTLS but takes a timeout.
func DialDTLSWithTimeout(network, address string, config *dtls.Config, timeout time.Duration) (conn *ClientConn, err error) {
	client := Client{Net: fixNetDTLS(network), DialTimeout: timeout, DTLSConfig: config}
	return client.DialWithContext(context.Background(), address)
}

A dtls_test.go => dtls_test.go +35 -0
@@ 0,0 1,35 @@
package coap

import "testing"

func TestServingDTLS(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", false, BlockWiseSzx16, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx16(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx16, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx32(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx32, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx64(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx64, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx128(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx128, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx256(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx256, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx512(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx512, make([]byte, 128), simpleMsg)
}

func TestServingDTLSBlockWiseSzx1024(t *testing.T) {
	testServingTCPWithMsg(t, "udp-dtls", true, BlockWiseSzx1024, make([]byte, 128), simpleMsg)
}

A examples/dtls/client/main.go => examples/dtls/client/main.go +35 -0
@@ 0,0 1,35 @@
package main

import (
	"fmt"
	"log"
	"os"

	coap "github.com/go-ocf/go-coap"
	"github.com/pion/dtls"
)

func main() {
	co, err := coap.DialDTLS("udp", "localhost:5688", &dtls.Config{
		PSK: func(hint []byte) ([]byte, error) {
			fmt.Printf("Server's hint: %s \n", hint)
			return []byte{0xAB, 0xC1, 0x23}, nil
		},
		PSKIdentityHint: []byte("Pion DTLS Server"),
		CipherSuites:    []dtls.CipherSuiteID{dtls.TLS_PSK_WITH_AES_128_CCM_8},
	})
	if err != nil {
		log.Fatalf("Error dialing: %v", err)
	}
	path := "/a"
	if len(os.Args) > 1 {
		path = os.Args[1]
	}
	resp, err := co.Get(path)

	if err != nil {
		log.Fatalf("Error sending request: %v", err)
	}

	log.Printf("Response payload: %v", resp.Payload())
}

A examples/dtls/server/main.go => examples/dtls/server/main.go +44 -0
@@ 0,0 1,44 @@
package main

import (
	"fmt"
	"log"

	coap "github.com/go-ocf/go-coap"
	"github.com/pion/dtls"
)

func handleA(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
	w.SetContentFormat(coap.TextPlain)
	log.Printf("Transmitting from A")
	if _, err := w.Write([]byte("hello world")); err != nil {
		log.Printf("Cannot send response: %v", err)
	}
}

func handleB(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got message in handleB: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
	resp := w.NewResponse(coap.Content)
	resp.SetOption(coap.ContentFormat, coap.TextPlain)
	resp.SetPayload([]byte("good bye!"))
	log.Printf("Transmitting from B %#v", resp)
	if err := w.WriteMsg(resp); err != nil {
		log.Printf("Cannot send response: %v", err)
	}
}

func main() {
	mux := coap.NewServeMux()
	mux.Handle("/a", coap.HandlerFunc(handleA))
	mux.Handle("/b", coap.HandlerFunc(handleB))

	log.Fatal(coap.ListenAndServeDTLS("udp", ":5688", &dtls.Config{
		PSK: func(hint []byte) ([]byte, error) {
			fmt.Printf("Client's hint: %s \n", hint)
			return []byte{0xAB, 0xC1, 0x23}, nil
		},
		PSKIdentityHint: []byte("Pion DTLS Client"),
		CipherSuites:    []dtls.CipherSuiteID{dtls.TLS_PSK_WITH_AES_128_CCM_8},
	}, mux))
}

M examples/mcast/server/main.go => examples/mcast/server/main.go +1 -1
@@ 61,5 61,5 @@ func main() {
	mux := coap.NewServeMux()
	mux.Handle("/oic/res", coap.HandlerFunc(handleMcast))

	log.Fatal(coap.ListenAndServe("224.0.1.187:5688", "udp-mcast", mux))
	log.Fatal(coap.ListenAndServe("udp-mcast", "224.0.1.187:5688", mux))
}

M examples/observe/server/main.go => examples/observe/server/main.go +1 -1
@@ 28,7 28,7 @@ func periodicTransmitter(w coap.ResponseWriter, req *coap.Request) {
}

func main() {
	log.Fatal(coap.ListenAndServe(":5688", "udp",
	log.Fatal(coap.ListenAndServe("udp", ":5688",
		coap.HandlerFunc(func(w coap.ResponseWriter, req *coap.Request) {
			log.Printf("Got message path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.Client.RemoteAddr())
			switch {

M examples/ping/server/main.go => examples/ping/server/main.go +1 -1
@@ 32,5 32,5 @@ func main() {
		log.Fatalf("Run %v LISTEN_ADDRESS:PORT ", os.Args[0])
	}

	log.Fatal(coap.ListenAndServe(os.Args[1], "tcp", coap.HandlerFunc(handleA)))
	log.Fatal(coap.ListenAndServe("tcp", os.Args[1], coap.HandlerFunc(handleA)))
}

M examples/simple/server/main.go => examples/simple/server/main.go +1 -1
@@ 31,5 31,5 @@ func main() {
	mux.Handle("/a", coap.HandlerFunc(handleA))
	mux.Handle("/b", coap.HandlerFunc(handleB))

	log.Fatal(coap.ListenAndServe(":5688", "udp", mux))
	log.Fatal(coap.ListenAndServe("udp", ":5688", mux))
}

M message.go => message.go +2 -7
@@ 519,9 519,8 @@ type MessageParams struct {

// MessageBase is a CoAP message.
type MessageBase struct {
	typ       COAPType
	code      COAPCode
	messageID uint16
	typ  COAPType
	code COAPCode

	token, payload []byte



@@ 540,10 539,6 @@ func (m *MessageBase) Code() COAPCode {
	return m.code
}

func (m *MessageBase) MessageID() uint16 {
	return m.messageID
}

func (m *MessageBase) Token() []byte {
	return m.token
}

M message_test.go => message_test.go +70 -69
@@ 96,8 96,9 @@ func TestMessageConfirmable(t *testing.T) {
		m   Message
		exp bool
	}{
		{&DgramMessage{MessageBase{typ: Confirmable}}, true},
		{&DgramMessage{MessageBase{typ: NonConfirmable}}, false},
		{&DgramMessage{
			MessageBase{typ: Confirmable}, 0}, true},
		{&DgramMessage{MessageBase{typ: NonConfirmable}, 0}, false},
	}

	for _, test := range tests {


@@ 159,11 160,11 @@ func TestCodeString(t *testing.T) {

func TestEncodeMessageWithoutOptionsAndPayload(t *testing.T) {
	req := DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}

	buf := &bytes.Buffer{}


@@ 181,11 182,11 @@ func TestEncodeMessageWithoutOptionsAndPayload(t *testing.T) {

func TestEncodeMessageSmall(t *testing.T) {
	req := DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}

	req.AddOption(ETag, []byte("weetag"))


@@ 209,12 210,12 @@ func TestEncodeMessageSmall(t *testing.T) {

func TestEncodeMessageSmallWithPayload(t *testing.T) {
	req := DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
			payload:   []byte("hi"),
		MessageBase: MessageBase{
			typ:     Confirmable,
			code:    GET,
			payload: []byte("hi"),
		},
		messageID: 12345,
	}

	req.AddOption(ETag, []byte("weetag"))


@@ 260,12 261,12 @@ func TestInvalidMessageParsing(t *testing.T) {

func TestOptionsWithIllegalLengthAreIgnoredDuringParsing(t *testing.T) {
	exp := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 0xabcd,
			payload:   []byte{},
		MessageBase: MessageBase{
			typ:     Confirmable,
			code:    GET,
			payload: []byte{},
		},
		messageID: 0xabcd,
	}
	msg, err := ParseDgramMessage([]byte{0x40, 0x01, 0xab, 0xcd,
		0x73, // URI-Port option (uint) with length 3 (valid lengths are 0-2)


@@ 341,11 342,11 @@ func TestDecodeMessageSmallWithPayload(t *testing.T) {

func TestEncodeMessageVerySmall(t *testing.T) {
	req := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}
	req.SetPathString("x")



@@ 367,11 368,11 @@ func TestEncodeMessageVerySmall(t *testing.T) {
// Same as above, but with a leading slash
func TestEncodeMessageVerySmall2(t *testing.T) {
	req := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}
	req.SetPathString("/x")



@@ 399,11 400,11 @@ func TestEncodeManyQueries(t *testing.T) {
	}
	for p, a := range tests {
		m := &DgramMessage{
			MessageBase{
				typ:       Confirmable,
				code:      GET,
				messageID: 12345,
			MessageBase: MessageBase{
				typ:  Confirmable,
				code: GET,
			},
			messageID: 12345,
		}
		m.SetQueryString(p)
		buf := &bytes.Buffer{}


@@ 434,11 435,11 @@ func TestEncodeSeveral(t *testing.T) {
	}
	for p, a := range tests {
		m := &DgramMessage{
			MessageBase{
				typ:       Confirmable,
				code:      GET,
				messageID: 12345,
			MessageBase: MessageBase{
				typ:  Confirmable,
				code: GET,
			},
			messageID: 12345,
		}
		m.SetPathString(p)
		buf := &bytes.Buffer{}


@@ 462,11 463,11 @@ func TestEncodeSeveral(t *testing.T) {

func TestPathAsOption(t *testing.T) {
	m := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}
	m.SetOption(LocationPath, []string{"a", "b"})
	buf := &bytes.Buffer{}


@@ 482,11 483,11 @@ func TestPathAsOption(t *testing.T) {

func TestEncodePath14(t *testing.T) {
	req := DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}
	req.SetPathString("123456789ABCDE")



@@ 509,11 510,11 @@ func TestEncodePath14(t *testing.T) {

func TestEncodePath15(t *testing.T) {
	req := DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}
	req.SetPathString("123456789ABCDEF")



@@ 536,11 537,11 @@ func TestEncodePath15(t *testing.T) {

func TestEncodeLargePath(t *testing.T) {
	req := DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
		MessageBase: MessageBase{
			typ:  Confirmable,
			code: GET,
		},
		messageID: 12345,
	}
	req.SetPathString("this_path_is_longer_than_fifteen_bytes")



@@ 586,12 587,12 @@ func TestDecodeLargePath(t *testing.T) {
	path := "this_path_is_longer_than_fifteen_bytes"

	exp := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
			payload:   []byte{},
		MessageBase: MessageBase{
			typ:     Confirmable,
			code:    GET,
			payload: []byte{},
		},
		messageID: 12345,
	}

	exp.SetOption(URIPath, path)


@@ 615,12 616,12 @@ func TestDecodeMessageSmaller(t *testing.T) {
	}

	exp := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
			payload:   []byte{},
		MessageBase: MessageBase{
			typ:     Confirmable,
			code:    GET,
			payload: []byte{},
		},
		messageID: 12345,
	}

	exp.SetOption(ETag, []byte("weetag"))


@@ 824,13 825,13 @@ func TestDecodeContentFormatOptionToMediaType(t *testing.T) {

func TestEncodeMessageWithAllOptions(t *testing.T) {
	req := &DgramMessage{
		MessageBase{
			typ:       Confirmable,
			code:      GET,
			messageID: 12345,
			token:     []byte("TOKEN"),
			payload:   []byte("PAYLOAD"),
		MessageBase: MessageBase{
			typ:     Confirmable,
			code:    GET,
			token:   []byte("TOKEN"),
			payload: []byte("PAYLOAD"),
		},
		messageID: 12345,
	}

	req.AddOption(IfMatch, []byte("IFMATCH"))


@@ 1155,7 1156,7 @@ func TestToBytesLength(t *testing.T) {
}

func TestSetCode(t *testing.T) {
	req := DgramMessage{MessageBase{}}
	req := DgramMessage{MessageBase{}, 0}

	req.SetType(Confirmable)
	req.SetCode(GET)

M messagedgram.go => messagedgram.go +12 -7
@@ 10,20 10,25 @@ import (
// DgramMessage implements Message interface.
type DgramMessage struct {
	MessageBase
	messageID uint16
}

func NewDgramMessage(p MessageParams) *DgramMessage {
	return &DgramMessage{
		MessageBase{
			typ:       p.Type,
			code:      p.Code,
			messageID: p.MessageID,
			token:     p.Token,
			payload:   p.Payload,
		MessageBase: MessageBase{
			typ:     p.Type,
			code:    p.Code,
			token:   p.Token,
			payload: p.Payload,
		},
		messageID: p.MessageID,
	}
}

func (m *DgramMessage) MessageID() uint16 {
	return m.messageID
}

// SetMessageID
func (m *DgramMessage) SetMessageID(messageID uint16) {
	m.messageID = messageID


@@ 88,7 93,7 @@ func (m *DgramMessage) UnmarshalBinary(data []byte) error {
	}

	m.MessageBase.code = COAPCode(data[1])
	m.MessageBase.messageID = binary.BigEndian.Uint16(data[2:4])
	m.messageID = binary.BigEndian.Uint16(data[2:4])

	if tokenLen > 0 {
		m.MessageBase.token = make([]byte, tokenLen)

M messagetcp.go => messagetcp.go +4 -0
@@ 110,6 110,10 @@ func NewTcpMessage(p MessageParams) *TcpMessage {
	}
}

func (m *TcpMessage) MessageID() uint16 {
	return 0
}

// SetMessageID
func (m *TcpMessage) SetMessageID(messageID uint16) {
	//not used by COAP over TCP

A net/connDTLS.go => net/connDTLS.go +143 -0
@@ 0,0 1,143 @@
package net

import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

type connDTLSData struct {
	data []byte
	err  error
}

type ConnDTLS struct {
	conn       net.Conn
	readDataCh chan connDTLSData
	doneCh     chan struct{}
	wg         sync.WaitGroup

	readDeadline atomic.Value
}

func (c *ConnDTLS) readLoop() {
	defer c.wg.Done()
	buf := make([]byte, 8192)
	for {
		n, err := c.conn.Read(buf)
		d := connDTLSData{err: err}
		if err == nil && n > 0 {
			d.data = append(d.data, buf[:n]...)
		}
		select {
		case c.readDataCh <- d:
			if err != nil {
				return
			}
		case <-c.doneCh:
			return
		}
	}
}

func NewConnDTLS(conn net.Conn) *ConnDTLS {
	c := ConnDTLS{
		conn:       conn,
		readDataCh: make(chan connDTLSData),
		doneCh:     make(chan struct{}),
	}
	c.wg.Add(1)
	go c.readLoop()
	return &c
}

type errS struct {
	error
	timeout   bool
	temporary bool
}

func (e errS) Timeout() bool {
	return e.timeout
}

func (e errS) Temporary() bool {
	return e.temporary
}

func (c *ConnDTLS) processData(b []byte, d connDTLSData) (n int, err error) {
	if d.err != nil {
		return 0, errS{
			error: d.err,
		}
	}
	if len(b) < len(d.data) {
		return 0, errS{
			error: fmt.Errorf("buffer is too small"),
		}
	}
	return copy(b, d.data), nil
}

func (c *ConnDTLS) Read(b []byte) (n int, err error) {
	var deadline time.Time
	v := c.readDeadline.Load()
	if v != nil {
		deadline = v.(time.Time)
	}
	if deadline.IsZero() {
		select {
		case d := <-c.readDataCh:
			return c.processData(b, d)
		}
	}

	select {
	case d := <-c.readDataCh:
		return c.processData(b, d)
	case <-time.After(time.Now().Sub(deadline)):
		return 0, errS{
			error:     fmt.Errorf(ioTimeout),
			temporary: true,
			timeout:   true,
		}
	}
}

func (c *ConnDTLS) Write(b []byte) (n int, err error) {
	return c.conn.Write(b)
}

func (c *ConnDTLS) Close() error {
	err := c.conn.Close()
	close(c.doneCh)
	c.wg.Wait()
	return err
}

func (c *ConnDTLS) LocalAddr() net.Addr {
	return c.conn.LocalAddr()
}

func (c *ConnDTLS) RemoteAddr() net.Addr {
	return c.conn.RemoteAddr()
}

func (c *ConnDTLS) SetDeadline(t time.Time) error {
	err := c.SetReadDeadline(t)
	if err != nil {
		return err
	}
	return c.SetWriteDeadline(t)
}

func (c *ConnDTLS) SetReadDeadline(t time.Time) error {
	c.readDeadline.Store(t)
	return nil
}

func (c *ConnDTLS) SetWriteDeadline(t time.Time) error {
	return c.conn.SetWriteDeadline(t)
}

M net/connUDP.go => net/connUDP.go +1 -1
@@ 205,7 205,7 @@ func (c *ConnUDP) WriteWithContext(ctx context.Context, udpCtx *ConnUDPContext, 
			if isTemporary(err) {
				continue
			}
			return fmt.Errorf("cannot write to udp connection")
			return fmt.Errorf("cannot write to udp connection: %v", err)
		}
		written += n
	}

A net/dtlslistener.go => net/dtlslistener.go +135 -0
@@ 0,0 1,135 @@
package net

import (
	"context"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pion/dtls"
)

type connData struct {
	conn net.Conn
	err  error
}

// DTLSListener is a DTLS listener that provides accept with context.
type DTLSListener struct {
	listener  *dtls.Listener
	heartBeat time.Duration
	wg        sync.WaitGroup
	doneCh    chan struct{}
	connCh    chan connData

	deadline atomic.Value
}

func (l *DTLSListener) acceptLoop() {
	defer l.wg.Done()
	for {
		conn, err := l.listener.Accept()
		select {
		case l.connCh <- connData{conn: conn, err: err}:
			if err != nil {
				return
			}
		case <-l.doneCh:
			return
		}
	}
}

// NewDTLSListener creates dtls listener.
// Known networks are "udp", "udp4" (IPv4-only), "udp6" (IPv6-only).
func NewDTLSListener(network string, addr string, cfg *dtls.Config, heartBeat time.Duration) (*DTLSListener, error) {
	a, err := net.ResolveUDPAddr(network, addr)
	if err != nil {
		return nil, fmt.Errorf("cannot resolve address: %v", err)
	}
	listener, err := dtls.Listen(network, a, cfg)
	if err != nil {
		return nil, fmt.Errorf("cannot create new dtls listener: %v", err)
	}
	l := DTLSListener{
		listener:  listener,
		heartBeat: heartBeat,
		doneCh:    make(chan struct{}),
		connCh:    make(chan connData),
	}
	l.wg.Add(1)

	go l.acceptLoop()

	return &l, nil
}

// AcceptWithContext waits with context for a generic Conn.
func (l *DTLSListener) AcceptWithContext(ctx context.Context) (net.Conn, error) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return nil, fmt.Errorf("cannot accept connections: %v", ctx.Err())
			}
			return nil, nil
		default:
		}
		err := l.SetDeadline(time.Now().Add(l.heartBeat))
		if err != nil {
			return nil, fmt.Errorf("cannot accept connections: %v", err)
		}
		rw, err := l.Accept()
		if err != nil {
			if isTemporary(err) {
				continue
			}
			return nil, fmt.Errorf("cannot accept connections: %v", err)
		}
		return rw, nil
	}
}

// SetDeadline sets deadline for accept operation.
func (l *DTLSListener) SetDeadline(t time.Time) error {
	l.deadline.Store(t)
	return nil
}

// Accept waits for a generic Conn.
func (l *DTLSListener) Accept() (net.Conn, error) {
	var deadline time.Time
	v := l.deadline.Load()
	if v != nil {
		deadline = v.(time.Time)
	}

	if deadline.IsZero() {
		select {
		case d := <-l.connCh:
			return NewConnDTLS(d.conn), d.err
		}
	}

	select {
	case d := <-l.connCh:
		return NewConnDTLS(d.conn), d.err
	case <-time.After(time.Now().Sub(deadline)):
		return nil, fmt.Errorf(ioTimeout)
	}
}

// Close closes the connection.
func (l *DTLSListener) Close() error {
	err := l.listener.Close()
	close(l.doneCh)
	l.wg.Wait()
	return err
}

// Addr represents a network end point address.
func (l *DTLSListener) Addr() net.Addr {
	return l.listener.Addr()
}

M net/isTemporary.go => net/isTemporary.go +5 -2
@@ 5,12 5,15 @@ import (
	"strings"
)

// https://github.com/golang/go/blob/958e212db799e609b2a8df51cdd85c9341e7a404/src/internal/poll/fd.go#L43
const ioTimeout = "i/o timeout"

func isTemporary(err error) bool {
	if netErr, ok := err.(net.Error); ok && (netErr.Temporary() || netErr.Timeout()) {
		return true
	}
	// https://github.com/golang/go/blob/958e212db799e609b2a8df51cdd85c9341e7a404/src/internal/poll/fd.go#L43
	if strings.Contains(err.Error(), "i/o timeout") {

	if strings.Contains(err.Error(), ioTimeout) {
		return true
	}
	return false

M networksession.go => networksession.go +0 -572
@@ 1,16 1,8 @@
package coap

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"

	coapNet "github.com/go-ocf/go-coap/net"
)

// A networkSession interface is used by an COAP handler to


@@ 60,570 52,6 @@ type networkSession interface {
	blockWiseIsValid(szx BlockWiseSzx) bool
}

type connUDP interface {
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
	ReadWithContext(ctx context.Context, buffer []byte) (int, *coapNet.ConnUDPContext, error)
	WriteWithContext(ctx context.Context, udpCtx *coapNet.ConnUDPContext, buffer []byte) error
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection connUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}

	if BlockWiseTransfer && BlockWiseTransferSzx == BlockWiseSzxBERT {
		return nil, ErrInvalidBlockWiseSzx
	}

	s := &sessionUDP{
		sessionBase: sessionBase{
			srv:                  srv,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
		},
		connection:     connection,
		sessionUDPData: sessionUDPData,
		mapPairs:       make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
	}
	return s, nil
}

// newSessionTCP create new session for TCP connection
func newSessionTCP(connection *coapNet.Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockWiseSzxBERT
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}
	s := &sessionTCP{
		mapPairs:           make(map[[MaxTokenSize]byte](*sessionResp)),
		peerMaxMessageSize: uint32(srv.MaxMessageSize),
		connection:         connection,
		sessionBase: sessionBase{
			srv:                  srv,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
		},
	}

	if !s.srv.DisableTCPSignalMessages {
		if err := s.sendCSM(); err != nil {
			return nil, err
		}
	}

	return s, nil
}

type sessionResp struct {
	ch chan *Request // channel must have size 1 for non-blocking write to channel
}

type sessionBase struct {
	srv      *Server
	handler  *TokenHandler
	sequence uint64

	blockWiseTransfer    bool
	blockWiseTransferSzx uint32 //BlockWiseSzx
}

type sessionUDP struct {
	sessionBase
	connection     connUDP
	sessionUDPData *coapNet.ConnUDPContext                        // oob data to get egress interface right
	mapPairs       map[[MaxTokenSize]byte]map[uint16]*sessionResp //storage of channel Message
	mapPairsLock   sync.Mutex                                     //to sync add remove token
}

type sessionTCP struct {
	sessionBase
	connection   *coapNet.Conn
	mapPairs     map[[MaxTokenSize]byte]*sessionResp //storage of channel Message
	mapPairsLock sync.Mutex                          //to sync add remove token

	peerBlockWiseTransfer uint32
	peerMaxMessageSize    uint32
}

// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionUDP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionTCP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionUDP) RemoteAddr() net.Addr {
	return s.sessionUDPData.RemoteAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionTCP) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}

// BlockWiseTransferEnabled
func (s *sessionUDP) blockWiseEnabled() bool {
	return s.blockWiseTransfer
}

func (s *sessionTCP) blockWiseEnabled() bool {
	return s.blockWiseTransfer /*&& atomic.LoadUint32(&s.peerBlockWiseTransfer) != 0*/
}

func (s *sessionBase) blockWiseSzx() BlockWiseSzx {
	return BlockWiseSzx(atomic.LoadUint32(&s.blockWiseTransferSzx))
}

func (s *sessionBase) setBlockWiseSzx(szx BlockWiseSzx) {
	atomic.StoreUint32(&s.blockWiseTransferSzx, uint32(szx))
}

func (s *sessionBase) Sequence() uint64 {
	return atomic.AddUint64(&s.sequence, 1)
}

func (s *sessionBase) blockWiseMaxPayloadSize(peer BlockWiseSzx) (int, BlockWiseSzx) {
	szx := s.blockWiseSzx()
	if peer < szx {
		return szxToBytes[peer], peer
	}
	return szxToBytes[szx], szx
}

func (s *sessionTCP) blockWiseMaxPayloadSize(peer BlockWiseSzx) (int, BlockWiseSzx) {
	szx := s.blockWiseSzx()
	if szx == BlockWiseSzxBERT && peer == BlockWiseSzxBERT {
		m := atomic.LoadUint32(&s.peerMaxMessageSize)
		if m == 0 {
			m = uint32(s.srv.MaxMessageSize)
		}
		return int(m - (m % 1024)), BlockWiseSzxBERT
	}
	return s.sessionBase.blockWiseMaxPayloadSize(peer)
}

func (s *sessionUDP) blockWiseIsValid(szx BlockWiseSzx) bool {
	return szx <= BlockWiseSzx1024
}

func (s *sessionTCP) blockWiseIsValid(szx BlockWiseSzx) bool {
	return true
}

func (s *sessionBase) TokenHandler() *TokenHandler {
	return s.handler
}

func (s *sessionUDP) closeWithError(err error) error {
	s.srv.sessionUDPMapLock.Lock()
	delete(s.srv.sessionUDPMap, s.sessionUDPData.Key())
	s.srv.sessionUDPMapLock.Unlock()
	c := ClientConn{commander: &ClientCommander{s}}
	s.srv.NotifySessionEndFunc(&c, err)

	return err
}

// Ping send ping over udp(unicast) and wait for response.
func (s *sessionUDP) PingWithContext(ctx context.Context) error {
	//provoking to get a reset message - "CoAP ping" in RFC-7252
	//https://tools.ietf.org/html/rfc7252#section-4.2
	//https://tools.ietf.org/html/rfc7252#section-4.3
	//https://tools.ietf.org/html/rfc7252#section-1.2 "Reset Message"
	// BUG of iotivity: https://jira.iotivity.org/browse/IOT-3149
	req := s.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      Empty,
		MessageID: GenerateMessageID(),
	})
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}
	if resp.Type() == Reset {
		return nil
	}
	return ErrInvalidResponse
}

func (s *sessionTCP) PingWithContext(ctx context.Context) error {
	if s.srv.DisableTCPSignalMessages {
		return fmt.Errorf("cannot send ping: TCP Signal messages are disabled")
	}
	token, err := GenerateToken()
	if err != nil {
		return err
	}
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  Ping,
		Token: []byte(token),
	})
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}
	if resp.Code() == Pong {
		return nil
	}
	return ErrInvalidResponse
}

// Close implements the networkSession.Close method
func (s *sessionUDP) Close() error {
	return s.closeWithError(nil)
}

func (s *sessionTCP) closeWithError(err error) error {
	if s.connection != nil {
		c := ClientConn{commander: &ClientCommander{s}}
		s.srv.NotifySessionEndFunc(&c, err)
		e := s.connection.Close()
		//s.connection = nil
		if e == nil {
			e = err
		}
		return e
	}
	return err
}

// Close implements the networkSession.Close method
func (s *sessionTCP) Close() error {
	return s.closeWithError(nil)
}

// NewMessage Create message for response
func (s *sessionUDP) NewMessage(p MessageParams) Message {
	return NewDgramMessage(p)
}

// NewMessage Create message for response
func (s *sessionTCP) NewMessage(p MessageParams) Message {
	return NewTcpMessage(p)
}

// Close implements the networkSession.Close method
func (s *sessionUDP) IsTCP() bool {
	return false
}

// Close implements the networkSession.Close method
func (s *sessionTCP) IsTCP() bool {
	return true
}

func (s *sessionBase) exchangeFunc(req Message, writeTimeout, readTimeout time.Duration, pairChan *sessionResp, write func(msg Message, timeout time.Duration) error) (Message, error) {

	err := write(req, writeTimeout)
	if err != nil {
		return nil, err
	}

	select {
	case resp := <-pairChan.ch:
		return resp.Msg, nil
	case <-time.After(readTimeout):
		return nil, ErrTimeout
	}
}

func (s *sessionTCP) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	if err := validateMsg(req); err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	if req.Token() == nil {
		return nil, ErrTokenNotExist
	}
	pairChan := &sessionResp{make(chan *Request, 1)}

	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], req.Token())
	s.mapPairsLock.Lock()
	if s.mapPairs[pairToken] != nil {
		return nil, ErrTokenAlreadyExist
	}

	s.mapPairs[pairToken] = pairChan
	s.mapPairsLock.Unlock()

	defer func() {
		if req.Token() != nil {
			s.mapPairsLock.Lock()
			delete(s.mapPairs, pairToken)
			s.mapPairsLock.Unlock()
		}
	}()

	err := s.WriteMsgWithContext(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	select {
	case request := <-pairChan.ch:
		return request.Msg, nil
	case <-ctx.Done():
		if ctx.Err() != nil {
			return nil, fmt.Errorf("cannot exchange: %v", ctx.Err())
		}
		return nil, fmt.Errorf("cannot exchange: cancelled")
	}
}

func (s *sessionUDP) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	if err := validateMsg(req); err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	//register msgid to token
	pairChan := &sessionResp{make(chan *Request, 1)}
	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], req.Token())
	s.mapPairsLock.Lock()
	if s.mapPairs[pairToken] == nil {
		s.mapPairs[pairToken] = make(map[uint16]*sessionResp)
	}
	if s.mapPairs[pairToken][req.MessageID()] != nil {
		s.mapPairsLock.Unlock()
		return nil, ErrTokenAlreadyExist
	}
	s.mapPairs[pairToken][req.MessageID()] = pairChan
	s.mapPairsLock.Unlock()

	defer func() {
		s.mapPairsLock.Lock()
		delete(s.mapPairs[pairToken], req.MessageID())
		if len(s.mapPairs[pairToken]) == 0 {
			delete(s.mapPairs, pairToken)
		}
		s.mapPairsLock.Unlock()
	}()

	err := s.WriteMsgWithContext(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	select {
	case request := <-pairChan.ch:
		return request.Msg, nil
	case <-ctx.Done():
		if ctx.Err() != nil {
			return nil, fmt.Errorf("cannot exchange: %v", ctx.Err())
		}
		return nil, fmt.Errorf("cannot exchange: cancelled")
	}
}

func (s *sessionTCP) validateMessageSize(msg Message) error {
	size, err := msg.ToBytesLength()
	if err != nil {
		return err
	}

	max := atomic.LoadUint32(&s.peerMaxMessageSize)
	if max != 0 && uint32(size) > max {
		return ErrMaxMessageSizeLimitExceeded
	}

	return nil
}

// Write implements the networkSession.Write method.
func (s *sessionTCP) WriteMsgWithContext(ctx context.Context, req Message) error {
	if err := s.validateMessageSize(req); err != nil {
		return err
	}
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to tcp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, buffer.Bytes())
}

func (s *sessionUDP) WriteMsgWithContext(ctx context.Context, req Message) error {
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to udp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, s.sessionUDPData, buffer.Bytes())
}

func validateMsg(msg Message) error {
	if msg.Payload() != nil && msg.Option(ContentFormat) == nil {
		return ErrContentFormatNotSet
	}
	if msg.Payload() == nil && msg.Option(ContentFormat) != nil {
		return ErrInvalidPayload
	}
	//TODO check size of m
	return nil
}

func (s *sessionTCP) handlePairMsg(w ResponseWriter, r *Request) bool {
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())
	s.mapPairsLock.Lock()
	pair := s.mapPairs[token]
	s.mapPairsLock.Unlock()
	if pair != nil {
		select {
		case pair.ch <- r:
		default:
			log.Fatal("Exactly one message can be send to pair. This is second message.")
		}

		return true
	}
	return false
}

func (s *sessionUDP) handlePairMsg(w ResponseWriter, r *Request) bool {
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())
	//validate token

	s.mapPairsLock.Lock()
	pair := s.mapPairs[token][r.Msg.MessageID()]
	s.mapPairsLock.Unlock()
	if pair != nil {
		select {
		case pair.ch <- r:
		default:
			log.Fatal("Exactly one message can be send to pair. This is second message.")
		}
		return true
	}
	return false
}

func (s *sessionTCP) sendCSM() error {
	token, err := GenerateToken()
	if err != nil {
		return err
	}
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  CSM,
		Token: []byte(token),
	})
	if s.srv.MaxMessageSize != 0 {
		req.AddOption(MaxMessageSize, uint32(s.srv.MaxMessageSize))
	}
	if s.blockWiseEnabled() {
		req.AddOption(BlockWiseTransfer, []byte{})
	}
	return s.WriteMsgWithContext(context.Background(), req)
}

func (s *sessionTCP) setPeerMaxMessageSize(val uint32) {
	atomic.StoreUint32(&s.peerMaxMessageSize, val)
}

func (s *sessionTCP) setPeerBlockWiseTransfer(val bool) {
	v := uint32(0)
	if val {
		v = 1
	}
	atomic.StoreUint32(&s.peerBlockWiseTransfer, v)
}

func (s *sessionUDP) sendPong(w ResponseWriter, r *Request) error {
	resp := r.Client.NewMessage(MessageParams{
		Type:      Reset,
		Code:      Empty,
		MessageID: r.Msg.MessageID(),
	})
	return w.WriteMsgWithContext(r.Ctx, resp)
}

func (s *sessionTCP) sendPong(w ResponseWriter, r *Request) error {
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  Pong,
		Token: r.Msg.Token(),
	})
	return w.WriteMsgWithContext(r.Ctx, req)
}

func (s *sessionTCP) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	case CSM:
		maxmsgsize := uint32(maxMessageSize)
		if size, ok := r.Msg.Option(MaxMessageSize).(uint32); ok {
			s.setPeerMaxMessageSize(size)
			maxmsgsize = size
		}
		if r.Msg.Option(BlockWiseTransfer) != nil {
			s.setPeerBlockWiseTransfer(true)
			startIter := s.blockWiseSzx()
			if startIter == BlockWiseSzxBERT {
				if szxToBytes[BlockWiseSzx1024] < int(maxmsgsize) {
					s.setBlockWiseSzx(BlockWiseSzxBERT)
					return true
				}
				startIter = BlockWiseSzx512
			}
			for i := startIter; i > BlockWiseSzx16; i-- {
				if szxToBytes[i] < int(maxmsgsize) {
					s.setBlockWiseSzx(i)
					return true
				}
			}
			s.setBlockWiseSzx(BlockWiseSzx16)
		}

		return true
	case Ping:
		if r.Msg.Option(Custody) != nil {
			//TODO
		}
		s.sendPong(w, r)
		return true
	case Release:
		if _, ok := r.Msg.Option(AlternativeAddress).(string); ok {
			//TODO
		}
		return true
	case Abort:
		if _, ok := r.Msg.Option(BadCSMOption).(uint32); ok {
			//TODO
		}
		return true
	}
	return false
}

func (s *sessionUDP) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	// handle of udp ping
	case Empty:
		if r.Msg.Type() == Confirmable && r.Msg.AllOptions().Len() == 0 && (r.Msg.Payload() == nil || len(r.Msg.Payload()) == 0) {
			s.sendPong(w, r)
			return true
		}
	}
	return false
}

func handleSignalMsg(w ResponseWriter, r *Request, next HandlerFunc) {
	if !r.Client.networkSession().handleSignals(w, r) {
		next(w, r)

M noresponsewriter_test.go => noresponsewriter_test.go +5 -5
@@ 93,11 93,11 @@ func TestNoResponseBehaviour(t *testing.T) {

	// send client request
	req := &DgramMessage{
		MessageBase{
			typ:       NonConfirmable,
			code:      GET,
			messageID: 1234,
		}}
		MessageBase: MessageBase{
			typ:  NonConfirmable,
			code: GET,
		},
		messageID: 1234}

	// supressing 2XX code: example Content; No error when server sends 4XX response
	req.SetOption(NoResponse, 2)

M server.go => server.go +133 -19
@@ 14,6 14,7 @@ import (
	"time"

	coapNet "github.com/go-ocf/go-coap/net"
	"github.com/pion/dtls"
)

// Interval for stop worker if no load


@@ 80,28 81,32 @@ func failedHandler() Handler { return HandlerFunc(HandleFailed) }

// ListenAndServe Starts a server on address and network specified Invoke handler
// for incoming queries.
func ListenAndServe(addr string, network string, handler Handler) error {
func ListenAndServe(network string, addr string, handler Handler) error {
	server := &Server{Addr: addr, Net: network, Handler: handler}
	return server.ListenAndServe()
}

// ListenAndServeTLS acts like http.ListenAndServeTLS, more information in
// http://golang.org/pkg/net/http/#ListenAndServeTLS
func ListenAndServeTLS(addr, certFile, keyFile string, handler Handler) error {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return err
func ListenAndServeTLS(network, addr string, config *tls.Config, handler Handler) error {
	server := &Server{
		Addr:      addr,
		Net:       fixNetTLS(network),
		TLSConfig: config,
		Handler:   handler,
	}

	config := tls.Config{
		Certificates: []tls.Certificate{cert},
	}
	return server.ListenAndServe()
}

// ListenAndServeDTLS acts like ListenAndServeTLS, more information in
// http://golang.org/pkg/net/http/#ListenAndServeTLS
func ListenAndServeDTLS(network string, addr string, config *dtls.Config, handler Handler) error {
	server := &Server{
		Addr:      addr,
		Net:       "tcp-tls",
		TLSConfig: &config,
		Handler:   handler,
		Addr:       addr,
		Net:        fixNetDTLS(network),
		DTLSConfig: config,
		Handler:    handler,
	}

	return server.ListenAndServe()


@@ 126,6 131,8 @@ type Server struct {
	Listener Listener
	// TLS connection configuration
	TLSConfig *tls.Config
	// DTLSConfig connection configuration
	DTLSConfig *dtls.Config
	// UDP/TCP "Listener/Connection" to use, this is to aid in systemd's socket activation.
	Conn net.Conn
	// Handler to invoke, COAP.DefaultServeMux if nil.


@@ 145,6 152,8 @@ type Server struct {
	newSessionUDPFunc func(connection *coapNet.ConnUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error)
	// If newSessionUDPFunc is set it is called when session TCP want to be created
	newSessionTCPFunc func(connection *coapNet.Conn, srv *Server) (networkSession, error)
	// If newSessionUDPFunc is set it is called when session DTLS want to be created
	newSessionDTLSFunc func(connection *coapNet.Conn, srv *Server) (networkSession, error)
	// If NotifyNewSession is set it is called when new TCP/UDP session was created.
	NotifySessionNewFunc func(w *ClientConn)
	// If NotifyNewSession is set it is called when TCP/UDP session was ended.


@@ 293,6 302,13 @@ func (srv *Server) ListenAndServe() error {
		if err := connUDP.SetMulticastLoopback(true); err != nil {
			return err
		}
	case "udp-dtls", "udp4-dtls", "udp6-dtls":
		network := strings.TrimSuffix(srv.Net, "-dtls")
		listener, err = coapNet.NewDTLSListener(network, addr, srv.DTLSConfig, srv.heartBeat())
		if err != nil {
			return fmt.Errorf("cannot listen and serve: %v", err)
		}
		defer listener.Close()
	default:
		return ErrInvalidNetParameter
	}


@@ 308,7 324,14 @@ func (srv *Server) initServeTCP(conn *coapNet.Conn) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}
	return srv.serveTCPconnection(newShutdownWithContext(srv.doneChan), conn)
	return srv.serveTCPConnection(newShutdownWithContext(srv.doneChan), conn)
}

func (srv *Server) initServeDTLS(conn *coapNet.Conn) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}
	return srv.serveDTLSConnection(newShutdownWithContext(srv.doneChan), conn)
}

// ActivateAndServe starts a coapserver with the PacketConn or Listener


@@ 316,9 339,25 @@ func (srv *Server) initServeTCP(conn *coapNet.Conn) error {
func (srv *Server) ActivateAndServe() error {
	if srv.Conn != nil {
		switch c := srv.Conn.(type) {
		case *net.TCPConn, *tls.Conn:
		case *net.TCPConn:
			if srv.Net == "" {
				srv.Net = "tcp"
			}
			return srv.activateAndServe(nil, coapNet.NewConn(c, srv.heartBeat()), nil)
		case *tls.Conn:
			if srv.Net == "" {
				srv.Net = "tcp-tls"
			}
			return srv.activateAndServe(nil, coapNet.NewConn(c, srv.heartBeat()), nil)
		case *coapNet.ConnDTLS:
			if srv.Net == "" {
				srv.Net = "udp-dtls"
			}
			return srv.activateAndServe(nil, coapNet.NewConn(c, srv.heartBeat()), nil)
		case *net.UDPConn:
			if srv.Net == "" {
				srv.Net = "udp"
			}
			return srv.activateAndServe(nil, nil, coapNet.NewConnUDP(c, srv.heartBeat(), 2))
		}
		return ErrInvalidServerConnParameter


@@ 358,6 397,19 @@ func (srv *Server) activateAndServe(listener Listener, conn *coapNet.Conn, connU
		}
	}

	if srv.newSessionDTLSFunc == nil {
		srv.newSessionDTLSFunc = func(connection *coapNet.Conn, srv *Server) (networkSession, error) {
			session, err := newSessionDTLS(connection, srv)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{networkSession: session}, nil
			}
			return session, nil
		}
	}

	if srv.newSessionUDPFunc == nil {
		srv.newSessionUDPFunc = func(connection *coapNet.ConnUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
			session, err := newSessionUDP(connection, srv, sessionUDPData)


@@ 381,8 433,14 @@ func (srv *Server) activateAndServe(listener Listener, conn *coapNet.Conn, connU

	switch {
	case listener != nil:
		return srv.serveTCP(listener)
		if _, ok := listener.(*coapNet.DTLSListener); ok {
			return srv.serveDTLSListener(listener)
		}
		return srv.serveTCPListener(listener)
	case conn != nil:
		if strings.HasSuffix(srv.Net, "-dtls") {
			return srv.initServeDTLS(conn)
		}
		return srv.initServeTCP(conn)
	case connUDP != nil:
		return srv.initServeUDP(connUDP)


@@ 428,7 486,63 @@ func (srv *Server) heartBeat() time.Duration {
	return time.Millisecond * 100
}

func (srv *Server) serveTCPconnection(ctx *shutdownContext, conn *coapNet.Conn) error {
func (srv *Server) serveDTLSConnection(ctx *shutdownContext, conn *coapNet.Conn) error {
	session, err := srv.newSessionDTLSFunc(conn, srv)
	if err != nil {
		return err
	}
	c := ClientConn{commander: &ClientCommander{session}}
	srv.NotifySessionNewFunc(&c)

	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		m := make([]byte, ^uint16(0))
		n, err := conn.ReadWithContext(ctx, m)
		if err != nil {
			err := fmt.Errorf("cannot serve UDP connection %v", err)
			srv.closeSessions(err)
			return err
		}
		msg, err := ParseDgramMessage(m[:n])
		if err != nil {
			continue
		}

		// We will block poller wait loop when
		// all pool workers are busy.
		c := ClientConn{commander: &ClientCommander{session}}
		srv.spawnWorker(&Request{Client: &c, Msg: msg, Ctx: sessCtx, Sequence: c.Sequence()})
	}
}

// serveListener starts a DTLS listener for the server.
func (srv *Server) serveDTLSListener(l Listener) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}

	var wg sync.WaitGroup
	ctx := newShutdownWithContext(srv.doneChan)

	for {
		rw, err := l.AcceptWithContext(ctx)
		if err != nil {
			wg.Wait()
			return fmt.Errorf("cannot serve tcp: %v", err)
		}
		if rw != nil {
			wg.Add(1)
			go func() {
				defer wg.Done()
				srv.serveDTLSConnection(ctx, coapNet.NewConn(rw, srv.heartBeat()))
			}()
		}
	}
}

func (srv *Server) serveTCPConnection(ctx *shutdownContext, conn *coapNet.Conn) error {
	session, err := srv.newSessionTCPFunc(conn, srv)
	if err != nil {
		return err


@@ 472,8 586,8 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, conn *coapNet.Conn) 
	}
}

// serveTCP starts a TCP listener for the server.
func (srv *Server) serveTCP(l Listener) error {
// serveListener starts a TCP listener for the server.
func (srv *Server) serveTCPListener(l Listener) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}


@@ 491,7 605,7 @@ func (srv *Server) serveTCP(l Listener) error {
			wg.Add(1)
			go func() {
				defer wg.Done()
				srv.serveTCPconnection(ctx, coapNet.NewConn(rw, srv.heartBeat()))
				srv.serveTCPConnection(ctx, coapNet.NewConn(rw, srv.heartBeat()))
			}()
		}
	}

M server_test.go => server_test.go +65 -20
@@ 7,7 7,6 @@ import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"
	"strings"
	"sync"


@@ 15,15 14,14 @@ import (
	"time"

	coapNet "github.com/go-ocf/go-coap/net"
	"github.com/pion/dtls"
)

func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {
	if isTCP {
		resp := &TcpMessage{
			MessageBase{
				//typ:       Acknowledgement, not used by COAP over TCP
				code: code,
				//messageID: req.MessageID(), , not used by COAP over TCP
				code:    code,
				payload: req.Payload(),
				token:   req.Token(),
			},


@@ 33,13 31,14 @@ func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {
		return resp
	}
	resp := &DgramMessage{
		MessageBase{
			typ:       Acknowledgement,
			code:      code,
			messageID: req.MessageID(),
			payload:   req.Payload(),
			token:     req.Token(),
		MessageBase: MessageBase{
			typ:  Acknowledgement,
			code: code,

			payload: req.Payload(),
			token:   req.Token(),
		},
		messageID: req.MessageID(),
	}
	resp.SetPath(req.Path())
	resp.SetOption(ContentFormat, req.Option(ContentFormat))


@@ 50,7 49,7 @@ func EchoServer(w ResponseWriter, r *Request) {
	if r.Msg.IsConfirmable() {
		err := w.WriteMsg(CreateRespMessageByReq(r.Client.networkSession().IsTCP(), Valid, r.Msg))
		if err != nil {
			log.Printf("Cannot write echo %v", err)
			fmt.Printf("Cannot write echo %v", err)
		}
	}
}


@@ 197,6 196,36 @@ func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan 
	return server, l.Addr().String(), fin, nil
}

func RunLocalDTLSServer(laddr string, config *dtls.Config, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx) (*Server, string, chan error, error) {
	l, err := coapNet.NewDTLSListener("udp", laddr, config, time.Millisecond*100)
	if err != nil {
		return nil, "", nil, err
	}

	server := &Server{Listener: l, ReadTimeout: time.Hour, WriteTimeout: time.Hour,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
		NotifySessionNewFunc: func(s *ClientConn) {
			fmt.Printf("networkSession start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w *ClientConn, err error) {
			fmt.Printf("networkSession end %v: %v\n", w.RemoteAddr(), err)
		},
		MaxMessageSize: ^uint32(0),
	}

	// fin must be buffered so the goroutine below won't block
	// forever if fin is never read from. This always happens
	// in RunLocalUDPServer and can happen in TestShutdownUDP.
	fin := make(chan error, 1)

	go func() {
		fin <- server.ActivateAndServe()
		l.Close()
	}()

	return server, l.Addr().String(), fin, nil
}

type clientHandler func(t *testing.T, payload []byte, co *ClientConn)

func testServingTCPWithMsgWithObserver(t *testing.T, net string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, payload []byte, ch clientHandler, observeFunc HandlerFunc) {


@@ 219,6 248,24 @@ func testServingTCPWithMsgWithObserver(t *testing.T, net string, BlockWiseTransf
		s, addrstr, fin, err = RunLocalTCPServer(":0", BlockWiseTransfer, BlockWiseTransferSzx)
	case "udp", "udp4", "udp6":
		s, addrstr, fin, err = RunLocalUDPServer(net, ":0", BlockWiseTransfer, BlockWiseTransferSzx)
	case "udp-dtls", "udp4-dtls", "udp6-dtls":
		config := &dtls.Config{
			PSK: func(hint []byte) ([]byte, error) {
				fmt.Printf("Client's hint: %s \n", hint)
				return []byte{0xAB, 0xC1, 0x23}, nil
			},
			PSKIdentityHint: []byte("Pion DTLS Client"),
			CipherSuites:    []dtls.CipherSuiteID{dtls.TLS_PSK_WITH_AES_128_CCM_8},
		}
		c.DTLSConfig = &dtls.Config{
			PSK: func(hint []byte) ([]byte, error) {
				fmt.Printf("Server's hint: %s \n", hint)
				return []byte{0xAB, 0xC1, 0x23}, nil
			},
			PSKIdentityHint: []byte("Pion DTLS Server"),
			CipherSuites:    []dtls.CipherSuiteID{dtls.TLS_PSK_WITH_AES_128_CCM_8},
		}
		s, addrstr, fin, err = RunLocalDTLSServer(":0", config, BlockWiseTransfer, BlockWiseTransferSzx)
	case "tcp-tls", "tcp4-tls", "tcp6-tls":
		cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock)
		if err != nil {


@@ 754,10 801,9 @@ func benchmarkServeTCPStreamWithMsg(b *testing.B, req *TcpMessage) {
func BenchmarkServeTCPStream(b *testing.B) {
	req := &TcpMessage{
		MessageBase{
			typ:       Confirmable,
			code:      POST,
			messageID: 1234,
			payload:   []byte("Content sent by client"),
			typ:     Confirmable,
			code:    POST,
			payload: []byte("Content sent by client"),
		},
	}
	req.SetOption(ContentFormat, TextPlain)


@@ 767,11 813,10 @@ func BenchmarkServeTCPStream(b *testing.B) {

func BenchmarkServeTCPStreamBigMsg(b *testing.B) {
	req := &TcpMessage{
		MessageBase{
			typ:       Confirmable,
			code:      POST,
			messageID: 1234,
			payload:   make([]byte, 1024*1024*10),
		MessageBase: MessageBase{
			typ:     Confirmable,
			code:    POST,
			payload: make([]byte, 1024*1024*10),
		},
	}
	req.SetOption(ContentFormat, TextPlain)

A sessionbase.go => sessionbase.go +159 -0
@@ 0,0 1,159 @@
package coap

import (
	"context"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

type sessionResp struct {
	ch chan *Request // channel must have size 1 for non-blocking write to channel
}

type sessionBase struct {
	srv      *Server
	handler  *TokenHandler
	sequence uint64

	blockWiseTransfer    bool
	blockWiseTransferSzx uint32                                         //BlockWiseSzx
	mapPairs             map[[MaxTokenSize]byte]map[uint16]*sessionResp //storage of channel Message
	mapPairsLock         sync.Mutex                                     //to sync add remove token
}

func (s *sessionBase) blockWiseSzx() BlockWiseSzx {
	return BlockWiseSzx(atomic.LoadUint32(&s.blockWiseTransferSzx))
}

func (s *sessionBase) setBlockWiseSzx(szx BlockWiseSzx) {
	atomic.StoreUint32(&s.blockWiseTransferSzx, uint32(szx))
}

func (s *sessionBase) Sequence() uint64 {
	return atomic.AddUint64(&s.sequence, 1)
}

func (s *sessionBase) blockWiseMaxPayloadSize(peer BlockWiseSzx) (int, BlockWiseSzx) {
	szx := s.blockWiseSzx()
	if peer < szx {
		return szxToBytes[peer], peer
	}
	return szxToBytes[szx], szx
}

func (s *sessionBase) TokenHandler() *TokenHandler {
	return s.handler
}

func (s *sessionBase) exchangeFunc(req Message, writeTimeout, readTimeout time.Duration, pairChan *sessionResp, write func(msg Message, timeout time.Duration) error) (Message, error) {

	err := write(req, writeTimeout)
	if err != nil {
		return nil, err
	}

	select {
	case resp := <-pairChan.ch:
		return resp.Msg, nil
	case <-time.After(readTimeout):
		return nil, ErrTimeout
	}
}

func (s *sessionBase) newSessionResp(token []byte, messageID uint16) (*sessionResp, error) {
	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], token)

	//register msgid to token
	pairChan := &sessionResp{make(chan *Request, 1)}
	s.mapPairsLock.Lock()
	defer s.mapPairsLock.Unlock()
	if s.mapPairs[pairToken] == nil {
		s.mapPairs[pairToken] = make(map[uint16]*sessionResp)
	}
	if s.mapPairs[pairToken][messageID] != nil {
		return nil, ErrTokenAlreadyExist
	}
	s.mapPairs[pairToken][messageID] = pairChan
	return pairChan, nil
}

func (s *sessionBase) getSessionResp(token []byte, messageID uint16) *sessionResp {
	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], token)

	s.mapPairsLock.Lock()
	defer s.mapPairsLock.Unlock()
	if m, ok := s.mapPairs[pairToken]; ok {
		if p, ok := m[messageID]; ok {
			return p
		}
	}
	return nil
}

func (s *sessionBase) removeSessionResp(token []byte, messageID uint16) {
	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], token)

	s.mapPairsLock.Lock()
	defer s.mapPairsLock.Unlock()
	delete(s.mapPairs[pairToken], messageID)
	if len(s.mapPairs[pairToken]) == 0 {
		delete(s.mapPairs, pairToken)
	}
}

func (s *sessionBase) exchangeWithContext(ctx context.Context, req Message, writeMsgWithContext func(context.Context, Message) error) (Message, error) {
	if err := validateMsg(req); err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	//register msgid to token
	pairChan, err := s.newSessionResp(req.Token(), req.MessageID())
	if err != nil {
		return nil, err
	}

	defer s.removeSessionResp(req.Token(), req.MessageID())

	err = writeMsgWithContext(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	select {
	case request := <-pairChan.ch:
		return request.Msg, nil
	case <-ctx.Done():
		if ctx.Err() != nil {
			return nil, fmt.Errorf("cannot exchange: %v", ctx.Err())
		}
		return nil, fmt.Errorf("cannot exchange: cancelled")
	}
}

func validateMsg(msg Message) error {
	if msg.Payload() != nil && msg.Option(ContentFormat) == nil {
		return ErrContentFormatNotSet
	}
	if msg.Payload() == nil && msg.Option(ContentFormat) != nil {
		return ErrInvalidPayload
	}
	return nil
}

func (s *sessionBase) handlePairMsg(w ResponseWriter, r *Request) bool {
	//validate token
	pair := s.getSessionResp(r.Msg.Token(), r.Msg.MessageID())
	if pair != nil {
		select {
		case pair.ch <- r:
		default:
			log.Fatal("Exactly one message can be send to pair. This is second message.")
		}
		return true
	}
	return false
}

A sessiondtls.go => sessiondtls.go +149 -0
@@ 0,0 1,149 @@
package coap

import (
	"bytes"
	"context"
	"fmt"
	"net"

	coapNet "github.com/go-ocf/go-coap/net"
)

type sessionDTLS struct {
	sessionBase
	connection *coapNet.Conn
}

// newSessionDTLS create new session for DTLS connection
func newSessionDTLS(connection *coapNet.Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}

	if BlockWiseTransfer && BlockWiseTransferSzx == BlockWiseSzxBERT {
		return nil, ErrInvalidBlockWiseSzx
	}

	s := sessionDTLS{
		connection: connection,
		sessionBase: sessionBase{
			srv:                  srv,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
			mapPairs:             make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
		},
	}

	return &s, nil
}

// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionDTLS) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionDTLS) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}

// BlockWiseTransferEnabled
func (s *sessionDTLS) blockWiseEnabled() bool {
	return s.blockWiseTransfer
}

func (s *sessionDTLS) blockWiseIsValid(szx BlockWiseSzx) bool {
	return szx <= BlockWiseSzx1024
}

// Ping send ping over udp(unicast) and wait for response.
func (s *sessionDTLS) PingWithContext(ctx context.Context) error {
	//provoking to get a reset message - "CoAP ping" in RFC-7252
	//https://tools.ietf.org/html/rfc7252#section-4.2
	//https://tools.ietf.org/html/rfc7252#section-4.3
	//https://tools.ietf.org/html/rfc7252#section-1.2 "Reset Message"
	// BUG of iotivity: https://jira.iotivity.org/browse/IOT-3149
	req := s.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      Empty,
		MessageID: GenerateMessageID(),
	})
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}
	if resp.Type() == Reset {
		return nil
	}
	return ErrInvalidResponse
}

func (s *sessionDTLS) closeWithError(err error) error {
	if s.connection != nil {
		c := ClientConn{commander: &ClientCommander{s}}
		s.srv.NotifySessionEndFunc(&c, err)
		e := s.connection.Close()
		//s.connection = nil
		if e == nil {
			e = err
		}
		return e
	}
	return err
}

// Close implements the networkSession.Close method
func (s *sessionDTLS) Close() error {
	return s.closeWithError(nil)
}

// NewMessage Create message for response
func (s *sessionDTLS) NewMessage(p MessageParams) Message {
	return NewDgramMessage(p)
}

// Close implements the networkSession.Close method
func (s *sessionDTLS) IsTCP() bool {
	return false
}

func (s *sessionDTLS) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	return s.exchangeWithContext(ctx, req, s.WriteMsgWithContext)
}

// Write implements the networkSession.Write method.
func (s *sessionDTLS) WriteMsgWithContext(ctx context.Context, req Message) error {
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to tcp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, buffer.Bytes())
}

func (s *sessionDTLS) sendPong(w ResponseWriter, r *Request) error {
	resp := r.Client.NewMessage(MessageParams{
		Type:      Reset,
		Code:      Empty,
		MessageID: r.Msg.MessageID(),
	})
	return w.WriteMsgWithContext(r.Ctx, resp)
}

func (s *sessionDTLS) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	// handle of udp ping
	case Empty:
		if r.Msg.Type() == Confirmable && r.Msg.AllOptions().Len() == 0 && (r.Msg.Payload() == nil || len(r.Msg.Payload()) == 0) {
			s.sendPong(w, r)
			return true
		}
	}
	return false
}

A sessiontcp.go => sessiontcp.go +254 -0
@@ 0,0 1,254 @@
package coap

import (
	"bytes"
	"context"
	"fmt"
	"net"
	"sync/atomic"

	coapNet "github.com/go-ocf/go-coap/net"
)

type sessionTCP struct {
	sessionBase
	connection *coapNet.Conn

	peerBlockWiseTransfer uint32
	peerMaxMessageSize    uint32
}

// newSessionTCP create new session for TCP connection
func newSessionTCP(connection *coapNet.Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockWiseSzxBERT
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}
	s := &sessionTCP{
		peerMaxMessageSize: uint32(srv.MaxMessageSize),
		connection:         connection,
		sessionBase: sessionBase{
			srv:                  srv,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
			mapPairs:             make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
		},
	}

	if !s.srv.DisableTCPSignalMessages {
		if err := s.sendCSM(); err != nil {
			return nil, err
		}
	}

	return s, nil
}

// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionTCP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionTCP) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}

func (s *sessionTCP) blockWiseEnabled() bool {
	return s.blockWiseTransfer /*&& atomic.LoadUint32(&s.peerBlockWiseTransfer) != 0*/
}

func (s *sessionTCP) blockWiseMaxPayloadSize(peer BlockWiseSzx) (int, BlockWiseSzx) {
	szx := s.blockWiseSzx()
	if szx == BlockWiseSzxBERT && peer == BlockWiseSzxBERT {
		m := atomic.LoadUint32(&s.peerMaxMessageSize)
		if m == 0 {
			m = uint32(s.srv.MaxMessageSize)
		}
		return int(m - (m % 1024)), BlockWiseSzxBERT
	}
	return s.sessionBase.blockWiseMaxPayloadSize(peer)
}

func (s *sessionTCP) blockWiseIsValid(szx BlockWiseSzx) bool {
	return true
}

func (s *sessionTCP) PingWithContext(ctx context.Context) error {
	if s.srv.DisableTCPSignalMessages {
		return fmt.Errorf("cannot send ping: TCP Signal messages are disabled")
	}
	token, err := GenerateToken()
	if err != nil {
		return err
	}
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  Ping,
		Token: []byte(token),
	})
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}
	if resp.Code() == Pong {
		return nil
	}
	return ErrInvalidResponse
}

func (s *sessionTCP) closeWithError(err error) error {
	if s.connection != nil {
		c := ClientConn{commander: &ClientCommander{s}}
		s.srv.NotifySessionEndFunc(&c, err)
		e := s.connection.Close()
		//s.connection = nil
		if e == nil {
			e = err
		}
		return e
	}
	return err
}

// Close implements the networkSession.Close method
func (s *sessionTCP) Close() error {
	return s.closeWithError(nil)
}

// NewMessage Create message for response
func (s *sessionTCP) NewMessage(p MessageParams) Message {
	return NewTcpMessage(p)
}

// Close implements the networkSession.Close method
func (s *sessionTCP) IsTCP() bool {
	return true
}

func (s *sessionTCP) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	if req.Token() == nil {
		return nil, ErrTokenNotExist
	}
	return s.exchangeWithContext(ctx, req, s.WriteMsgWithContext)
}

func (s *sessionTCP) validateMessageSize(msg Message) error {
	size, err := msg.ToBytesLength()
	if err != nil {
		return err
	}

	max := atomic.LoadUint32(&s.peerMaxMessageSize)
	if max != 0 && uint32(size) > max {
		return ErrMaxMessageSizeLimitExceeded
	}

	return nil
}

// Write implements the networkSession.Write method.
func (s *sessionTCP) WriteMsgWithContext(ctx context.Context, req Message) error {
	if err := s.validateMessageSize(req); err != nil {
		return err
	}
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to tcp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, buffer.Bytes())
}

func (s *sessionTCP) sendCSM() error {
	token, err := GenerateToken()
	if err != nil {
		return err
	}
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  CSM,
		Token: []byte(token),
	})
	if s.srv.MaxMessageSize != 0 {
		req.AddOption(MaxMessageSize, uint32(s.srv.MaxMessageSize))
	}
	if s.blockWiseEnabled() {
		req.AddOption(BlockWiseTransfer, []byte{})
	}
	return s.WriteMsgWithContext(context.Background(), req)
}

func (s *sessionTCP) setPeerMaxMessageSize(val uint32) {
	atomic.StoreUint32(&s.peerMaxMessageSize, val)
}

func (s *sessionTCP) setPeerBlockWiseTransfer(val bool) {
	v := uint32(0)
	if val {
		v = 1
	}
	atomic.StoreUint32(&s.peerBlockWiseTransfer, v)
}

func (s *sessionTCP) sendPong(w ResponseWriter, r *Request) error {
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  Pong,
		Token: r.Msg.Token(),
	})
	return w.WriteMsgWithContext(r.Ctx, req)
}

func (s *sessionTCP) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	case CSM:
		maxmsgsize := uint32(maxMessageSize)
		if size, ok := r.Msg.Option(MaxMessageSize).(uint32); ok {
			s.setPeerMaxMessageSize(size)
			maxmsgsize = size
		}
		if r.Msg.Option(BlockWiseTransfer) != nil {
			s.setPeerBlockWiseTransfer(true)
			startIter := s.blockWiseSzx()
			if startIter == BlockWiseSzxBERT {
				if szxToBytes[BlockWiseSzx1024] < int(maxmsgsize) {
					s.setBlockWiseSzx(BlockWiseSzxBERT)
					return true
				}
				startIter = BlockWiseSzx512
			}
			for i := startIter; i > BlockWiseSzx16; i-- {
				if szxToBytes[i] < int(maxmsgsize) {
					s.setBlockWiseSzx(i)
					return true
				}
			}
			s.setBlockWiseSzx(BlockWiseSzx16)
		}

		return true
	case Ping:
		if r.Msg.Option(Custody) != nil {
			//TODO
		}
		s.sendPong(w, r)
		return true
	case Release:
		if _, ok := r.Msg.Option(AlternativeAddress).(string); ok {
			//TODO
		}
		return true
	case Abort:
		if _, ok := r.Msg.Option(BadCSMOption).(uint32); ok {
			//TODO
		}
		return true
	}
	return false
}

A sessionudp.go => sessionudp.go +153 -0
@@ 0,0 1,153 @@
package coap

import (
	"bytes"
	"context"
	"fmt"
	"net"

	coapNet "github.com/go-ocf/go-coap/net"
)

type connUDP interface {
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
	ReadWithContext(ctx context.Context, buffer []byte) (int, *coapNet.ConnUDPContext, error)
	WriteWithContext(ctx context.Context, udpCtx *coapNet.ConnUDPContext, buffer []byte) error
}

type sessionUDP struct {
	sessionBase
	connection     connUDP
	sessionUDPData *coapNet.ConnUDPContext // oob data to get egress interface right
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection connUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}

	if BlockWiseTransfer && BlockWiseTransferSzx == BlockWiseSzxBERT {
		return nil, ErrInvalidBlockWiseSzx
	}

	s := &sessionUDP{
		sessionBase: sessionBase{
			srv:                  srv,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
			mapPairs:             make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
		},
		connection:     connection,
		sessionUDPData: sessionUDPData,
	}
	return s, nil
}

// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionUDP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionUDP) RemoteAddr() net.Addr {
	return s.sessionUDPData.RemoteAddr()
}

// BlockWiseTransferEnabled
func (s *sessionUDP) blockWiseEnabled() bool {
	return s.blockWiseTransfer
}

func (s *sessionUDP) blockWiseIsValid(szx BlockWiseSzx) bool {
	return szx <= BlockWiseSzx1024
}

func (s *sessionUDP) closeWithError(err error) error {
	s.srv.sessionUDPMapLock.Lock()
	delete(s.srv.sessionUDPMap, s.sessionUDPData.Key())
	s.srv.sessionUDPMapLock.Unlock()
	c := ClientConn{commander: &ClientCommander{s}}
	s.srv.NotifySessionEndFunc(&c, err)

	return err
}

// Ping send ping over udp(unicast) and wait for response.
func (s *sessionUDP) PingWithContext(ctx context.Context) error {
	//provoking to get a reset message - "CoAP ping" in RFC-7252
	//https://tools.ietf.org/html/rfc7252#section-4.2
	//https://tools.ietf.org/html/rfc7252#section-4.3
	//https://tools.ietf.org/html/rfc7252#section-1.2 "Reset Message"
	// BUG of iotivity: https://jira.iotivity.org/browse/IOT-3149
	req := s.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      Empty,
		MessageID: GenerateMessageID(),
	})
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}
	if resp.Type() == Reset {
		return nil
	}
	return ErrInvalidResponse
}

// Close implements the networkSession.Close method
func (s *sessionUDP) Close() error {
	return s.closeWithError(nil)
}

// NewMessage Create message for response
func (s *sessionUDP) NewMessage(p MessageParams) Message {
	return NewDgramMessage(p)
}

// Close implements the networkSession.Close method
func (s *sessionUDP) IsTCP() bool {
	return false
}

func (s *sessionUDP) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	return s.exchangeWithContext(ctx, req, s.WriteMsgWithContext)
}

func (s *sessionUDP) WriteMsgWithContext(ctx context.Context, req Message) error {
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to udp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, s.sessionUDPData, buffer.Bytes())
}

func (s *sessionUDP) sendPong(w ResponseWriter, r *Request) error {
	resp := r.Client.NewMessage(MessageParams{
		Type:      Reset,
		Code:      Empty,
		MessageID: r.Msg.MessageID(),
	})
	return w.WriteMsgWithContext(r.Ctx, resp)
}

func (s *sessionUDP) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	// handle of udp ping
	case Empty:
		if r.Msg.Type() == Confirmable && r.Msg.AllOptions().Len() == 0 && (r.Msg.Payload() == nil || len(r.Msg.Payload()) == 0) {
			s.sendPong(w, r)
			return true
		}
	}
	return false
}