~fnux/yggdrasil-go-coap

5ec6279dafe4e1925b59f2bef2ce7810b6236970 — Jozef Kralik 2 years ago c5cc5a5
Provide context to Request (#36)

* implement context
* fix dealock when client connection was closed by server
M .gitignore => .gitignore +1 -0
@@ 9,6 9,7 @@ server
!server/
client
!client/
vendor/

# Test binary, build with `go test -c`
*.test

M .travis.yml => .travis.yml +22 -15
@@ 1,23 1,30 @@
language: go

go:
 - 1.10.x
language: minimal

before_script:
  # Add an IPv6 config - see the corresponding Travis issue
  # https://github.com/travis-ci/travis-ci/issues/8361
services:
  - docker

install:                                                                 
  - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then
      sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6';
    fi
  - docker build . --network=host -t go-coap:build --target build

script:
 - GOOS=linux go build
 - GOOS=darwin go build
 - GOOS=freebsd go build
# - GOOS=windows go build

 - go test -v -coverprofile=coverage.txt -covermode=atomic
 - go test -v -race
jobs:
  include:
    - stage: test
      if: type == pull_request
      script: 
        - docker run --network=host go-coap:build go test ./...

after_success:
 - bash <(curl -s https://codecov.io/bash)
    - stage: test_and_cover
      if: type != pull_request
      script: 
        - >
            docker run
            --mount type=bind,source="$(pwd)",target=/shared
            --network=host
            go-coap:build
            go test ./... -covermode=atomic -coverprofile=/shared/coverage.txt
        - bash <(curl -s https://codecov.io/bash)

A Dockerfile => Dockerfile +11 -0
@@ 0,0 1,11 @@
FROM golang:1.11-alpine3.8 AS build

RUN apk add --no-cache curl git build-base && \
	curl -SL -o /usr/bin/dep https://github.com/golang/dep/releases/download/v0.5.0/dep-linux-amd64 && \
	chmod +x /usr/bin/dep

ENV MAINDIR $GOPATH/src/github.com/go-ocf/go-coap
WORKDIR $MAINDIR
COPY Gopkg.toml Gopkg.lock ./
RUN dep ensure -v --vendor-only
COPY . .
\ No newline at end of file

A Gopkg.lock => Gopkg.lock +90 -0
@@ 0,0 1,90 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.


[[projects]]
  branch = "master"
  digest = "1:25c92b5a7b89547608053ea7048469c173ede8d976cdfc861b8b39ca9852cad9"
  name = "github.com/go-ocf/kit"
  packages = [
    "log",
    "net",
  ]
  pruneopts = "UT"
  revision = "8560f72fc720c9710c19cb6814495ab67271da5c"

[[projects]]
  digest = "1:4887e9e89c80299aa520d718239809fdd2a47a9aa394909b169959bfbc424ddf"
  name = "github.com/ugorji/go"
  packages = ["codec"]
  pruneopts = "UT"
  revision = "8fd0f8d918c8f0b52d0af210a812ba882cc31a1e"
  version = "v1.1.2"

[[projects]]
  digest = "1:3c1a69cdae3501bf75e76d0d86dc6f2b0a7421bc205c0cb7b96b19eed464a34d"
  name = "go.uber.org/atomic"
  packages = ["."]
  pruneopts = "UT"
  revision = "1ea20fb1cbb1cc08cbd0d913a96dead89aa18289"
  version = "v1.3.2"

[[projects]]
  branch = "master"
  digest = "1:915aa54691f6897cc74afa0da29eaf6e93202831b4e980c56645d99c708bc5b1"
  name = "go.uber.org/goleak"
  packages = [
    ".",
    "internal/stack",
  ]
  pruneopts = "UT"
  revision = "c82e52b9ed06070186646c19bdceeae9dc18ec5d"

[[projects]]
  digest = "1:60bf2a5e347af463c42ed31a493d817f8a72f102543060ed992754e689805d1a"
  name = "go.uber.org/multierr"
  packages = ["."]
  pruneopts = "UT"
  revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a"
  version = "v1.1.0"

[[projects]]
  digest = "1:c52caf7bd44f92e54627a31b85baf06a68333a196b3d8d241480a774733dcf8b"
  name = "go.uber.org/zap"
  packages = [
    ".",
    "buffer",
    "internal/bufferpool",
    "internal/color",
    "internal/exit",
    "zapcore",
  ]
  pruneopts = "UT"
  revision = "ff33455a0e382e8a81d14dd7c922020b6b5e7982"
  version = "v1.9.1"

[[projects]]
  branch = "master"
  digest = "1:425d81a8ef644b5cd324106e8d6e355581a47c2222b20b938822470a3c96a5e1"
  name = "golang.org/x/net"
  packages = [
    "bpf",
    "internal/iana",
    "internal/socket",
    "ipv4",
    "ipv6",
  ]
  pruneopts = "UT"
  revision = "16b79f2e4e95ea23b2bf9903c9809ff7b013ce85"

[solve-meta]
  analyzer-name = "dep"
  analyzer-version = 1
  input-imports = [
    "github.com/go-ocf/kit/net",
    "github.com/ugorji/go/codec",
    "go.uber.org/goleak",
    "golang.org/x/net/ipv4",
    "golang.org/x/net/ipv6",
  ]
  solver-name = "gps-cdcl"
  solver-version = 1

A Gopkg.toml => Gopkg.toml +46 -0
@@ 0,0 1,46 @@
# Gopkg.toml example
#
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
#   name = "github.com/user/project"
#   version = "1.0.0"
#
# [[constraint]]
#   name = "github.com/user/project2"
#   branch = "dev"
#   source = "github.com/myfork/project2"
#
# [[override]]
#   name = "github.com/x/y"
#   version = "2.4.0"
#
# [prune]
#   non-go = false
#   go-tests = true
#   unused-packages = true


[[constraint]]
  branch = "master"
  name = "github.com/go-ocf/kit"

[[constraint]]
  name = "github.com/ugorji/go"
  version = "1.1.2"

[[constraint]]
  branch = "master"
  name = "go.uber.org/goleak"

[[constraint]]
  branch = "master"
  name = "golang.org/x/net"

[prune]
  go-tests = true
  unused-packages = true

M blockwise.go => blockwise.go +136 -57
@@ 2,9 2,9 @@ package coap

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"time"
)

const (


@@ 81,7 81,7 @@ func UnmarshalBlockOption(blockVal uint32) (szx BlockWiseSzx, blockNumber uint, 
	return
}

func exchangeDrivedByPeer(session networkSession, req Message, blockType OptionID) (Message, error) {
func exchangeDrivedByPeer(ctx context.Context, session networkSession, req Message, blockType OptionID) (Message, error) {
	if block, ok := req.Option(blockType).(uint32); ok {
		_, _, more, err := UnmarshalBlockOption(block)
		if err != nil {


@@ 89,7 89,7 @@ func exchangeDrivedByPeer(session networkSession, req Message, blockType OptionI
		}
		if more == false {
			// we send all datas to peer -> create empty response
			err := session.WriteMsg(req)
			err := session.WriteMsgWithContext(ctx, req)
			if err != nil {
				return nil, err
			}


@@ 106,14 106,17 @@ func exchangeDrivedByPeer(session networkSession, req Message, blockType OptionI
		}
	})
	defer session.TokenHandler().Remove(req.Token())
	err := session.WriteMsg(req)
	err := session.WriteMsgWithContext(ctx, req)
	if err != nil {
		return nil, err
	}
	select {
	case resp := <-pair:
		return resp.Msg, nil
	case <-time.After(session.ReadDeadline()):
	case <-ctx.Done():
		if ctx.Err() != nil {
			return nil, fmt.Errorf("cannot exchange drived by peer: %v", err)
		}
		return nil, ErrTimeout
	}
}


@@ 188,16 191,16 @@ func (s *blockWiseSender) newReq(b *blockWiseSession) (Message, error) {
	return req, nil
}

func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, error) {
func (s *blockWiseSender) exchange(ctx context.Context, b *blockWiseSession, req Message) (Message, error) {
	var resp Message
	var err error
	if blockWiseDebug {
		log.Printf("sendPayload %p req=%v\n", b, req)
	}
	if s.peerDrive {
		resp, err = exchangeDrivedByPeer(b.networkSession, req, s.blockType)
		resp, err = exchangeDrivedByPeer(ctx, b.networkSession, req, s.blockType)
	} else {
		resp, err = b.networkSession.Exchange(req)
		resp, err = b.networkSession.ExchangeWithContext(ctx, req)
	}
	if err != nil {
		return nil, err


@@ 208,7 211,7 @@ func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, e
	return resp, nil
}

func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Message) (Message, error) {
func (s *blockWiseSender) processResp(ctx context.Context, b *blockWiseSession, req Message, resp Message) (Message, error) {
	if s.currentMore == false {
		if s.blockType == Block1 {
			if respBlock2, ok := resp.Option(Block2).(uint32); ok {


@@ 221,7 224,7 @@ func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Mes
				}
				if num == 0 {
					resp.RemoveOption(s.sizeType())
					return b.receivePayload(s.peerDrive, s.origin, resp, Block2, s.origin.Code())
					return b.receivePayload(ctx, s.peerDrive, s.origin, resp, Block2, s.origin.Code())
				}
			}
		}


@@ 287,19 290,19 @@ func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Mes
	return nil, nil
}

func (b *blockWiseSession) sendPayload(peerDrive bool, blockType OptionID, suggestedSzx BlockWiseSzx, expectedCode COAPCode, msg Message) (Message, error) {
func (b *blockWiseSession) sendPayload(ctx context.Context, peerDrive bool, blockType OptionID, suggestedSzx BlockWiseSzx, expectedCode COAPCode, msg Message) (Message, error) {
	s := newSender(peerDrive, blockType, suggestedSzx, expectedCode, msg)
	req, err := s.newReq(b)
	if err != nil {
		return nil, err
	}
	for {
		bwResp, err := s.exchange(b, req)
		bwResp, err := s.exchange(ctx, b, req)
		if err != nil {
			return nil, err
		}

		resp, err := s.processResp(b, req, bwResp)
		resp, err := s.processResp(ctx, b, req, bwResp)
		if err != nil {
			return nil, err
		}


@@ 315,27 318,35 @@ type blockWiseSession struct {
}

func (b *blockWiseSession) Exchange(msg Message) (Message, error) {
	return b.ExchangeWithContext(context.Background(), msg)
}

func (b *blockWiseSession) ExchangeWithContext(ctx context.Context, msg Message) (Message, error) {
	switch msg.Code() {
	//these methods doesn't need to be handled by blockwise
	case CSM, Ping, Pong, Release, Abort, Empty:
		return b.networkSession.Exchange(msg)
		return b.networkSession.ExchangeWithContext(ctx, msg)
	case GET, DELETE:
		return b.receivePayload(false, msg, nil, Block2, msg.Code())
		return b.receivePayload(ctx, false, msg, nil, Block2, msg.Code())
	case POST, PUT:
		return b.sendPayload(false, Block1, b.networkSession.blockWiseSzx(), Continue, msg)
		return b.sendPayload(ctx, false, Block1, b.networkSession.blockWiseSzx(), Continue, msg)
	// for response code
	default:
		return b.sendPayload(true, Block2, b.networkSession.blockWiseSzx(), Continue, msg)
		return b.sendPayload(ctx, true, Block2, b.networkSession.blockWiseSzx(), Continue, msg)
	}

}

func (b *blockWiseSession) WriteMsg(msg Message) error {
	return b.WriteMsgWithContext(context.Background(), msg)
}

func (b *blockWiseSession) WriteMsgWithContext(ctx context.Context, msg Message) error {
	switch msg.Code() {
	case CSM, Ping, Pong, Release, Abort, Empty, GET:
		return b.networkSession.WriteMsg(msg)
		return b.networkSession.WriteMsgWithContext(ctx, msg)
	default:
		_, err := b.Exchange(msg)
		_, err := b.ExchangeWithContext(ctx, msg)
		return err
	}
}


@@ 352,7 363,7 @@ func calcStartOffset(num uint, szx BlockWiseSzx) int {
	return int(num) * szxToBytes[szx]
}

func (b *blockWiseSession) sendErrorMsg(code COAPCode, typ COAPType, token []byte, MessageID uint16, err error) {
func (b *blockWiseSession) sendErrorMsg(ctx context.Context, code COAPCode, typ COAPType, token []byte, MessageID uint16, err error) {
	req := b.NewMessage(MessageParams{
		Code:      code,
		Type:      typ,


@@ 363,7 374,7 @@ func (b *blockWiseSession) sendErrorMsg(code COAPCode, typ COAPType, token []byt
		req.SetOption(ContentFormat, TextPlain)
		req.SetPayload([]byte(err.Error()))
	}
	b.networkSession.WriteMsg(req)
	b.networkSession.WriteMsgWithContext(ctx, req)
}

type blockWiseReceiver struct {


@@ 502,16 513,16 @@ func newReceiver(b *blockWiseSession, peerDrive bool, origin Message, resp Messa
	return r, nil, nil
}

func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message, error) {
func (r *blockWiseReceiver) exchange(ctx context.Context, b *blockWiseSession, req Message) (Message, error) {
	if blockWiseDebug {
		log.Printf("receivePayload %p req=%v\n", b, req)
	}
	var resp Message
	var err error
	if r.peerDrive {
		resp, err = exchangeDrivedByPeer(b.networkSession, req, r.blockType)
		resp, err = exchangeDrivedByPeer(ctx, b.networkSession, req, r.blockType)
	} else {
		resp, err = b.networkSession.Exchange(req)
		resp, err = b.networkSession.ExchangeWithContext(ctx, req)
	}

	if blockWiseDebug {


@@ 594,7 605,7 @@ func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp M
	return nil, nil
}

func (r *blockWiseReceiver) sendError(b *blockWiseSession, code COAPCode, resp Message, err error) {
func (r *blockWiseReceiver) sendError(ctx context.Context, b *blockWiseSession, code COAPCode, resp Message, err error) {
	var MessageID uint16
	var token []byte
	var typ COAPType


@@ 611,13 622,13 @@ func (r *blockWiseReceiver) sendError(b *blockWiseSession, code COAPCode, resp M
			token = r.origin.Token()
		}
	}
	b.sendErrorMsg(code, typ, token, MessageID, err)
	b.sendErrorMsg(ctx, code, typ, token, MessageID, err)
}

func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode) (Message, error) {
func (b *blockWiseSession) receivePayload(ctx context.Context, peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode) (Message, error) {
	r, resp, err := newReceiver(b, peerDrive, msg, resp, blockType, code)
	if err != nil {
		r.sendError(b, BadRequest, resp, err)
		r.sendError(ctx, b, BadRequest, resp, err)
		return nil, err
	}
	if resp != nil {


@@ 626,15 637,15 @@ func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Mess

	req, err := r.newReq(b, resp)
	if err != nil {
		r.sendError(b, BadRequest, resp, err)
		r.sendError(ctx, b, BadRequest, resp, err)
		return nil, err
	}

	for {
		bwResp, err := r.exchange(b, req)
		bwResp, err := r.exchange(ctx, b, req)

		if err != nil {
			r.sendError(b, BadRequest, resp, err)
			r.sendError(ctx, b, BadRequest, resp, err)
			return nil, err
		}



@@ 646,7 657,7 @@ func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Mess
			case ErrRequestEntityIncomplete:
				errCode = RequestEntityIncomplete
			}
			r.sendError(b, errCode, resp, err)
			r.sendError(ctx, b, errCode, resp, err)
			return nil, err
		}



@@ 664,7 675,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
		switch r.Msg.Code() {
		case PUT, POST:
			if b, ok := r.Client.networkSession.(*blockWiseSession); ok {
				msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue)
				msg, err := b.receivePayload(r.Ctx, true, r.Msg, nil, Block1, Continue)

				if err != nil {
					return


@@ 673,7 684,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
				// We need to be careful to create a new response writer for the
				// new request, otherwise the server may attempt to respond to
				// the wrong request.
				newReq := &Request{Client: r.Client, Msg: msg}
				newReq := &Request{Client: r.Client, Msg: msg, Ctx: r.Ctx}
				newWriter := responseWriterFromRequest(newReq)
				next(newWriter, newReq)
				return


@@ 700,7 711,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
							if err != nil {
								return
							}
							next(w, &Request{networkSession: r.networkSession, Msg: msg})
							next(w, &Request{networkSession: r.networkSession, Msg: msg, Ctx: r.Ctx})
							return
						}
					}*/


@@ 711,20 722,50 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
}

type blockWiseResponseWriter struct {
	*responseWriter
	responseWriter ResponseWriter
}

func (w *blockWiseResponseWriter) NewResponse(code COAPCode) Message {
	return w.responseWriter.NewResponse(code)
}

func (w *blockWiseResponseWriter) SetCode(code COAPCode) {
	w.responseWriter.SetCode(code)
}

func (w *blockWiseResponseWriter) SetContentFormat(contentFormat MediaType) {
	w.responseWriter.SetContentFormat(contentFormat)
}

func (w *blockWiseResponseWriter) getCode() *COAPCode {
	return w.responseWriter.getCode()
}

func (w *blockWiseResponseWriter) getReq() *Request {
	return w.responseWriter.getReq()
}

//Write send whole message if size of payload is less then block szx otherwise
func (w *blockWiseResponseWriter) getContentFormat() *MediaType {
	return w.responseWriter.getContentFormat()
}

//WriteMsg send whole message if size of payload is less then block szx otherwise
//send message via blockwise.
func (w *blockWiseResponseWriter) WriteMsg(msg Message) error {
	suggestedSzx := w.req.Client.networkSession.blockWiseSzx()
	if respBlock2, ok := w.req.Msg.Option(Block2).(uint32); ok {
	return w.WriteMsgWithContext(context.Background(), msg)
}

//Write send whole message with context if size of payload is less then block szx otherwise
//send message via blockwise.
func (w *blockWiseResponseWriter) WriteMsgWithContext(ctx context.Context, msg Message) error {
	suggestedSzx := w.responseWriter.getReq().Client.networkSession.blockWiseSzx()
	if respBlock2, ok := w.responseWriter.getReq().Msg.Option(Block2).(uint32); ok {
		szx, _, _, err := UnmarshalBlockOption(respBlock2)
		if err != nil {
			return err
		}
		//BERT is supported only via TCP
		if szx == BlockWiseSzxBERT && !w.req.Client.networkSession.IsTCP() {
		if szx == BlockWiseSzxBERT && !w.responseWriter.getReq().Client.networkSession.IsTCP() {
			return ErrInvalidBlockWiseSzx
		}
		suggestedSzx = szx


@@ 732,11 773,11 @@ func (w *blockWiseResponseWriter) WriteMsg(msg Message) error {

	//resp is less them szx then just write msg without blockWise
	if len(msg.Payload()) < szxToBytes[suggestedSzx] {
		return w.responseWriter.WriteMsg(msg)
		return w.responseWriter.WriteMsgWithContext(ctx, msg)
	}

	if b, ok := w.req.Client.networkSession.(*blockWiseSession); ok {
		_, err := b.sendPayload(true, Block2, suggestedSzx, w.req.Msg.Code(), msg)
	if b, ok := w.responseWriter.getReq().Client.networkSession.(*blockWiseSession); ok {
		_, err := b.sendPayload(ctx, true, Block2, suggestedSzx, w.responseWriter.getReq().Msg.Code(), msg)
		return err
	}



@@ 745,27 786,60 @@ func (w *blockWiseResponseWriter) WriteMsg(msg Message) error {

// Write send response to peer
func (w *blockWiseResponseWriter) Write(p []byte) (n int, err error) {
	l, resp := prepareReponse(w, w.responseWriter.req.Msg.Code(), w.responseWriter.code, w.responseWriter.contentFormat, p)
	err = w.WriteMsg(resp)
	return w.WriteWithContext(context.Background(), p)
}

// WriteContext send response with context to peer
func (w *blockWiseResponseWriter) WriteWithContext(ctx context.Context, p []byte) (n int, err error) {
	l, resp := prepareReponse(w, w.responseWriter.getReq().Msg.Code(), w.responseWriter.getCode(), w.responseWriter.getContentFormat(), p)
	err = w.WriteMsgWithContext(ctx, resp)
	return l, err
}

type blockWiseNoticeWriter struct {
	*responseWriter
	responseWriter ResponseWriter
}

func (w *blockWiseNoticeWriter) NewResponse(code COAPCode) Message {
	return w.responseWriter.NewResponse(code)
}

func (w *blockWiseNoticeWriter) SetCode(code COAPCode) {
	w.responseWriter.SetCode(code)
}

func (w *blockWiseNoticeWriter) SetContentFormat(contentFormat MediaType) {
	w.responseWriter.SetContentFormat(contentFormat)
}

func (w *blockWiseNoticeWriter) getCode() *COAPCode {
	return w.responseWriter.getCode()
}

//Write send whole message if size of payload is less then block szx otherwise
func (w *blockWiseNoticeWriter) getReq() *Request {
	return w.responseWriter.getReq()
}

func (w *blockWiseNoticeWriter) getContentFormat() *MediaType {
	return w.responseWriter.getContentFormat()
}

func (w *blockWiseNoticeWriter) WriteMsg(msg Message) error {
	return w.WriteMsgWithContext(context.Background(), msg)
}

//Write send whole message with context. If size of payload is less then block szx otherwise
//send only first block. For Get whole msg client must call Get to
//resource.
func (w *blockWiseNoticeWriter) WriteMsg(msg Message) error {
	suggestedSzx := w.req.Client.networkSession.blockWiseSzx()
	if respBlock2, ok := w.req.Msg.Option(Block2).(uint32); ok {
func (w *blockWiseNoticeWriter) WriteMsgWithContext(ctx context.Context, msg Message) error {
	suggestedSzx := w.responseWriter.getReq().Client.networkSession.blockWiseSzx()
	if respBlock2, ok := w.responseWriter.getReq().Msg.Option(Block2).(uint32); ok {
		szx, _, _, err := UnmarshalBlockOption(respBlock2)
		if err != nil {
			return err
		}
		//BERT is supported only via TCP
		if szx == BlockWiseSzxBERT && !w.req.Client.networkSession.IsTCP() {
		if szx == BlockWiseSzxBERT && !w.responseWriter.getReq().Client.networkSession.IsTCP() {
			return ErrInvalidBlockWiseSzx
		}
		suggestedSzx = szx


@@ 773,23 847,28 @@ func (w *blockWiseNoticeWriter) WriteMsg(msg Message) error {

	//resp is less them szx then just write msg without blockWise
	if len(msg.Payload()) < szxToBytes[suggestedSzx] {
		return w.responseWriter.WriteMsg(msg)
		return w.responseWriter.WriteMsgWithContext(ctx, msg)
	}

	if b, ok := w.req.Client.networkSession.(*blockWiseSession); ok {
		s := newSender(false, Block2, suggestedSzx, w.req.Msg.Code(), msg)
	if b, ok := w.responseWriter.getReq().Client.networkSession.(*blockWiseSession); ok {
		s := newSender(false, Block2, suggestedSzx, w.responseWriter.getReq().Msg.Code(), msg)
		req, err := s.newReq(b)
		if err != nil {
			return err
		}
		return b.networkSession.WriteMsg(req)
		return b.networkSession.WriteMsgWithContext(ctx, req)
	}
	return ErrNotSupported
}

// Write send response to peer
func (w *blockWiseNoticeWriter) Write(p []byte) (n int, err error) {
	l, resp := prepareReponse(w, w.responseWriter.req.Msg.Code(), w.responseWriter.code, w.responseWriter.contentFormat, p)
	err = w.WriteMsg(resp)
	return w.WriteWithContext(context.Background(), p)
}

// Write send response with context to peer
func (w *blockWiseNoticeWriter) WriteWithContext(ctx context.Context, p []byte) (n int, err error) {
	l, resp := prepareReponse(w, w.responseWriter.getReq().Msg.Code(), w.responseWriter.getCode(), w.responseWriter.getContentFormat(), p)
	err = w.WriteMsgWithContext(ctx, resp)
	return l, err
}

M blockwise_test.go => blockwise_test.go +3 -0
@@ 5,6 5,8 @@ import (
	"fmt"
	"log"
	"testing"

	"go.uber.org/goleak"
)

func testMarshal(t *testing.T, szx BlockWiseSzx, blockNumber uint, moreBlocksFollowing bool, expectedBlock uint32) {


@@ 83,6 85,7 @@ func TestBlockWiseBlockUnmarshal(t *testing.T) {
}

func TestServingUDPBlockWiseSzx16(t *testing.T) {
	defer goleak.VerifyNone(t)
	testServingTCPWithMsg(t, "udp", true, BlockWiseSzx16, make([]byte, 128), simpleMsg)
}


M client.go => client.go +76 -77
@@ 3,12 3,14 @@ package coap
// A client implementation.

import (
	"context"
	"crypto/tls"
	"io"
	"log"
	"net"
	"strings"
	"time"

	kitNet "github.com/go-ocf/kit/net"
)

// A ClientConn represents a connection to a COAP server.


@@ 28,7 30,7 @@ type Client struct {
	DialTimeout    time.Duration // set Timeout for dialer
	ReadTimeout    time.Duration // net.ClientConn.SetReadTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	WriteTimeout   time.Duration // net.ClientConn.SetWriteTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	SyncTimeout    time.Duration // The maximum of time for synchronization go-routines, defaults to 30 seconds - overridden by Timeout when that value is non-zero if it occurs, then it call log.Fatal
	HeartBeat      time.Duration // Defines wake up interval from operations Read, Write over connection. defaults is 100ms.

	Handler              HandlerFunc     // default handler for handling messages from server
	NotifySessionEndFunc func(err error) // if NotifySessionEndFunc is set it is called when TCP/UDP session was ended.


@@ 53,13 55,6 @@ func (c *Client) writeTimeout() time.Duration {
	return coapTimeout
}

func (c *Client) syncTimeout() time.Duration {
	if c.SyncTimeout != 0 {
		return c.SyncTimeout
	}
	return syncTimeout
}

func listenUDP(network, address string) (*net.UDPAddr, *net.UDPConn, error) {
	var a *net.UDPAddr
	var err error


@@ 70,18 65,22 @@ func listenUDP(network, address string) (*net.UDPAddr, *net.UDPConn, error) {
	if udpConn, err = net.ListenUDP(network, a); err != nil {
		return nil, nil, err
	}
	if err := setUDPSocketOptions(udpConn); err != nil {
	if err := kitNet.SetUDPSocketOptions(udpConn); err != nil {
		return nil, nil, err
	}
	return a, udpConn, nil
}

// Dial connects to the address on the named network.
func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
	return c.DialWithContext(context.Background(), address)
}

// DialContext connects to the address on the named network.
func (c *Client) DialWithContext(ctx context.Context, address string) (clientConn *ClientConn, err error) {

	var conn net.Conn
	var network string
	var sessionUPDData *SessionUDPData
	var sessionUPDData *kitNet.ConnUDPContext

	dialer := &net.Dialer{Timeout: c.DialTimeout}
	BlockWiseTransfer := false


@@ 98,7 97,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		BlockWiseTransferSzx = BlockWiseSzxBERT
	case "tcp", "tcp4", "tcp6":
		network = c.Net
		conn, err = dialer.Dial(c.Net, address)
		conn, err = dialer.DialContext(ctx, c.Net, address)
		if err != nil {
			return nil, err
		}


@@ 108,10 107,10 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		if network == "" {
			network = "udp"
		}
		if conn, err = dialer.Dial(network, address); err != nil {
		if conn, err = dialer.DialContext(ctx, network, address); err != nil {
			return nil, err
		}
		sessionUPDData = &SessionUDPData{raddr: conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr)}
		sessionUPDData = kitNet.NewConnUDPWithContext(conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr), nil)
		BlockWiseTransfer = true
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		network = strings.TrimSuffix(c.Net, "-mcast")


@@ 119,7 118,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		if err != nil {
			return nil, err
		}
		sessionUPDData = &SessionUDPData{raddr: a}
		sessionUPDData = kitNet.NewConnUDPWithContext(a, nil)
		conn = udpConn
		BlockWiseTransfer = true
		multicast = true


@@ 135,7 134,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		BlockWiseTransferSzx = *c.BlockWiseTransferSzx
	}

	sync := make(chan bool)
	//sync := make(chan bool)
	clientConn = &ClientConn{
		srv: &Server{
			Net:                      network,


@@ 147,23 146,15 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
			BlockWiseTransfer:        &BlockWiseTransfer,
			BlockWiseTransferSzx:     &BlockWiseTransferSzx,
			DisableTCPSignalMessages: c.DisableTCPSignalMessages,
			NotifyStartedFunc: func() {
				timeout := c.syncTimeout()
				select {
				case sync <- true:
				case <-time.After(timeout):
					log.Fatal("Client cannot send start: Timeout")
				}
			},
			NotifySessionEndFunc: func(s *ClientCommander, err error) {
				if c.NotifySessionEndFunc != nil {
					c.NotifySessionEndFunc(err)
				}
			},
			newSessionTCPFunc: func(connection Conn, srv *Server) (networkSession, error) {
			newSessionTCPFunc: func(connection *kitNet.Conn, srv *Server) (networkSession, error) {
				return clientConn.commander.networkSession, nil
			},
			newSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error) {
			newSessionUDPFunc: func(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
				if sessionUDPData.RemoteAddr().String() == clientConn.commander.networkSession.RemoteAddr().String() {
					if s, ok := clientConn.commander.networkSession.(*blockWiseSession); ok {
						s.networkSession.(*sessionUDP).sessionUDPData = sessionUDPData


@@ 183,14 174,14 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
			},
			Handler: c.Handler,
		},
		shutdownSync: make(chan error),
		shutdownSync: make(chan error, 1),
		multicast:    multicast,
		commander:    &ClientCommander{},
	}

	switch clientConn.srv.Conn.(type) {
	case *net.TCPConn, *tls.Conn:
		session, err := newSessionTCP(newConnectionTCP(clientConn.srv.Conn, clientConn.srv), clientConn.srv)
		session, err := newSessionTCP(kitNet.NewConn(clientConn.srv.Conn, clientConn.srv.heartBeat()), clientConn.srv)
		if err != nil {
			return nil, err
		}


@@ 200,9 191,9 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
			clientConn.commander.networkSession = session
		}
	case *net.UDPConn:
		// WriteMsgUDP returns error when addr is filled in SessionUDPData for connected socket
		setUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		session, err := newSessionUDP(newConnectionUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv), clientConn.srv, sessionUPDData)
		// WriteContextMsgUDP returns error when addr is filled in SessionUDPData for connected socket
		kitNet.SetUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		session, err := newSessionUDP(kitNet.NewConnUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv.heartBeat()), clientConn.srv, sessionUPDData)
		if err != nil {
			return nil, err
		}


@@ 213,25 204,12 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		}
	}

	clientConn.commander.networkSession.SetReadDeadline(c.readTimeout())
	clientConn.commander.networkSession.SetWriteDeadline(c.writeTimeout())

	go func() {
		timeout := c.syncTimeout()
		err := clientConn.srv.ActivateAndServe()
		select {
		case clientConn.shutdownSync <- err:
		case <-time.After(timeout):
			log.Fatal("Client cannot send shutdown: Timeout")
		}
	}()

	select {
	case <-sync:
	case <-time.After(c.syncTimeout()):
		log.Fatal("Client cannot recv start: Timeout")
	}

	clientConn.client = c

	return clientConn, nil


@@ 247,18 225,22 @@ func (co *ClientConn) RemoteAddr() net.Addr {
	return co.commander.RemoteAddr()
}

// Exchange performs a synchronous query. It sends the message m to the address
func (co *ClientConn) Exchange(m Message) (Message, error) {
	return co.commander.ExchangeWithContext(context.Background(), m)
}

// ExchangeContext performs a synchronous query. It sends the message m to the address
// contained in a and waits for a reply.
//
// Exchange does not retry a failed query, nor will it fall back to TCP in
// ExchangeContext does not retry a failed query, nor will it fall back to TCP in
// case of truncation.
// To specify a local address or a timeout, the caller has to set the `Client.Dialer`
// attribute appropriately
func (co *ClientConn) Exchange(m Message) (Message, error) {
func (co *ClientConn) ExchangeWithContext(ctx context.Context, m Message) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Exchange(m)
	return co.commander.ExchangeWithContext(ctx, m)
}

// NewMessage Create message for request


@@ 286,86 268,103 @@ func (co *ClientConn) NewDeleteRequest(path string) (Message, error) {
	return co.commander.NewDeleteRequest(path)
}

// Write sends direct a message through the connection
func (co *ClientConn) WriteMsg(m Message) error {
	return co.commander.WriteMsg(m)
	return co.commander.WriteMsgWithContext(context.Background(), m)
}

// SetReadDeadline set read deadline for timeout for Exchange
func (co *ClientConn) SetReadDeadline(timeout time.Duration) {
	co.commander.networkSession.SetReadDeadline(timeout)
// WriteContextMsg sends direct a message through the connection
func (co *ClientConn) WriteMsgWithContext(ctx context.Context, m Message) error {
	return co.commander.WriteMsgWithContext(ctx, m)
}

// SetWriteDeadline set write deadline for timeout for Exchange and Write
func (co *ClientConn) SetWriteDeadline(timeout time.Duration) {
	co.commander.networkSession.SetWriteDeadline(timeout)
// Ping send a ping message and wait for a pong response
func (co *ClientConn) Ping(timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return co.PingWithContext(ctx)
}

// Ping send a ping message and wait for a pong response
func (co *ClientConn) Ping(timeout time.Duration) error {
	return co.commander.Ping(timeout)
func (co *ClientConn) PingWithContext(ctx context.Context) error {
	return co.commander.PingWithContext(ctx)
}

// Get retrieve the resource identified by the request path
// GetContext retrieve the resource identified by the request path
func (co *ClientConn) Get(path string) (Message, error) {
	return co.GetWithContext(context.Background(), path)
}

func (co *ClientConn) GetWithContext(ctx context.Context, path string) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Get(path)
	return co.commander.GetWithContext(ctx, path)
}

// Post update the resource identified by the request path
func (co *ClientConn) Post(path string, contentFormat MediaType, body io.Reader) (Message, error) {
	return co.PostWithContext(context.Background(), path, contentFormat, body)
}

// Post update the resource identified by the request path
func (co *ClientConn) PostWithContext(ctx context.Context, path string, contentFormat MediaType, body io.Reader) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Post(path, contentFormat, body)
	return co.commander.PostWithContext(ctx, path, contentFormat, body)
}

// Put create the resource identified by the request path
func (co *ClientConn) Put(path string, contentFormat MediaType, body io.Reader) (Message, error) {
	return co.PutWithContext(context.Background(), path, contentFormat, body)
}

// PutContext create the resource identified by the request path
func (co *ClientConn) PutWithContext(ctx context.Context, path string, contentFormat MediaType, body io.Reader) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Put(path, contentFormat, body)
	return co.commander.PutWithContext(ctx, path, contentFormat, body)
}

// Delete delete the resource identified by the request path
func (co *ClientConn) Delete(path string) (Message, error) {
	return co.DeleteWithContext(context.Background(), path)
}

// Delete delete the resource identified by the request path
func (co *ClientConn) DeleteWithContext(ctx context.Context, path string) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Delete(path)
	return co.commander.DeleteWithContext(ctx, path)
}

func (co *ClientConn) Observe(path string, observeFunc func(req *Request)) (*Observation, error) {
	return co.ObserveWithContext(context.Background(), path, observeFunc)
}

func (co *ClientConn) ObserveWithContext(ctx context.Context, path string, observeFunc func(req *Request)) (*Observation, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.commander.Observe(path, observeFunc)
	return co.commander.ObserveWithContext(ctx, path, observeFunc)
}

// Close close connection
func (co *ClientConn) Close() error {
	co.srv.Shutdown()
	select {
	case <-co.shutdownSync:
	case <-time.After(co.client.syncTimeout()):
		log.Fatal("Client cannot recv shutdown: Timeout")
	}
	<-co.shutdownSync
	return nil
}

// Dial connects to the address on the named network.
func Dial(network, address string) (*ClientConn, error) {
	client := Client{Net: network}
	return client.Dial(address)
	return client.DialWithContext(context.Background(), address)
}

// DialTimeout acts like Dial but takes a timeout.
func DialTimeout(network, address string, timeout time.Duration) (*ClientConn, error) {
	client := Client{Net: network, DialTimeout: timeout}
	return client.Dial(address)
	return client.DialWithContext(context.Background(), address)
}

func fixNetTLS(network string) string {


@@ 378,11 377,11 @@ func fixNetTLS(network string) string {
// DialWithTLS connects to the address on the named network with TLS.
func DialWithTLS(network, address string, tlsConfig *tls.Config) (conn *ClientConn, err error) {
	client := Client{Net: fixNetTLS(network), TLSConfig: tlsConfig}
	return client.Dial(address)
	return client.DialWithContext(context.Background(), address)
}

// DialTimeoutWithTLS acts like DialWithTLS but takes a timeout.
func DialTimeoutWithTLS(network, address string, tlsConfig *tls.Config, timeout time.Duration) (conn *ClientConn, err error) {
	client := Client{Net: fixNetTLS(network), DialTimeout: timeout, TLSConfig: tlsConfig}
	return client.Dial(address)
	return client.DialWithContext(context.Background(), address)
}

M client_test.go => client_test.go +29 -9
@@ 8,6 8,7 @@ import (
	"testing"
	"time"

	kitNet "github.com/go-ocf/kit/net"
	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)


@@ 131,7 132,7 @@ func testServingMCastByClient(t *testing.T, lnet, laddr string, BlockWiseTransfe
		t.Fatalf("cannot dial addr: %v", err)
	}

	if err := joinGroup(co.srv.Conn.(*net.UDPConn), nil, a); err != nil {
	if err := kitNet.JoinGroup(co.srv.Conn.(*net.UDPConn), nil, a); err != nil {
		t.Fatalf("cannot join self to multicast group: %v", err)
	}
	if ip4 := co.srv.Conn.(*net.UDPConn).LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {


@@ 187,8 188,8 @@ func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", false, BlockWiseSzx16, ifis)
}

func setupServer(t *testing.T) (string, error) {
	_, addr, _, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockWiseSzx1024, func(w ResponseWriter, r *Request) {
func setupServer(t *testing.T) (*Server, string, chan error, error) {
	return RunLocalServerUDPWithHandler("udp", ":0", true, BlockWiseSzx1024, func(w ResponseWriter, r *Request) {
		msg := r.Client.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,


@@ 206,15 207,18 @@ func setupServer(t *testing.T) (string, error) {
			return
		}
	})
	return addr, err
}

func TestServingUDPGet(t *testing.T) {

	addr, err := setupServer(t)
	s, addr, fin, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	defer func() {
		s.Shutdown()
		<-fin
	}()

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx16


@@ 230,10 234,14 @@ func TestServingUDPGet(t *testing.T) {
}

func TestServingUDPPost(t *testing.T) {
	addr, err := setupServer(t)
	s, addr, fin, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	defer func() {
		s.Shutdown()
		<-fin
	}()

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024


@@ 250,10 258,14 @@ func TestServingUDPPost(t *testing.T) {
}

func TestServingUDPPut(t *testing.T) {
	addr, err := setupServer(t)
	s, addr, fin, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	defer func() {
		s.Shutdown()
		<-fin
	}()

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024


@@ 270,10 282,14 @@ func TestServingUDPPut(t *testing.T) {
}

func TestServingUDPDelete(t *testing.T) {
	addr, err := setupServer(t)
	s, addr, fin, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	defer func() {
		s.Shutdown()
		<-fin
	}()

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024


@@ 289,7 305,7 @@ func TestServingUDPDelete(t *testing.T) {
}

func TestServingUDPObserve(t *testing.T) {
	_, addr, _, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockWiseSzx16, func(w ResponseWriter, r *Request) {
	s, addr, fin, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockWiseSzx16, func(w ResponseWriter, r *Request) {
		msg := r.Client.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,


@@ 308,6 324,10 @@ func TestServingUDPObserve(t *testing.T) {
			return
		}
	})
	defer func() {
		s.Shutdown()
		<-fin
	}()
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

M clientcommander.go => clientcommander.go +69 -45
@@ 2,6 2,7 @@ package coap

import (
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"net"


@@ 90,61 91,96 @@ func (cc *ClientCommander) Equal(cc1 *ClientCommander) bool {
	return cc.RemoteAddr().String() == cc1.RemoteAddr().String() && cc.LocalAddr().String() == cc1.LocalAddr().String()
}

// Exchange performs a synchronous query. It sends the message m to the address
// Exchange same as ExchangeContext without context
func (cc *ClientCommander) Exchange(m Message) (Message, error) {
	return cc.ExchangeWithContext(context.Background(), m)
}

// ExchangeContext performs a synchronous query with context. It sends the message m to the address
// contained in a and waits for a reply.
//
// Exchange does not retry a failed query, nor will it fall back to TCP in
// ExchangeContext does not retry a failed query, nor will it fall back to TCP in
// case of truncation.
// To specify a local address or a timeout, the caller has to set the `Client.Dialer`
// attribute appropriately
func (cc *ClientCommander) Exchange(m Message) (Message, error) {
	return cc.networkSession.Exchange(m)
func (cc *ClientCommander) ExchangeWithContext(ctx context.Context, m Message) (Message, error) {
	return cc.networkSession.ExchangeWithContext(ctx, m)
}

// WriteMsg sends direct a message through the connection
// WriteMsg sends  direct a message through the connection
func (cc *ClientCommander) WriteMsg(m Message) error {
	return cc.networkSession.WriteMsg(m)
	return cc.WriteMsgWithContext(context.Background(), m)
}

// WriteContextMsg sends with context direct a message through the connection
func (cc *ClientCommander) WriteMsgWithContext(ctx context.Context, m Message) error {
	return cc.networkSession.WriteMsgWithContext(ctx, m)
}

// Ping send a ping message and wait for a pong response
func (cc *ClientCommander) Ping(timeout time.Duration) error {
	return cc.networkSession.Ping(timeout)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return cc.networkSession.PingWithContext(ctx)
}

// PingContext send with context a ping message and wait for a pong response
func (cc *ClientCommander) PingWithContext(ctx context.Context) error {
	return cc.networkSession.PingWithContext(ctx)
}

// Get retrieve the resource identified by the request path
// Get retrieves the resource identified by the request path
func (cc *ClientCommander) Get(path string) (Message, error) {
	return cc.GetWithContext(context.Background(), path)
}

// GetContext retrieves with context the resource identified by the request path
func (cc *ClientCommander) GetWithContext(ctx context.Context, path string) (Message, error) {
	req, err := cc.NewGetRequest(path)
	if err != nil {
		return nil, err
	}
	return cc.networkSession.Exchange(req)
	return cc.networkSession.ExchangeWithContext(ctx, req)
}

// Post update the resource identified by the request path
// Post updates the resource identified by the request path
func (cc *ClientCommander) Post(path string, contentFormat MediaType, body io.Reader) (Message, error) {
	return cc.PostWithContext(context.Background(), path, contentFormat, body)
}

// PostContext updates with context the resource identified by the request path
func (cc *ClientCommander) PostWithContext(ctx context.Context, path string, contentFormat MediaType, body io.Reader) (Message, error) {
	req, err := cc.NewPostRequest(path, contentFormat, body)
	if err != nil {
		return nil, err
	}
	return cc.networkSession.Exchange(req)
	return cc.networkSession.ExchangeWithContext(ctx, req)
}

// Put create the resource identified by the request path
// Put creates the resource identified by the request path
func (cc *ClientCommander) Put(path string, contentFormat MediaType, body io.Reader) (Message, error) {
	return cc.PutWithContext(context.Background(), path, contentFormat, body)
}

// PutContext creates with context the resource identified by the request path
func (cc *ClientCommander) PutWithContext(ctx context.Context, path string, contentFormat MediaType, body io.Reader) (Message, error) {
	req, err := cc.NewPutRequest(path, contentFormat, body)
	if err != nil {
		return nil, err
	}
	return cc.networkSession.Exchange(req)
	return cc.networkSession.ExchangeWithContext(ctx, req)
}

// Delete delete the resource identified by the request path
// Delete deletes the resource identified by the request path
func (cc *ClientCommander) Delete(path string) (Message, error) {
	return cc.DeleteWithContext(context.Background(), path)
}

// DeleteContext deletes with context the resource identified by the request path
func (cc *ClientCommander) DeleteWithContext(ctx context.Context, path string) (Message, error) {
	req, err := cc.NewDeleteRequest(path)
	if err != nil {
		return nil, err
	}
	return cc.networkSession.Exchange(req)
	return cc.networkSession.ExchangeWithContext(ctx, req)
}

//Observation represents subscription to resource on the server


@@ 155,8 191,12 @@ type Observation struct {
	client    *ClientCommander
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation) Cancel() error {
	return o.CancelWithContext(context.Background())
}

// CancelContext remove observation from server. For recreate observation use Observe.
func (o *Observation) CancelWithContext(ctx context.Context) error {
	req := o.client.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,


@@ 165,7 205,7 @@ func (o *Observation) Cancel() error {
	})
	req.SetPathString(o.path)
	req.SetOption(Observe, 1)
	err1 := o.client.WriteMsg(req)
	err1 := o.client.WriteMsgWithContext(ctx, req)
	err2 := o.client.networkSession.TokenHandler().Remove(o.token)
	if err1 != nil {
		return err1


@@ 173,9 213,13 @@ func (o *Observation) Cancel() error {
	return err2
}

// Observe subscribe to severon path. After subscription and every change on path,
// server sends immediately response
func (cc *ClientCommander) Observe(path string, observeFunc func(req *Request)) (*Observation, error) {
	return cc.ObserveWithContext(context.Background(), path, observeFunc)
}

// ObserveContext subscribe to severon path. After subscription and every change on path,
// server sends immediately response
func (cc *ClientCommander) ObserveWithContext(ctx context.Context, path string, observeFunc func(req *Request)) (*Observation, error) {
	req, err := cc.NewGetRequest(path)
	if err != nil {
		return nil, err


@@ 216,7 260,7 @@ func (cc *ClientCommander) Observe(path string, observeFunc func(req *Request)) 
		}

		if needGet {
			resp, err = r.Client.Get(path)
			resp, err = r.Client.GetWithContext(ctx, path)
			if err != nil {
				return
			}


@@ 238,12 282,12 @@ func (cc *ClientCommander) Observe(path string, observeFunc func(req *Request)) 
			//during processing observation, check if notification is still valid
			if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
				if setObsSeqNum() {
					observeFunc(&Request{Msg: resp, Client: r.Client})
					observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx})
				}
			}
		default:
			if setObsSeqNum() {
				observeFunc(&Request{Msg: resp, Client: r.Client})
				observeFunc(&Request{Msg: resp, Client: r.Client, Ctx: r.Ctx})
			}
		}
		return


@@ 251,7 295,7 @@ func (cc *ClientCommander) Observe(path string, observeFunc func(req *Request)) 
	if err != nil {
		return nil, err
	}
	err = cc.WriteMsg(req)
	err = cc.WriteMsgWithContext(ctx, req)
	if err != nil {
		cc.networkSession.TokenHandler().Remove(o.token)
		return nil, err


@@ 264,23 308,3 @@ func (cc *ClientCommander) Observe(path string, observeFunc func(req *Request)) 
func (cc *ClientCommander) Close() error {
	return cc.networkSession.Close()
}

// SetReadDeadline set read deadline for timeout for Exchange
func (cc *ClientCommander) SetReadDeadline(timeout time.Duration) {
	cc.networkSession.SetReadDeadline(timeout)
}

// SetWriteDeadline set write deadline for timeout for Exchange and Write
func (cc *ClientCommander) SetWriteDeadline(timeout time.Duration) {
	cc.networkSession.SetWriteDeadline(timeout)
}

// ReadDeadline get read deadline
func (cc *ClientCommander) ReadDeadline() time.Duration {
	return cc.networkSession.ReadDeadline()
}

// WriteDeadline get read writeline
func (cc *ClientCommander) WriteDeadline() time.Duration {
	return cc.networkSession.WriteDeadline()
}

D conn.go => conn.go +0 -223
@@ 1,223 0,0 @@
package coap

import (
	"bytes"
	"log"
	"net"
	"sync/atomic"
	"time"
)

type writeReq interface {
	sendResp(err error, timeout time.Duration)
	waitResp(timeout time.Duration) error
	data() Message
}

type writeReqBase struct {
	req      Message
	respChan chan error // channel must have size 1 for non-blocking write to channel
}

func (wreq *writeReqBase) sendResp(err error, timeout time.Duration) {
	select {
	case wreq.respChan <- err:
		return
	default:
		log.Fatal("Exactly one error can be send as resp. This is err.")
	}
}

func (wreq *writeReqBase) waitResp(timeout time.Duration) error {
	select {
	case err := <-wreq.respChan:
		return err
	case <-time.After(timeout):
		return ErrTimeout
	}
}

func (wreq *writeReqBase) data() Message {
	return wreq.req
}

type writeReqTCP struct {
	writeReqBase
}

type writeReqUDP struct {
	writeReqBase
	sessionData *SessionUDPData
}

// Conn represents the connection
type Conn interface {
	// LocalAddr get local address of the connection
	LocalAddr() net.Addr
	// RemoteAddr get peer address of the connection
	RemoteAddr() net.Addr
	// Close close the connection
	Close() error

	write(w writeReq, timeout time.Duration) error
}

type connWriter interface {
	writeHandler(srv *Server) bool
	writeEndHandler(timeout time.Duration) bool
	sendFinish(timeout time.Duration)

	writeHandlerWithFunc(srv *Server, writeFunc func(srv *Server, wreq writeReq) error) bool
}

type connBase struct {
	writeChan chan writeReq
	closeChan chan bool
	finChan   chan bool
	closed    int32
}

func (conn *connBase) finishWrite() {
	if !atomic.CompareAndSwapInt32(&conn.closed, conn.closed, 1) {
		return
	}
	conn.closeChan <- true
	<-conn.finChan
}

func (conn *connBase) writeHandlerWithFunc(srv *Server, writeFunc func(srv *Server, wreq writeReq) error) bool {
	select {
	case wreq := <-conn.writeChan:
		wreq.sendResp(writeFunc(srv, wreq), srv.syncTimeout())
		return true
	case <-conn.closeChan:
		return false
	}
}

func (conn *connBase) sendFinish(timeout time.Duration) {
	select {
	case conn.finChan <- true:
	case <-time.After(timeout):
		log.Fatal("Client cannot recv start: Timeout")
	}
}

func (conn *connBase) writeEndHandler(timeout time.Duration) bool {
	select {
	case wreq := <-conn.writeChan:
		wreq.sendResp(ErrConnectionClosed, timeout)
		return true
	default:
		return false
	}
}

func (conn *connBase) write(w writeReq, timeout time.Duration) error {
	if atomic.LoadInt32(&conn.closed) > 0 {
		return ErrConnectionClosed
	}
	select {
	case conn.writeChan <- w:
		return w.waitResp(timeout)
	case <-time.After(timeout):
		return ErrTimeout
	}
}

type connTCP struct {
	connBase
	connection net.Conn // i/o connection if TCP was used
	num        int32
}

func (conn *connTCP) LocalAddr() net.Addr {
	return conn.connection.LocalAddr()
}

func (conn *connTCP) RemoteAddr() net.Addr {
	return conn.connection.RemoteAddr()
}

func (conn *connTCP) Close() error {
	conn.finishWrite()
	return conn.connection.Close()
}

func (conn *connTCP) writeHandler(srv *Server) bool {
	return conn.writeHandlerWithFunc(srv, func(srv *Server, wreq writeReq) error {
		data := wreq.data()
		wr := srv.acquireWriter(conn.connection)
		defer srv.releaseWriter(wr)
		writeTimeout := srv.writeTimeout()
		conn.connection.SetWriteDeadline(time.Now().Add(writeTimeout))
		err := data.MarshalBinary(wr)
		if err != nil {
			return err
		}
		wr.Flush()
		return nil
	})
}

type connUDP struct {
	connBase
	connection *net.UDPConn // i/o connection if UDP was used
}

func (conn *connUDP) LocalAddr() net.Addr {
	return conn.connection.LocalAddr()
}

func (conn *connUDP) RemoteAddr() net.Addr {
	return conn.connection.RemoteAddr()
}

func (conn *connUDP) SetReadDeadline(timeout time.Time) error {
	return conn.connection.SetReadDeadline(timeout)
}

func (conn *connUDP) ReadFromSessionUDP(m []byte) (int, *SessionUDPData, error) {
	return ReadFromSessionUDP(conn.connection, m)
}

func (conn *connUDP) Close() error {
	conn.finishWrite()
	return conn.connection.Close()
}

func (conn *connUDP) writeHandler(srv *Server) bool {
	return conn.writeHandlerWithFunc(srv, func(srv *Server, wreq writeReq) error {
		data := wreq.data()
		wreqUDP := wreq.(*writeReqUDP)
		writeTimeout := srv.writeTimeout()
		buf := &bytes.Buffer{}
		err := data.MarshalBinary(buf)
		if err != nil {
			return err
		}
		conn.connection.SetWriteDeadline(time.Now().Add(writeTimeout))
		_, err = WriteToSessionUDP(conn.connection, buf.Bytes(), wreqUDP.sessionData)
		return err
	})
}

func newConnectionTCP(c net.Conn, srv *Server) Conn {
	connection := &connTCP{connBase: connBase{writeChan: make(chan writeReq, 10000), closeChan: make(chan bool), finChan: make(chan bool), closed: 0}, connection: c}
	go writeToConnection(connection, srv)
	return connection
}

func newConnectionUDP(c *net.UDPConn, srv *Server) Conn {
	connection := &connUDP{connBase: connBase{writeChan: make(chan writeReq, 10000), closeChan: make(chan bool), finChan: make(chan bool), closed: 0}, connection: c}
	go writeToConnection(connection, srv)
	return connection
}

func writeToConnection(conn connWriter, srv *Server) {
	for conn.writeHandler(srv) {
	}
	for conn.writeEndHandler(srv.syncTimeout()) {
	}
	conn.sendFinish(srv.syncTimeout())
}

M getresponsewriter.go => getresponsewriter.go +6 -4
@@ 1,21 1,23 @@
package coap

import "context"

type getResponseWriter struct {
	ResponseWriter
}

// Write send response to peer
func (w *getResponseWriter) WriteMsg(msg Message) error {
func (w *getResponseWriter) WriteMsgWithContext(ctx context.Context, msg Message) error {
	if msg.Payload() != nil && msg.Option(ETag) == nil {
		msg.SetOption(ETag, CalcETag(msg.Payload()))
	}

	return w.ResponseWriter.WriteMsg(msg)
	return w.ResponseWriter.WriteMsgWithContext(ctx, msg)
}

// Write send response to peer
func (w *getResponseWriter) Write(p []byte) (n int, err error) {
func (w *getResponseWriter) WriteWithContext(ctx context.Context, p []byte) (n int, err error) {
	l, resp := prepareReponse(w, w.ResponseWriter.getReq().Msg.Code(), w.ResponseWriter.getCode(), w.ResponseWriter.getContentFormat(), p)
	err = w.WriteMsg(resp)
	err = w.WriteMsgWithContext(ctx, resp)
	return l, err
}

M iotivity_test.go => iotivity_test.go +1 -1
@@ 90,7 90,7 @@ func TestBlockWiseGetBlock16(t *testing.T) {
	}
	req.SetOption(Block2, block2)
	decodeMsg(req)
	resp, err := co.Exchange(req)
	resp, err := co.ExchangeWithContext(req)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}

M messagetcp.go => messagetcp.go +68 -34
@@ 2,6 2,7 @@ package coap

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io"


@@ 219,6 220,10 @@ type msgTcpInfo struct {
	totLen int
}

func (m msgTcpInfo) BodyLen() int {
	return m.totLen - m.hdrLen
}

func normalizeErrors(e error) error {
	if e == io.EOF || e == io.ErrUnexpectedEOF {
		return io.ErrUnexpectedEOF


@@ 228,56 233,67 @@ func normalizeErrors(e error) error {

// readTcpMsgInfo infers information about a TCP CoAP message from the first
// fragment.
func readTcpMsgInfo(r io.Reader) (msgTcpInfo, error) {

type contextReader interface {
	ReadFullWithContext(context.Context, []byte) error
}

func readTcpMsgInfo(ctx context.Context, conn contextReader) (msgTcpInfo, error) {
	mti := msgTcpInfo{}

	hdrOff := 0

	var firstByte byte
	if err := binary.Read(r, binary.BigEndian, &firstByte); err != nil {
		return mti, normalizeErrors(err)
	firstByte := make([]byte, 1)
	err := conn.ReadFullWithContext(ctx, firstByte)
	if err != nil {
		return mti, fmt.Errorf("cannot read coap header: %v", err)
	}
	hdrOff++

	lenNib := (firstByte & 0xf0) >> 4
	tkl := firstByte & 0x0f
	lenNib := (firstByte[0] & 0xf0) >> 4
	tkl := firstByte[0] & 0x0f

	var opLen int
	switch {
	case lenNib < TCP_MESSAGE_LEN13_BASE:
		opLen = int(lenNib)
	case lenNib == 13:
		var extLen byte
		if err := binary.Read(r, binary.BigEndian, &extLen); err != nil {
			return mti, err
		extLen := make([]byte, 1)
		err := conn.ReadFullWithContext(ctx, extLen)
		if err != nil {
			return mti, fmt.Errorf("cannot read coap header: %v", err)
		}
		hdrOff++
		opLen = TCP_MESSAGE_LEN13_BASE + int(extLen)
		opLen = TCP_MESSAGE_LEN13_BASE + int(extLen[0])
	case lenNib == 14:
		var extLen uint16
		if err := binary.Read(r, binary.BigEndian, &extLen); err != nil {
			return mti, err
		extLen := make([]byte, 2)
		err := conn.ReadFullWithContext(ctx, extLen)
		if err != nil {
			return mti, fmt.Errorf("cannot read coap header: %v", err)
		}
		hdrOff += 2
		opLen = TCP_MESSAGE_LEN14_BASE + int(extLen)
		opLen = TCP_MESSAGE_LEN14_BASE + int(binary.BigEndian.Uint16(extLen))
	case lenNib == 15:
		var extLen uint32
		if err := binary.Read(r, binary.BigEndian, &extLen); err != nil {
			return mti, err
		extLen := make([]byte, 4)
		err := conn.ReadFullWithContext(ctx, extLen)
		if err != nil {
			return mti, fmt.Errorf("cannot read coap header: %v", err)
		}
		hdrOff += 4
		opLen = TCP_MESSAGE_LEN15_BASE + int(extLen)
		opLen = TCP_MESSAGE_LEN15_BASE + int(binary.BigEndian.Uint32(extLen))
	}

	mti.totLen = hdrOff + 1 + int(tkl) + opLen

	if err := binary.Read(r, binary.BigEndian, &mti.code); err != nil {
		return mti, err
	code := make([]byte, 1)
	err = conn.ReadFullWithContext(ctx, code)
	if err != nil {
		return mti, fmt.Errorf("cannot read coap header: %v", err)
	}
	mti.code = uint8(code[0])
	hdrOff++

	mti.token = make([]byte, tkl)
	if _, err := io.ReadFull(r, mti.token); err != nil {
	if err := conn.ReadFullWithContext(ctx, mti.token); err != nil {
		return mti, err
	}
	hdrOff += int(tkl)


@@ 287,12 303,7 @@ func readTcpMsgInfo(r io.Reader) (msgTcpInfo, error) {
	return mti, nil
}

func readTcpMsgBody(mti msgTcpInfo, r io.Reader) (options, []byte, error) {
	bodyLen := mti.totLen - mti.hdrLen
	b := make([]byte, bodyLen)
	if _, err := io.ReadFull(r, b); err != nil {
		return nil, nil, err
	}
func parseTcpOptionsPayload(mti msgTcpInfo, b []byte) (options, []byte, error) {
	optionDefs := coapOptionDefs
	switch COAPCode(mti.code) {
	case CSM:


@@ 321,10 332,20 @@ func (m *TcpMessage) fill(mti msgTcpInfo, o options, p []byte) {
	m.MessageBase.payload = p
}

type contextBytesReader struct {
	reader io.Reader
}

func (r *contextBytesReader) ReadFullWithContext(ctx context.Context, b []byte) error {
	_, err := io.ReadFull(r.reader, b)
	return err
}

func (m *TcpMessage) UnmarshalBinary(data []byte) error {
	r := bytes.NewReader(data)

	mti, err := readTcpMsgInfo(r)
	r := &contextBytesReader{reader: bytes.NewReader(data)}

	mti, err := readTcpMsgInfo(context.Background(), r)
	if err != nil {
		return fmt.Errorf("Error reading TCP CoAP header; %s", err.Error())
	}


@@ 334,7 355,13 @@ func (m *TcpMessage) UnmarshalBinary(data []byte) error {
			mti.totLen, len(data))
	}

	o, p, err := readTcpMsgBody(mti, r)
	b := make([]byte, mti.BodyLen())
	err = r.ReadFullWithContext(context.Background(), b)
	if err != nil {
		return fmt.Errorf("cannot read TCP CoAP body: %v", err)
	}

	o, p, err := parseTcpOptionsPayload(mti, b)
	if err != nil {
		return err
	}


@@ 380,13 407,20 @@ func PullTcp(data []byte) (*TcpMessage, []byte, error) {
}

// Decode reads a single message from its input.
func Decode(r io.Reader) (*TcpMessage, error) {
	mti, err := readTcpMsgInfo(r)
func Decode(reader io.Reader) (*TcpMessage, error) {
	r := &contextBytesReader{reader: reader}
	mti, err := readTcpMsgInfo(context.Background(), r)
	if err != nil {
		return nil, err
	}

	o, p, err := readTcpMsgBody(mti, r)
	b := make([]byte, mti.BodyLen())
	err = r.ReadFullWithContext(context.Background(), b)
	if err != nil {
		return nil, fmt.Errorf("cannot read TCP CoAP body: %v", err)
	}

	o, p, err := parseTcpOptionsPayload(mti, b)
	if err != nil {
		return nil, err
	}

M multicastClient.go => multicastClient.go +33 -21
@@ 3,6 3,7 @@ package coap
// A client implementation.

import (
	"context"
	"net"
	"time"
)


@@ 20,7 21,7 @@ type MulticastClient struct {
	DialTimeout    time.Duration // set Timeout for dialer
	ReadTimeout    time.Duration // net.ClientConn.SetReadTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	WriteTimeout   time.Duration // net.ClientConn.SetWriteTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	SyncTimeout    time.Duration // The maximum of time for synchronization go-routines, defaults to 30 seconds - overridden by Timeout when that value is non-zero if it occurs, then it call log.Fatal
	HeartBeat      time.Duration // The maximum of time for synchronization go-routines, defaults to 30 seconds - overridden by Timeout when that value is non-zero if it occurs, then it call log.Fatal

	Handler              HandlerFunc     // default handler for handling messages from server
	NotifySessionEndFunc func(err error) // if NotifySessionEndFunc is set it is called when TCP/UDP session was ended.


@@ 32,7 33,7 @@ type MulticastClient struct {
}

// Dial connects to the address on the named network.
func (c *MulticastClient) dialNet(net, address string) (*ClientConn, error) {
func (c *MulticastClient) dialNet(ctx context.Context, net, address string) (*ClientConn, error) {
	if c.multicastHandler == nil {
		c.multicastHandler = &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)}
	}


@@ 42,7 43,7 @@ func (c *MulticastClient) dialNet(net, address string) (*ClientConn, error) {
		DialTimeout:    c.DialTimeout,
		ReadTimeout:    c.ReadTimeout,
		WriteTimeout:   c.WriteTimeout,
		SyncTimeout:    c.SyncTimeout,
		HeartBeat:      c.HeartBeat,
		Handler: func(w ResponseWriter, r *Request) {
			handler := c.Handler
			if handler == nil {


@@ 55,11 56,15 @@ func (c *MulticastClient) dialNet(net, address string) (*ClientConn, error) {
		BlockWiseTransferSzx: c.BlockWiseTransferSzx,
	}

	return client.Dial(address)
	return client.DialWithContext(ctx, address)
}

// Dial connects to the address on the named network.
func (c *MulticastClient) Dial(address string) (*MulticastClientConn, error) {
	return c.DialWithContext(context.Background(), address)
}

// DialContext connects with context to the address on the named network.
func (c *MulticastClient) DialWithContext(ctx context.Context, address string) (*MulticastClientConn, error) {
	var net string
	switch c.Net {
	case "udp", "udp4", "udp6":


@@ 69,7 74,7 @@ func (c *MulticastClient) Dial(address string) (*MulticastClientConn, error) {
	default:
		return nil, ErrInvalidNetParameter
	}
	conn, err := c.dialNet(net, address)
	conn, err := c.dialNet(ctx, net, address)
	if err != nil {
		return nil, err
	}


@@ 101,17 106,12 @@ func (mconn *MulticastClientConn) NewGetRequest(path string) (Message, error) {

// WriteMsg sends a message through the connection co.
func (mconn *MulticastClientConn) WriteMsg(m Message) error {
	return mconn.conn.WriteMsg(m)
	return mconn.WriteMsgWithContext(context.Background(), m)
}

// SetReadDeadline set read deadline for timeout for Exchange
func (mconn *MulticastClientConn) SetReadDeadline(timeout time.Duration) {
	mconn.conn.SetReadDeadline(timeout)
}

// SetWriteDeadline set write deadline for timeout for Exchange and Write
func (mconn *MulticastClientConn) SetWriteDeadline(timeout time.Duration) {
	mconn.conn.SetWriteDeadline(timeout)
// WriteContextMsg sends a message with context through the connection co.
func (mconn *MulticastClientConn) WriteMsgWithContext(ctx context.Context, m Message) error {
	return mconn.conn.WriteMsgWithContext(ctx, m)
}

// Close close connection


@@ 131,19 131,31 @@ func (r *ResponseWaiter) Cancel() error {
	return r.conn.client.multicastHandler.Remove(r.token)
}

// Publish subscribe to sever on path. After subscription and every change on path,
// Publish subscribes to sever on path. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) Publish(path string, responseHandler func(req *Request)) (*ResponseWaiter, error) {
	return mconn.PublishWithContext(context.Background(), path, responseHandler)
}

// PublishContext subscribes with context to sever on path. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) PublishWithContext(ctx context.Context, path string, responseHandler func(req *Request)) (*ResponseWaiter, error) {
	req, err := mconn.NewGetRequest(path)
	if err != nil {
		return nil, err
	}
	return mconn.PublishMsg(req, responseHandler)
	return mconn.PublishContextMsg(ctx, req, responseHandler)
}

// PublishMsg subscribe to sever with GET message. After subscription and every change on path,
// PublishMsg subscribes to sever with GET message. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) PublishMsg(req Message, responseHandler func(req *Request)) (*ResponseWaiter, error) {
	return mconn.PublishContextMsg(context.Background(), req, responseHandler)
}

// PublishContextMsg subscribes with context to sever with GET message. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) PublishContextMsg(ctx context.Context, req Message, responseHandler func(req *Request)) (*ResponseWaiter, error) {
	if req.Code() != GET || req.PathString() == "" {
		return nil, ErrInvalidRequest
	}


@@ 180,18 192,18 @@ func (mconn *MulticastClientConn) PublishMsg(req Message, responseHandler func(r
		}

		if needGet {
			resp, err = r.Client.Get(path)
			resp, err = r.Client.GetWithContext(ctx, path)
			if err != nil {
				return
			}
		}
		responseHandler(&Request{Msg: resp, Client: r.Client})
		responseHandler(&Request{Msg: resp, Client: r.Client, Ctx: ctx})
	})
	if err != nil {
		return nil, err
	}

	err = mconn.WriteMsg(req)
	err = mconn.WriteMsgWithContext(ctx, req)
	if err != nil {
		mconn.client.multicastHandler.Remove(r.token)
		return nil, err

M networksession.go => networksession.go +73 -90
@@ 1,12 1,16 @@
package coap

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"

	kitNet "github.com/go-ocf/kit/net"
)

// A networkSession interface is used by an COAP handler to


@@ 16,27 20,19 @@ type networkSession interface {
	LocalAddr() net.Addr
	// RemoteAddr returns the net.Addr of the client that sent the current request.
	RemoteAddr() net.Addr
	// WriteMsg writes a reply back to the client.
	WriteMsg(resp Message) error
	// WriteContextMsg writes a reply back to the client.
	WriteMsgWithContext(ctx context.Context, resp Message) error
	// Close closes the connection.
	Close() error
	// Return type of network
	IsTCP() bool
	// Create message for response via writter
	NewMessage(params MessageParams) Message
	// Exchange writes message and wait for response - paired by token and msgid
	// ExchangeContext writes message and wait for response - paired by token and msgid
	// it is safe to use in goroutines
	Exchange(req Message) (Message, error)
	ExchangeWithContext(ctx context.Context, req Message) (Message, error)
	// Send ping to peer and wait for pong
	Ping(timeout time.Duration) error
	// SetReadDeadline set read deadline for timeout for Exchange
	SetReadDeadline(timeout time.Duration)
	// SetWriteDeadline set write deadline for timeout for Exchange and Write
	SetWriteDeadline(timeout time.Duration)
	// ReadDeadline get read deadline
	ReadDeadline() time.Duration
	// WriteDeadline get read writeline
	WriteDeadline() time.Duration
	PingWithContext(ctx context.Context) error

	// handlePairMsg Message was handled by pair
	handlePairMsg(w ResponseWriter, r *Request) bool


@@ 50,8 46,6 @@ type networkSession interface {
	// close session with error
	closeWithError(err error) error

	exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error)

	TokenHandler() *TokenHandler

	// BlockWiseTransferEnabled


@@ 65,8 59,7 @@ type networkSession interface {
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error) {

func newSessionUDP(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {


@@ 83,13 76,11 @@ func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData)
	s := &sessionUDP{
		sessionBase: sessionBase{
			srv:                  srv,
			connection:           connection,
			readDeadline:         30 * time.Second,
			writeDeadline:        30 * time.Second,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
		},
		connection:     connection,
		sessionUDPData: sessionUDPData,
		mapPairs:       make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
	}


@@ 97,7 88,7 @@ func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData)
}

// newSessionTCP create new session for TCP connection
func newSessionTCP(connection Conn, srv *Server) (networkSession, error) {
func newSessionTCP(connection *kitNet.Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockWiseSzxBERT
	if srv.BlockWiseTransfer != nil {


@@ 109,11 100,9 @@ func newSessionTCP(connection Conn, srv *Server) (networkSession, error) {
	s := &sessionTCP{
		mapPairs:           make(map[[MaxTokenSize]byte](*sessionResp)),
		peerMaxMessageSize: uint32(srv.MaxMessageSize),
		connection:         connection,
		sessionBase: sessionBase{
			srv:                  srv,
			connection:           connection,
			readDeadline:         30 * time.Second,
			writeDeadline:        30 * time.Second,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),


@@ 134,11 123,8 @@ type sessionResp struct {
}

type sessionBase struct {
	srv           *Server
	connection    Conn
	readDeadline  time.Duration
	writeDeadline time.Duration
	handler       *TokenHandler
	srv     *Server
	handler *TokenHandler

	blockWiseTransfer    bool
	blockWiseTransferSzx uint32 //BlockWiseSzx


@@ 146,14 132,15 @@ type sessionBase struct {

type sessionUDP struct {
	sessionBase
	sessionUDPData *SessionUDPData                                // oob data to get egress interface right
	connection     *kitNet.ConnUDP
	sessionUDPData *kitNet.ConnUDPContext                         // oob data to get egress interface right
	mapPairs       map[[MaxTokenSize]byte]map[uint16]*sessionResp //storage of channel Message
	mapPairsLock   sync.Mutex                                     //to sync add remove token
}

type sessionTCP struct {
	sessionBase

	connection   *kitNet.Conn
	mapPairs     map[[MaxTokenSize]byte]*sessionResp //storage of channel Message
	mapPairsLock sync.Mutex                          //to sync add remove token



@@ 181,23 168,6 @@ func (s *sessionTCP) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}

func (s *sessionBase) SetReadDeadline(timeout time.Duration) {
	s.readDeadline = timeout
}

func (s *sessionBase) SetWriteDeadline(timeout time.Duration) {
	s.writeDeadline = timeout
}

func (s *sessionBase) ReadDeadline() time.Duration {
	return s.readDeadline
}

// WriteDeadline get read writeline
func (s *sessionBase) WriteDeadline() time.Duration {
	return s.writeDeadline
}

// BlockWiseTransferEnabled
func (s *sessionUDP) blockWiseEnabled() bool {
	return s.blockWiseTransfer


@@ 251,14 221,13 @@ func (s *sessionUDP) closeWithError(err error) error {
	s.srv.sessionUDPMapLock.Lock()
	delete(s.srv.sessionUDPMap, s.sessionUDPData.Key())
	s.srv.sessionUDPMapLock.Unlock()

	s.srv.NotifySessionEndFunc(&ClientCommander{s}, err)

	return err
}

// Ping send ping over udp(unicast) and wait for response.
func (s *sessionUDP) Ping(timeout time.Duration) error {
func (s *sessionUDP) PingWithContext(ctx context.Context) error {
	//provoking to get a reset message - "CoAP ping" in RFC-7252
	//https://tools.ietf.org/html/rfc7252#section-4.2
	//https://tools.ietf.org/html/rfc7252#section-4.3


@@ 269,7 238,7 @@ func (s *sessionUDP) Ping(timeout time.Duration) error {
		Code:      Empty,
		MessageID: GenerateMessageID(),
	})
	resp, err := s.exchangeTimeout(req, timeout, timeout)
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}


@@ 279,7 248,7 @@ func (s *sessionUDP) Ping(timeout time.Duration) error {
	return ErrInvalidResponse
}

func (s *sessionTCP) Ping(timeout time.Duration) error {
func (s *sessionTCP) PingWithContext(ctx context.Context) error {
	if s.srv.DisableTCPSignalMessages {
		return fmt.Errorf("cannot send ping: TCP Signal messages are disabled")
	}


@@ 292,7 261,7 @@ func (s *sessionTCP) Ping(timeout time.Duration) error {
		Code:  Ping,
		Token: []byte(token),
	})
	resp, err := s.exchangeTimeout(req, timeout, timeout)
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}


@@ 360,21 329,13 @@ func (s *sessionBase) exchangeFunc(req Message, writeTimeout, readTimeout time.D
	}
}

// Write implements the networkSession.Write method.
func (s *sessionTCP) Exchange(m Message) (Message, error) {
	return s.exchangeTimeout(m, s.writeDeadline, s.readDeadline)
}

// Write implements the networkSession.Write method.
func (s *sessionUDP) Exchange(m Message) (Message, error) {
	return s.exchangeTimeout(m, s.writeDeadline, s.readDeadline)
}

func (s *sessionTCP) exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error) {
func (s *sessionTCP) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	if err := validateMsg(req); err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	if req.Token() == nil {
		return nil, ErrTokenNotExist
	}

	pairChan := &sessionResp{make(chan *Request, 1)}

	var pairToken [MaxTokenSize]byte


@@ 395,10 356,25 @@ func (s *sessionTCP) exchangeTimeout(req Message, writeDeadline, readDeadline ti
		}
	}()

	return s.exchangeFunc(req, writeDeadline, readDeadline, pairChan, s.writeTimeout)
	err := s.WriteMsgWithContext(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	select {
	case request := <-pairChan.ch:
		return request.Msg, nil
	case <-ctx.Done():
		if ctx.Err() != nil {
			return nil, fmt.Errorf("cannot exchange: %v", err)
		}
		return nil, fmt.Errorf("cannot exchange: cancelled")
	}
}

func (s *sessionUDP) exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error) {
func (s *sessionUDP) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	if err := validateMsg(req); err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	//register msgid to token
	pairChan := &sessionResp{make(chan *Request, 1)}
	var pairToken [MaxTokenSize]byte


@@ 423,16 399,38 @@ func (s *sessionUDP) exchangeTimeout(req Message, writeDeadline, readDeadline ti
		s.mapPairsLock.Unlock()
	}()

	return s.exchangeFunc(req, writeDeadline, readDeadline, pairChan, s.writeTimeout)
	err := s.WriteMsgWithContext(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cannot exchange: %v", err)
	}
	select {
	case request := <-pairChan.ch:
		return request.Msg, nil
	case <-ctx.Done():
		if ctx.Err() != nil {
			return nil, fmt.Errorf("cannot exchange: %v", err)
		}
		return nil, fmt.Errorf("cannot exchange: cancelled")
	}
}

// Write implements the networkSession.Write method.
func (s *sessionTCP) WriteMsg(m Message) error {
	return s.writeTimeout(m, s.writeDeadline)
func (s *sessionTCP) WriteMsgWithContext(ctx context.Context, req Message) error {
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to tcp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, buffer.Bytes())
}

func (s *sessionUDP) WriteMsg(m Message) error {
	return s.writeTimeout(m, s.writeDeadline)
func (s *sessionUDP) WriteMsgWithContext(ctx context.Context, req Message) error {
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to udp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, s.sessionUDPData, buffer.Bytes())
}

func validateMsg(msg Message) error {


@@ 446,21 444,6 @@ func validateMsg(msg Message) error {
	return nil
}

func (s *sessionTCP) writeTimeout(m Message, timeout time.Duration) error {
	if err := validateMsg(m); err != nil {
		return err
	}
	return s.connection.write(&writeReqTCP{writeReqBase{req: m, respChan: make(chan error, 1)}}, timeout)
}

// WriteMsg implements the networkSession.WriteMsg method.
func (s *sessionUDP) writeTimeout(m Message, timeout time.Duration) error {
	if err := validateMsg(m); err != nil {
		return err
	}
	return s.connection.write(&writeReqUDP{writeReqBase{req: m, respChan: make(chan error, 1)}, s.sessionUDPData}, timeout)
}

func (s *sessionTCP) handlePairMsg(w ResponseWriter, r *Request) bool {
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())


@@ 512,7 495,7 @@ func (s *sessionTCP) sendCSM() error {
	if s.blockWiseEnabled() {
		req.AddOption(BlockWiseTransfer, []byte{})
	}
	return s.WriteMsg(req)
	return s.WriteMsgWithContext(context.Background(), req)
}

func (s *sessionTCP) setPeerMaxMessageSize(val uint32) {


@@ 533,7 516,7 @@ func (s *sessionUDP) sendPong(w ResponseWriter, r *Request) error {
		Code:      Empty,
		MessageID: r.Msg.MessageID(),
	})
	return w.WriteMsg(resp)
	return w.WriteMsgWithContext(r.Ctx, resp)
}

func (s *sessionTCP) sendPong(w ResponseWriter, r *Request) error {


@@ 542,7 525,7 @@ func (s *sessionTCP) sendPong(w ResponseWriter, r *Request) error {
		Code:  Pong,
		Token: r.Msg.Token(),
	})
	return w.WriteMsg(req)
	return w.WriteMsgWithContext(r.Ctx, req)
}

func (s *sessionTCP) handleSignals(w ResponseWriter, r *Request) bool {

M request.go => request.go +3 -0
@@ 1,6 1,9 @@
package coap

import "context"

type Request struct {
	Msg    Message
	Client *ClientCommander
	Ctx    context.Context
}

M responsewriter.go => responsewriter.go +25 -9
@@ 1,13 1,18 @@
package coap

import (
	"context"
)

// A ResponseWriter interface is used by an CAOP handler to construct an COAP response.
// For Obsevation (GET+option observe) it can be stored and used in another go-routine
// with using calls NewResponse, WriteMsg
// with using calls NewResponse, WriteContextMsg
type ResponseWriter interface {
	// Write response with payload.
	Write(p []byte) (n int, err error)
	// WriteContext response with payload.
	// If p is nil it writes response without payload.
	// If p is non-nil then SetContentFormat must be called before Write otherwise Write fails.
	Write(p []byte) (n int, err error)
	WriteWithContext(ctx context.Context, p []byte) (n int, err error)
	// SetCode for response that is send via Write call.
	//
	// If SetCode is not called explicitly, the first call to Write


@@ 22,10 27,12 @@ type ResponseWriter interface {

	//NewResponse create response with code and token, messageid against request
	NewResponse(code COAPCode) Message
	//WriteMsg to client.

	WriteMsg(msg Message) error
	//WriteContextMsg to client.
	//If Option ContentFormat is set and Payload is not set then call will failed.
	//If Option ContentFormat is not set and Payload is set then call will failed.
	WriteMsg(Message) error
	WriteMsgWithContext(ctx context.Context, msg Message) error

	getCode() *COAPCode
	getReq() *Request


@@ 73,13 80,18 @@ func (r *responseWriter) NewResponse(code COAPCode) Message {
	return resp
}

// Write send response to peer
// Write send response without to peer
func (r *responseWriter) WriteMsg(msg Message) error {
	return r.WriteMsgWithContext(context.Background(), msg)
}

// Write send response with context to peer
func (r *responseWriter) WriteMsgWithContext(ctx context.Context, msg Message) error {
	switch msg.Code() {
	case GET, POST, PUT, DELETE:
		return ErrInvalidReponseCode
	}
	return r.req.Client.WriteMsg(msg)
	return r.req.Client.WriteMsgWithContext(ctx, msg)
}

func prepareReponse(w ResponseWriter, reqCode COAPCode, code *COAPCode, contentFormat *MediaType, payload []byte) (int, Message) {


@@ 108,10 120,14 @@ func prepareReponse(w ResponseWriter, reqCode COAPCode, code *COAPCode, contentF
	return l, resp
}

// Write send response to peer
func (r *responseWriter) Write(p []byte) (n int, err error) {
	return r.WriteWithContext(context.Background(), p)
}

// Write send response to peer
func (r *responseWriter) WriteWithContext(ctx context.Context, p []byte) (n int, err error) {
	l, resp := prepareReponse(r, r.req.Msg.Code(), r.code, r.contentFormat, p)
	err = r.WriteMsg(resp)
	err = r.WriteMsgWithContext(ctx, resp)
	return l, err
}


M server.go => server.go +98 -183
@@ 2,7 2,7 @@
package coap

import (
	"bufio"
	"context"
	"crypto/tls"
	"fmt"
	"net"


@@ 12,6 12,8 @@ import (
	"sync"
	"sync/atomic"
	"time"

	kitNet "github.com/go-ocf/kit/net"
)

// Interval for stop worker if no load


@@ 22,6 24,8 @@ const maxWorkersCount = 10000

const coapTimeout time.Duration = 3600 * time.Second

const waitTimer time.Duration = 100 * time.Millisecond

const syncTimeout time.Duration = 30 * time.Second

const maxMessageSize = 1152


@@ 31,6 35,12 @@ const (
	defaultWriteBufferSize = 4096
)

// Listener defined used by coap
type Listener interface {
	Close() error
	AcceptWithContext(ctx context.Context) (net.Conn, error)
}

//DefaultPort default unsecure port for COAP server
const DefaultPort = 5683



@@ 63,7 73,7 @@ func HandleFailed(w ResponseWriter, req *Request) {
		MessageID: req.Msg.MessageID(),
		Token:     req.Msg.Token(),
	})
	w.WriteMsg(msg)
	w.WriteMsgWithContext(req.Ctx, msg)
}

func failedHandler() Handler { return HandlerFunc(HandleFailed) }


@@ 101,7 111,7 @@ func ListenAndServeTLS(addr, certFile, keyFile string, handler Handler) error {
// l and p should not both be non-nil.
// If both l and p are not nil only p will be used.
// Invoke handler for incoming queries.
func ActivateAndServe(l net.Listener, p net.Conn, handler Handler) error {
func ActivateAndServe(l Listener, p net.Conn, handler Handler) error {
	server := &Server{Listener: l, Conn: p, Handler: handler}
	return server.ActivateAndServe()
}


@@ 113,7 123,7 @@ type Server struct {
	// if "tcp" or "tcp-tls" (COAP over TLS) it will invoke a TCP listener, otherwise an UDP one
	Net string
	// TCP Listener to use, this is to aid in systemd's socket activation.
	Listener net.Listener
	Listener Listener
	// TLS connection configuration
	TLSConfig *tls.Config
	// UDP/TCP "Listener/Connection" to use, this is to aid in systemd's socket activation.


@@ 129,12 139,12 @@ type Server struct {
	WriteTimeout time.Duration
	// If NotifyStartedFunc is set it is called once the server has started listening.
	NotifyStartedFunc func()
	// The maximum of time for synchronization go-routines, defaults to 30 seconds, if it occurs, then it call log.Fatal
	SyncTimeout time.Duration
	// Defines wake up interval from operations Read, Write over connection. defaults is 100ms.
	HeartBeat time.Duration
	// If newSessionUDPFunc is set it is called when session UDP want to be created
	newSessionUDPFunc func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error)
	newSessionUDPFunc func(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error)
	// If newSessionUDPFunc is set it is called when session TCP want to be created
	newSessionTCPFunc func(connection Conn, srv *Server) (networkSession, error)
	newSessionTCPFunc func(connection *kitNet.Conn, srv *Server) (networkSession, error)
	// If NotifyNewSession is set it is called when new TCP/UDP session was created.
	NotifySessionNewFunc func(w *ClientCommander)
	// If NotifyNewSession is set it is called when TCP/UDP session was ended.


@@ 148,21 158,20 @@ type Server struct {
	// Disable tcp signal messages
	DisableTCPSignalMessages bool

	TCPReadBufferSize  int
	TCPWriteBufferSize int

	readerPool sync.Pool
	writerPool sync.Pool
	// UDP packet or TCP connection queue
	queue chan *Request
	// Workers count
	workersCount int32
	// Shutdown handling
	lock    sync.RWMutex
	started bool
	//lock    sync.RWMutex
	//started bool

	sessionUDPMapLock sync.Mutex
	sessionUDPMap     map[string]networkSession

	doneLock sync.Mutex
	done     bool
	doneChan chan struct{}
}

func (srv *Server) workerChannelHandler(inUse bool, timeout *time.Timer) bool {


@@ 216,6 225,7 @@ func (srv *Server) spawnWorker(w *Request) {
// ListenAndServe starts a coapserver on the configured address in *Server.
func (srv *Server) ListenAndServe() error {
	addr := srv.Addr
	var err error
	if addr == "" {
		switch {
		case strings.Contains(srv.Net, "-tls"):


@@ 227,23 237,16 @@ func (srv *Server) ListenAndServe() error {

	switch srv.Net {
	case "tcp", "tcp4", "tcp6":
		a, err := net.ResolveTCPAddr(srv.Net, addr)
		if err != nil {
			return err
		}
		l, err := net.ListenTCP(srv.Net, a)
		srv.Listener, err = kitNet.NewTCPListener(srv.Net, addr, srv.heartBeat())
		if err != nil {
			return err
			return fmt.Errorf("cannot listen and serve: %v", err)
		}
		srv.Listener = l
	case "tcp-tls", "tcp4-tls", "tcp6-tls":
		network := strings.TrimSuffix(srv.Net, "-tls")

		l, err := tls.Listen(network, addr, srv.TLSConfig)
		srv.Listener, err = kitNet.NewTLSListener(network, addr, srv.TLSConfig, srv.heartBeat())
		if err != nil {
			return err
			return fmt.Errorf("cannot listen and serve: %v", err)
		}
		srv.Listener = l
	case "udp", "udp4", "udp6":
		a, err := net.ResolveUDPAddr(srv.Net, addr)
		if err != nil {


@@ 253,7 256,7 @@ func (srv *Server) ListenAndServe() error {
		if err != nil {
			return err
		}
		if err := setUDPSocketOptions(l); err != nil {
		if err := kitNet.SetUDPSocketOptions(l); err != nil {
			return err
		}
		srv.Conn = l


@@ 268,17 271,17 @@ func (srv *Server) ListenAndServe() error {
		if err != nil {
			return err
		}
		if err := setUDPSocketOptions(l); err != nil {
		if err := kitNet.SetUDPSocketOptions(l); err != nil {
			return err
		}
		if len(srv.UDPMcastInterfaces) > 0 {
			for _, ifi := range srv.UDPMcastInterfaces {
				if err := joinGroup(l, &ifi, &net.UDPAddr{IP: a.IP, Zone: a.Zone}); err != nil {
				if err := kitNet.JoinGroup(l, &ifi, &net.UDPAddr{IP: a.IP, Zone: a.Zone}); err != nil {
					return err
				}
			}
		} else {
			if err := joinGroup(l, nil, &net.UDPAddr{IP: a.IP, Zone: a.Zone}); err != nil {
			if err := kitNet.JoinGroup(l, nil, &net.UDPAddr{IP: a.IP, Zone: a.Zone}); err != nil {
				return err
			}
		}


@@ 286,35 289,33 @@ func (srv *Server) ListenAndServe() error {
	default:
		return ErrInvalidNetParameter
	}
	if srv.Conn != nil {
		defer srv.Conn.Close()
	} else if srv.Listener != nil {
		defer srv.Listener.Close()
	}

	return srv.ActivateAndServe()
}

func (srv *Server) initServeUDP(conn *net.UDPConn) error {
	srv.lock.Lock()
	srv.started = true
	srv.lock.Unlock()
	return srv.serveUDP(conn)
	return srv.serveUDP(newShutdownWithContext(srv.doneChan), conn)
}

func (srv *Server) initServeTCP(conn net.Conn) error {
	srv.lock.Lock()
	srv.started = true
	srv.lock.Unlock()
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}
	return srv.serveTCPconnection(conn)
	return srv.serveTCPconnection(newShutdownWithContext(srv.doneChan), conn)
}

// ActivateAndServe starts a coapserver with the PacketConn or Listener
// configured in *Server. Its main use is to start a server from systemd.
func (srv *Server) ActivateAndServe() error {
	srv.lock.Lock()
	if srv.started {
		srv.lock.Unlock()
		return ErrServerAlreadyStarted
	}
	srv.lock.Unlock()
	srv.doneLock.Lock()
	srv.done = false
	srv.doneChan = make(chan struct{})
	srv.doneLock.Unlock()

	pConn := srv.Conn
	l := srv.Listener


@@ 332,7 333,7 @@ func (srv *Server) ActivateAndServe() error {
	defer close(srv.queue)

	if srv.newSessionTCPFunc == nil {
		srv.newSessionTCPFunc = func(connection Conn, srv *Server) (networkSession, error) {
		srv.newSessionTCPFunc = func(connection *kitNet.Conn, srv *Server) (networkSession, error) {
			session, err := newSessionTCP(connection, srv)
			if err != nil {
				return nil, err


@@ 345,7 346,7 @@ func (srv *Server) ActivateAndServe() error {
	}

	if srv.newSessionUDPFunc == nil {
		srv.newSessionUDPFunc = func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (networkSession, error) {
		srv.newSessionUDPFunc = func(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
			session, err := newSessionUDP(connection, srv, sessionUDPData)
			if err != nil {
				return nil, err


@@ 375,9 376,6 @@ func (srv *Server) ActivateAndServe() error {
		return ErrInvalidServerConnParameter
	}
	if l != nil {
		srv.lock.Lock()
		srv.started = true
		srv.lock.Unlock()
		return srv.serveTCP(l)
	}



@@ 387,20 385,13 @@ func (srv *Server) ActivateAndServe() error {
// Shutdown shuts down a server. After a call to Shutdown, ListenAndServe and
// ActivateAndServe will return.
func (srv *Server) Shutdown() error {
	srv.lock.Lock()
	if !srv.started {
		srv.lock.Unlock()
		return ErrServerNotStarted
	}
	srv.started = false
	srv.lock.Unlock()

	if srv.Conn != nil {
		srv.Conn.Close()
	}
	if srv.Listener != nil {
		srv.Listener.Close()
	srv.doneLock.Lock()
	defer srv.doneLock.Unlock()
	if srv.done {
		return fmt.Errorf("already shutdowned")
	}
	srv.done = true
	close(srv.doneChan)
	return nil
}



@@ 420,40 411,45 @@ func (srv *Server) writeTimeout() time.Duration {
	return coapTimeout
}

func (srv *Server) syncTimeout() time.Duration {
	if srv.SyncTimeout != 0 {
		return srv.SyncTimeout
// readTimeout is a helper func to use system timeout if server did not intend to change it.
func (srv *Server) heartBeat() time.Duration {
	if srv.HeartBeat != 0 {
		return srv.HeartBeat
	}
	return syncTimeout
	return time.Millisecond * 100
}

func (srv *Server) serveTCPconnection(conn net.Conn) error {
	session, err := srv.newSessionTCPFunc(newConnectionTCP(conn, srv), srv)
func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) error {
	conn := kitNet.NewConn(netConn, srv.heartBeat())

	session, err := srv.newSessionTCPFunc(conn, srv)
	if err != nil {
		return err
	}
	srv.NotifySessionNewFunc(&ClientCommander{session})
	br := srv.acquireReader(conn)
	defer srv.releaseReader(br)

	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		srv.lock.RLock()
		if !srv.started {
			srv.lock.RUnlock()
			return session.Close()
		}
		srv.lock.RUnlock()
		err = conn.SetReadDeadline(time.Now().Add(srv.readTimeout()))
		mti, err := readTcpMsgInfo(ctx, conn)
		if err != nil {
			return session.closeWithError(fmt.Errorf("cannot serve tcp connection: %v", err))
		}
		mti, err := readTcpMsgInfo(br)

		body := make([]byte, mti.BodyLen())
		//ctx, cancel := context.WithTimeout(srv.ctx, srv.readTimeout())
		err = conn.ReadFullWithContext(ctx, body)
		if err != nil {
			return session.closeWithError(err)
			return session.closeWithError(fmt.Errorf("cannot serve tcp connection: %v", err))
		}
		o, p, err := readTcpMsgBody(mti, br)
		//cancel()

		o, p, err := parseTcpOptionsPayload(mti, body)
		if err != nil {
			return session.closeWithError(err)
			return session.closeWithError(fmt.Errorf("cannot serve tcp connection: %v", err))
		}

		msg := new(TcpMessage)
		//msg := TcpMessage{MessageBase{}}



@@ 461,59 457,32 @@ func (srv *Server) serveTCPconnection(conn net.Conn) error {

		// We will block poller wait loop when
		// all pool workers are busy.
		srv.spawnWorker(&Request{Client: &ClientCommander{session}, Msg: msg})
		srv.spawnWorker(&Request{Client: &ClientCommander{session}, Msg: msg, Ctx: sessCtx})
	}
}

// serveTCP starts a TCP listener for the server.
func (srv *Server) serveTCP(l net.Listener) error {
	defer l.Close()

func (srv *Server) serveTCP(l Listener) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}

	doneDescChan := make(chan bool)
	numRunningDesc := 0
	var wg sync.WaitGroup
	ctx := newShutdownWithContext(srv.doneChan)

	for {
	LOOP_CLOSE_CHANNEL:
		for {
			select {
			case <-doneDescChan:
				numRunningDesc--
			default:
				break LOOP_CLOSE_CHANNEL
			}
		}

		rw, err := l.Accept()
		srv.lock.RLock()
		if !srv.started {
			srv.lock.RUnlock()
			if rw != nil {
				rw.Close()
			}
			for numRunningDesc > 0 {
				<-doneDescChan
				numRunningDesc--
			}
			return nil
		}
		srv.lock.RUnlock()
		rw, err := l.AcceptWithContext(ctx)
		if err != nil {
			if neterr, ok := err.(net.Error); ok && neterr.Temporary() {
				continue
			}
			return err
			wg.Wait()
			return fmt.Errorf("cannot serve tcp: %v", err)
		}
		if rw != nil {
			wg.Add(1)
			go func() {
				defer wg.Done()
				srv.serveTCPconnection(ctx, rw)
			}()
		}

		numRunningDesc++

		go func() {
			srv.serveTCPconnection(rw)
			doneDescChan <- true
		}()
	}
}



@@ 528,39 497,22 @@ func (srv *Server) closeSessions(err error) {
}

// serveUDP starts a UDP listener for the server.
func (srv *Server) serveUDP(conn *net.UDPConn) error {
func (srv *Server) serveUDP(ctx *shutdownContext, conn *net.UDPConn) error {
	defer conn.Close()

	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}

	rtimeout := srv.readTimeout()
	// deadline is not used here

	connUDP := newConnectionUDP(conn, srv).(*connUDP)
	connUDP := kitNet.NewConnUDP(conn, srv.heartBeat())
	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		m := make([]byte, ^uint16(0))
		srv.lock.RLock()
		if !srv.started {
			srv.lock.RUnlock()
			srv.closeSessions(nil)
			return nil
		}
		srv.lock.RUnlock()

		err := connUDP.SetReadDeadline(time.Now().Add(rtimeout))
		n, s, err := connUDP.ReadWithContext(ctx, m)
		if err != nil {
			srv.closeSessions(err)
			return err
		}
		m = m[:cap(m)]
		n, s, err := connUDP.ReadFromSessionUDP(m)
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
				continue
			}
			err := fmt.Errorf("cannot serve UDP connection %v", err)
			srv.closeSessions(err)
			return err
		}


@@ 584,13 536,12 @@ func (srv *Server) serveUDP(conn *net.UDPConn) error {
		if err != nil {
			continue
		}
		srv.spawnWorker(&Request{Msg: msg, Client: &ClientCommander{session}})
		srv.spawnWorker(&Request{Msg: msg, Client: &ClientCommander{session}, Ctx: sessCtx})
	}
}

func (srv *Server) serve(r *Request) {
	w := responseWriterFromRequest(r)

	handlePairMsg(w, r, func(w ResponseWriter, r *Request) {
		handleSignalMsg(w, r, func(w ResponseWriter, r *Request) {
			handleBySessionTokenHandler(w, r, func(w ResponseWriter, r *Request) {


@@ 607,39 558,3 @@ func (srv *Server) serveCOAP(w ResponseWriter, r *Request) {
	}
	handler.ServeCOAP(w, r) // Writes back to the client
}

func (srv *Server) acquireReader(tcp net.Conn) *bufio.Reader {
	v := srv.readerPool.Get()
	if v == nil {
		n := srv.TCPReadBufferSize
		if n <= 0 {
			n = defaultReadBufferSize
		}
		return bufio.NewReaderSize(tcp, n)
	}
	r := v.(*bufio.Reader)
	r.Reset(tcp)
	return r
}

func (srv *Server) releaseReader(r *bufio.Reader) {
	srv.readerPool.Put(r)
}

func (srv *Server) acquireWriter(tcp net.Conn) *bufio.Writer {
	v := srv.writerPool.Get()
	if v == nil {
		n := srv.TCPWriteBufferSize
		if n <= 0 {
			n = defaultWriteBufferSize
		}
		return bufio.NewWriterSize(tcp, n)
	}
	wr := v.(*bufio.Writer)
	wr.Reset(tcp)
	return wr
}

func (srv *Server) releaseWriter(wr *bufio.Writer) {
	srv.writerPool.Put(wr)
}

M server_test.go => server_test.go +12 -11
@@ 2,6 2,7 @@ package coap

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/binary"
	"fmt"


@@ 12,6 13,8 @@ import (
	"sync"
	"testing"
	"time"

	kitNet "github.com/go-ocf/kit/net"
)

func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {


@@ 19,7 22,7 @@ func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {
		resp := &TcpMessage{
			MessageBase{
				//typ:       Acknowledgement, not used by COAP over TCP
				code: Valid,
				code: code,
				//messageID: req.MessageID(), , not used by COAP over TCP
				payload: req.Payload(),
				token:   req.Token(),


@@ 32,7 35,7 @@ func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {
	resp := &DgramMessage{
		MessageBase{
			typ:       Acknowledgement,
			code:      Valid,
			code:      code,
			messageID: req.MessageID(),
			payload:   req.Payload(),
			token:     req.Token(),


@@ 109,9 112,10 @@ func RunLocalUDPServer(net, laddr string, BlockWiseTransfer bool, BlockWiseTrans
}

func RunLocalServerTCPWithHandler(laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, handler HandlerFunc) (*Server, string, chan error, error) {
	l, err := net.Listen("tcp", laddr)
	network := "tcp"
	l, err := kitNet.NewTCPListener(network, laddr, time.Millisecond*100)
	if err != nil {
		return nil, "", nil, err
		return nil, "", nil, fmt.Errorf("cannot create new tls listener: %v", err)
	}

	server := &Server{Listener: l, ReadTimeout: time.Second * 3600, WriteTimeout: time.Second * 3600,


@@ 147,7 151,7 @@ func RunLocalTCPServer(laddr string, BlockWiseTransfer bool, BlockWiseTransferSz
}

func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan error, error) {
	l, err := tls.Listen("tcp", laddr, config)
	l, err := kitNet.NewTLSListener("tcp", laddr, config, time.Millisecond*100)
	if err != nil {
		return nil, "", nil, err
	}


@@ 159,10 163,6 @@ func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan 
			fmt.Printf("networkSession end %v: %v\n", w.RemoteAddr(), err)
		}}

	waitLock := sync.Mutex{}
	waitLock.Lock()
	server.NotifyStartedFunc = waitLock.Unlock

	// fin must be buffered so the goroutine below won't block
	// forever if fin is never read from. This always happens
	// in RunLocalUDPServer and can happen in TestShutdownUDP.


@@ 173,7 173,6 @@ func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan 
		l.Close()
	}()

	waitLock.Lock()
	return server, l.Addr().String(), fin, nil
}



@@ 315,7 314,9 @@ func ChallegingServerTimeout(w ResponseWriter, r *Request) {
		Token:     []byte("abcd"),
	})
	req.SetOption(ContentFormat, TextPlain)
	_, err := r.Client.networkSession.exchangeTimeout(req, time.Second, time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_, err := r.Client.networkSession.ExchangeWithContext(ctx, req)
	if err == nil {
		panic("Error: expected timeout")
	}

A shutdownContext.go => shutdownContext.go +30 -0
@@ 0,0 1,30 @@
package coap

import (
	"fmt"
	"time"
)

type shutdownContext struct {
	doneChan <-chan struct{}
}

func newShutdownWithContext(doneChan <-chan struct{}) *shutdownContext {
	return &shutdownContext{doneChan: doneChan}
}

func (ctx *shutdownContext) Deadline() (deadline time.Time, ok bool) {
	return time.Time{}, false
}

func (ctx *shutdownContext) Done() <-chan struct{} {
	return ctx.doneChan
}

func (ctx *shutdownContext) Err() error {
	return fmt.Errorf("shutdown")
}

func (ctx *shutdownContext) Value(key interface{}) interface{} {
	return nil
}

D udp.go => udp.go +0 -121
@@ 1,121 0,0 @@
package coap

import (
	"encoding/base64"
	"net"
	"runtime"

	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)

// This is the required size of the OOB buffer to pass to ReadMsgUDP.
var udpOOBSize = func() int {
	// We can't know whether we'll get an IPv4 control message or an
	// IPv6 control message ahead of time. To get around this, we size
	// the buffer equal to the largest of the two.

	oob4 := ipv4.NewControlMessage(ipv4.FlagDst | ipv4.FlagInterface)
	oob6 := ipv6.NewControlMessage(ipv6.FlagDst | ipv6.FlagInterface)

	if len(oob4) > len(oob6) {
		return len(oob4)
	}

	return len(oob6)
}()

// SessionUDPData holds the remote address and the associated
// out-of-band data.
type SessionUDPData struct {
	raddr   *net.UDPAddr
	context []byte
}

// RemoteAddr returns the remote network address.
func (s *SessionUDPData) RemoteAddr() net.Addr { return s.raddr }

// Key returns the key session for the map using
func (s *SessionUDPData) Key() string {
	key := s.RemoteAddr().String() + "-" + base64.StdEncoding.EncodeToString(s.context)
	return key
}

// ReadFromSessionUDP acts just like net.UDPConn.ReadFrom(), but returns a session object instead of a
// net.UDPAddr.
func ReadFromSessionUDP(conn *net.UDPConn, b []byte) (int, *SessionUDPData, error) {
	oob := make([]byte, udpOOBSize)
	n, oobn, _, raddr, err := conn.ReadMsgUDP(b, oob)
	if err != nil {
		return n, nil, err
	}
	return n, &SessionUDPData{raddr, oob[:oobn]}, err
}

// WriteToSessionUDP acts just like net.UDPConn.WriteTo(), but uses a *SessionUDP instead of a net.Addr.
func WriteToSessionUDP(conn *net.UDPConn, b []byte, session *SessionUDPData) (int, error) {
	//check if socket is connected via Dial
	if conn.RemoteAddr() == nil {
		return conn.WriteToUDP(b, session.raddr)
	}

	n, _, err := conn.WriteMsgUDP(b, correctSource(session.context), nil)
	return n, err
}

func setUDPSocketOptions(conn *net.UDPConn) error {
	if runtime.GOOS == "windows" {
		return nil
	}
	if ip4 := conn.LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {
		return ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
	}
	return ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
}

// parseDstFromOOB takes oob data and returns the destination IP.
func parseDstFromOOB(oob []byte) net.IP {
	// Start with IPv6 and then fallback to IPv4
	// TODO(fastest963): Figure out a way to prefer one or the other. Looking at
	// the lvl of the header for a 0 or 41 isn't cross-platform.
	cm6 := new(ipv6.ControlMessage)
	if cm6.Parse(oob) == nil && cm6.Dst != nil {
		return cm6.Dst
	}
	cm4 := new(ipv4.ControlMessage)
	if cm4.Parse(oob) == nil && cm4.Dst != nil {
		return cm4.Dst
	}
	return nil
}

// correctSource takes oob data and returns new oob data with the Src equal to the Dst
func correctSource(oob []byte) []byte {
	if runtime.GOOS == "windows" {
		return oob
	}
	dst := parseDstFromOOB(oob)
	if dst == nil {
		return nil
	}
	// If the dst is definitely an IPv6, then use ipv6's ControlMessage to
	// respond otherwise use ipv4's because ipv6's marshal ignores ipv4
	// addresses.
	if dst.To4() == nil {
		cm := new(ipv6.ControlMessage)
		cm.Src = dst
		oob = cm.Marshal()
	} else {
		cm := new(ipv4.ControlMessage)
		cm.Src = dst
		oob = cm.Marshal()
	}
	return oob
}

func joinGroup(conn *net.UDPConn, ifi *net.Interface, gaddr *net.UDPAddr) error {
	if ip4 := conn.LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {
		return ipv4.NewPacketConn(conn).JoinGroup(ifi, gaddr)
	}
	return ipv6.NewPacketConn(conn).JoinGroup(ifi, gaddr)
}