~fnux/yggdrasil-go-coap

dfd0cb9bf2ea47c12a0f66fa7f73feccc16321a4 — Jozef Kralik 2 years ago 53c32ff
add blockwise: POST, PUT, GET
M README.md => README.md +5 -5
@@ 7,12 7,12 @@
Features supported:
* CoAP over UDP [RFC 7252][coap].
* CoAP over TCP/TLS [RFC 8232][coap-tcp]
* Block-wise transfers in COAP [RFC 7959][coap-block-wise-transfers]
* request multiplexer
* multicast

Not yet implemented:
* CoAP over DTLS
* Block-wise transfers in COAP [RFC 7959][coap-block-wise-transfers]

Fork of https://github.com/dustin/go-coap



@@ 28,7 28,7 @@ Fork of https://github.com/dustin/go-coap
```go
	// Server
	// See /examples/simpler/server/main.go
	func handleA(w coap.Session, req coap.Message) {
	func handleA(w coap.SessionNet, req coap.Message) {
		log.Printf("Got message in handleA: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
		if req.IsConfirmable() {
			res := w.NewMessage(coap.MessageParams{


@@ 103,7 103,7 @@ Fork of https://github.com/dustin/go-coap
	// Server
	// See /examples/observe/server/main.go

	func periodicTransmitter(w coap.Session, req coap.Message) {
	func periodicTransmitter(w coap.SessionNet, req coap.Message) {
		subded := time.Now()

		for {


@@ 130,7 130,7 @@ Fork of https://github.com/dustin/go-coap

	func main() {
		log.Fatal(coap.ListenAndServe(":5688", "udp",
			coap.HandlerFunc(func(w coap.Session, req coap.Message) {
			coap.HandlerFunc(func(w coap.SessionNet, req coap.Message) {
				log.Printf("Got message path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
				if req.Code() == coap.GET && req.Option(coap.Observe) != nil {
					value := req.Option(coap.Observe)


@@ 149,7 149,7 @@ Fork of https://github.com/dustin/go-coap
```go
	// Client
	// See /examples/observe/client/main.go
	func observe(s coap.Session, m coap.Message) {
	func observe(s coap.SessionNet, m coap.Message) {
		log.Printf("Got %s", m.Payload())
	}


A blockwise.go => blockwise.go +656 -0
@@ 0,0 1,656 @@
package coap

import (
	"log"
	"time"
)

const (
	maxBlockNumber = uint(1048575)
	debug          = false
)

type BlockSzx uint8

const (
	BlockSzx16   BlockSzx = 0
	BlockSzx32   BlockSzx = 1
	BlockSzx64   BlockSzx = 2
	BlockSzx128  BlockSzx = 3
	BlockSzx256  BlockSzx = 4
	BlockSzx512  BlockSzx = 5
	BlockSzx1024 BlockSzx = 6
	BlockSzxBERT BlockSzx = 7
)

var SZXVal = [8]int{
	0: 16,
	1: 32,
	2: 64,
	3: 128,
	4: 256,
	5: 512,
	6: 1024,
	7: 1024,
}

func MarshalBlockOption(szx BlockSzx, blockNumber uint, moreBlocksFollowing bool) (uint32, error) {
	if blockNumber > maxBlockNumber {
		return 0, ErrBlockNumberExceedLimit
	}
	blockVal := uint32(blockNumber << 4)
	m := uint32(0)
	if moreBlocksFollowing {
		m = 1
	}
	blockVal += m << 3
	blockVal += uint32(szx)
	return blockVal, nil
}

func UnmarshalBlockOption(blockVal uint32) (szx BlockSzx, blockNumber uint, moreBlocksFollowing bool, err error) {
	if blockVal > 0xffffff {
		err = ErrBlockInvalidSize
	}

	szx = BlockSzx(blockVal & 0x7) //masking for the SZX
	if (blockVal & 0x8) != 0 {     //masking for the "M"
		moreBlocksFollowing = true
	}
	blockNumber = uint(blockVal) >> 4 //shifting out the SZX and M vals. leaving the block number behind
	if blockNumber > maxBlockNumber {
		err = ErrBlockNumberExceedLimit
	}
	return
}

func exchangeDrivedByPeer(session SessionNet, req Message) (Message, error) {
	pair := make(chan *Request, 1)
	session.sessionHandler().add(req.Token(), func(w ResponseWriter, r *Request, next HandlerFunc) {
		select {
		case pair <- r:
		default:
			next(w, r)
		}
	})
	defer session.sessionHandler().remove(req.Token())
	err := session.Write(req)
	if err != nil {
		return nil, err
	}
	select {
	case resp := <-pair:
		return resp.Msg, nil
	case <-time.After(session.ReadDeadline()):
		return nil, ErrTimeout
	}
}

type blockWiseSender struct {
	peerDrive    bool
	blockType    OptionID
	expectedCode COAPCode
	origin       Message

	currentNum  uint
	currentSzx  BlockSzx
	currentMore bool
}

func (s *blockWiseSender) coapType() COAPType {
	if s.peerDrive {
		return Acknowledgement
	}
	return Confirmable
}

func (s *blockWiseSender) sizeType() OptionID {
	if s.blockType == Block2 {
		return Size2
	}
	return Size1
}

func newSender(peerDrive bool, blockType OptionID, suggestedSzx BlockSzx, expectedCode COAPCode, origin Message) *blockWiseSender {
	return &blockWiseSender{
		peerDrive:    peerDrive,
		blockType:    blockType,
		currentSzx:   suggestedSzx,
		expectedCode: expectedCode,
		origin:       origin,
	}
}

func (s *blockWiseSender) createReq(b *blockWiseSession) (Message, error) {
	req := b.SessionNet.NewMessage(MessageParams{
		Code:      s.origin.Code(),
		Type:      s.coapType(),
		MessageID: s.origin.MessageID(),
		Token:     s.origin.Token(),
	})

	if !s.peerDrive {
		req.SetMessageID(GenerateMessageID())
	}

	for _, option := range s.origin.AllOptions() {
		req.AddOption(option.ID, option.Value)
	}

	req.SetOption(s.sizeType(), len(s.origin.Payload()))

	if s.origin.Payload() != nil && len(s.origin.Payload()) > b.blockWiseMaxPayloadSize(s.currentSzx) {
		req.SetPayload(s.origin.Payload()[:b.blockWiseMaxPayloadSize(s.currentSzx)])
		s.currentMore = true
	} else {
		req.SetPayload(s.origin.Payload())
	}

	block, err := MarshalBlockOption(s.currentSzx, s.currentNum, s.currentMore)
	if err != nil {
		return nil, err
	}

	req.SetOption(s.blockType, block)
	return req, nil
}

func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, error) {
	var resp Message
	var err error
	if debug {
		log.Printf("sendPayload %p req=%v\n", b, req)
	}
	if s.peerDrive {
		resp, err = exchangeDrivedByPeer(b.SessionNet, req)
	} else {
		resp, err = b.SessionNet.Exchange(req)
	}
	if err != nil {
		return nil, err
	}
	if debug {
		log.Printf("sendPayload %p resp=%v\n", b, resp)
	}
	return resp, nil
}

func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Message) (Message, error) {
	if s.currentMore == false {
		if s.blockType == Block1 /*&& resp.Code() == Changed*/ {
			if respBlock2, ok := resp.Option(Block2).(uint32); ok {
				szx, num, _, err := UnmarshalBlockOption(respBlock2)
				if err != nil {
					return nil, err
				}
				if !b.blockWiseIsValid(szx) {
					return nil, ErrInvalidBlockSzx
				}
				if num == 0 {
					resp.RemoveOption(s.sizeType())
					return b.receivePayload(s.peerDrive, s.origin, resp, Block2, s.origin.Code(), Confirmable, Changed)
				}
			}
		}
		// clean response from blockWise staff
		if !s.peerDrive {
			resp.SetMessageID(s.origin.MessageID())
		}
		resp.RemoveOption(s.sizeType())
		resp.RemoveOption(s.blockType)
		return resp, nil
	}

	if resp.Code() != s.expectedCode {
		return resp, nil
	}

	if respBlock, ok := resp.Option(s.blockType).(uint32); ok {
		szx, num, _ /*more*/, err := UnmarshalBlockOption(respBlock)
		if err != nil {
			return nil, err
		}
		if !b.blockWiseIsValid(szx) {
			return nil, ErrInvalidBlockSzx
		}
		if s.peerDrive {
			s.currentNum = num
			req.SetMessageID(resp.MessageID())
		} else {
			s.currentNum = calcNextNum(num, szx, len(req.Payload()))
			req.SetMessageID(GenerateMessageID())
		}
		startOffset := calcStartOffset(s.currentNum, szx)
		endOffset := startOffset + b.blockWiseMaxPayloadSize(szx)
		if endOffset > len(s.origin.Payload()) {
			endOffset = len(s.origin.Payload())
			s.currentMore = false
		}
		req.SetPayload(s.origin.Payload()[startOffset:endOffset])
		//must be unique for evey msg via UDP
		if debug {
			log.Printf("sendPayload szx=%v num=%v more=%v\n", s.currentSzx, s.currentNum, s.currentMore)
		}
		block, err := MarshalBlockOption(s.currentSzx, s.currentNum, s.currentMore)
		if err != nil {
			return nil, err
		}
		req.SetOption(s.blockType, block)
	} else {
		switch s.blockType {
		case Block1:
			return nil, ErrInvalidOptionBlock1
		default:
			return nil, ErrInvalidOptionBlock2
		}
	}
	return nil, nil
}

func (b *blockWiseSession) sendPayload(peerDrive bool, blockType OptionID, suggestedSzx BlockSzx, expectedCode COAPCode, msg Message) (Message, error) {
	s := newSender(peerDrive, blockType, suggestedSzx, expectedCode, msg)
	req, err := s.createReq(b)
	if err != nil {
		return nil, err
	}
	for {
		bwResp, err := s.exchange(b, req)
		if err != nil {
			return nil, err
		}

		resp, err := s.processResp(b, req, bwResp)
		if err != nil {
			return nil, err
		}

		if resp != nil {
			return resp, nil
		}
	}
}

type blockWiseSession struct {
	SessionNet
}

func (b *blockWiseSession) Exchange(msg Message) (Message, error) {
	switch msg.Code() {
	case GET:
		return b.receivePayload(false, msg, nil, Block2, msg.Code(), Confirmable, Content)
	case POST, PUT:
		return b.sendPayload(false, Block1, b.SessionNet.blockWiseSzx(), Continue, msg)
	}
	return b.SessionNet.Exchange(msg)
}

func (b *blockWiseSession) Write(msg Message) error {
	switch msg.Code() {
	case GET, POST, PUT:
		_, err := b.Exchange(msg)
		return err
	}
	return b.SessionNet.Write(msg)
}

func calcNextNum(num uint, szx BlockSzx, payloadSize int) uint {
	val := uint(payloadSize / SZXVal[szx])
	if val > 0 && (payloadSize%SZXVal[szx] == 0) {
		val--
	}
	return num + val + 1
}

func calcStartOffset(num uint, szx BlockSzx) int {
	return int(num) * SZXVal[szx]
}

func (b *blockWiseSession) sendErrorMsg(code COAPCode, typ COAPType, token []byte, MessageID uint16) {
	req := b.NewMessage(MessageParams{
		Code:      code,
		Type:      typ,
		MessageID: MessageID,
		Token:     token,
	})
	b.SessionNet.Write(req)
}

type blockWiseReceiver struct {
	peerDrive    bool
	code         COAPCode
	expectedCode COAPCode
	typ          COAPType
	origin       Message
	blockType    OptionID
	currentSzx   BlockSzx
	nextNum      uint
	currentMore  bool
	payloadSize  uint32

	payload []byte
}

func (r *blockWiseReceiver) sizeType() OptionID {
	if r.blockType == Block1 {
		return Size1
	}
	return Size2
}

func (r *blockWiseReceiver) coapType() COAPType {
	if r.peerDrive {
		return Acknowledgement
	}
	return Confirmable
}

func (r *blockWiseReceiver) createReq(b *blockWiseSession, resp Message) (Message, error) {
	req := b.SessionNet.NewMessage(MessageParams{
		Code:      r.code,
		Type:      r.typ,
		MessageID: r.origin.MessageID(),
		Token:     r.origin.Token(),
	})
	if !r.peerDrive {
		for _, option := range r.origin.AllOptions() {
			req.AddOption(option.ID, option.Value)
		}
		req.SetMessageID(GenerateMessageID())
	} else if resp == nil {
		// set blocktype as peer wants
		block := r.origin.Option(r.blockType)
		if block != nil {
			req.SetOption(r.blockType, block)
		}
	}

	if len(r.payload) > 0 {
		block, err := MarshalBlockOption(r.currentSzx, r.nextNum, r.currentMore)
		if err != nil {
			return nil, err
		}
		req.SetOption(r.blockType, block)
	}
	return req, nil
}

func (b *blockWiseSession) receivePayloadInit(peerDrive bool, origin Message, resp Message, blockType OptionID, code COAPCode, expectedCode COAPCode) (r *blockWiseReceiver, res Message, err error) {
	r = &blockWiseReceiver{
		peerDrive:    peerDrive,
		code:         code,
		expectedCode: expectedCode,
		origin:       origin,
		blockType:    blockType,
		currentSzx:   b.SessionNet.blockWiseSzx(),
		payload:      []byte{},
	}

	if resp != nil {
		var ok bool
		if r.payloadSize, ok = resp.Option(r.sizeType()).(uint32); ok {
			//try to get Size
		}
		if respBlock, ok := resp.Option(blockType).(uint32); ok {
			//contains block
			szx, num, more, err := UnmarshalBlockOption(respBlock)
			if err != nil {
				return r, nil, err
			}
			if !b.blockWiseIsValid(szx) {
				return r, nil, ErrInvalidBlockSzx
			}
			//do we need blockWise?
			if more == false {
				resp.RemoveOption(r.sizeType())
				resp.RemoveOption(blockType)
				if !peerDrive {
					resp.SetMessageID(origin.MessageID())
				}
				return r, resp, nil
			}
			//set szx and num by response
			r.currentSzx = szx
			r.nextNum = calcNextNum(num, r.currentSzx, len(resp.Payload()))
			r.currentMore = more
		} else {
			//it's doesn't contains block
			return r, resp, nil
		}
		//append payload and set block
		r.payload = append(r.payload, resp.Payload()...)
	}

	if peerDrive {
		//we got all message returns it to handler
		if respBlock, ok := origin.Option(blockType).(uint32); ok {
			szx, num, more, err := UnmarshalBlockOption(respBlock)
			if err != nil {
				return r, nil, err
			}
			if !b.blockWiseIsValid(szx) {
				return r, nil, ErrInvalidBlockSzx
			}
			if more == false {
				origin.RemoveOption(blockType)

				return r, origin, nil
			}
			r.currentSzx = szx
			r.nextNum = num
			r.currentMore = more
		}
		r.payload = append(r.payload, origin.Payload()...)
	}

	return r, nil, nil
}

func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message, error) {
	if debug {
		log.Printf("receivePayload %p req=%v\n", b, req)
	}
	var resp Message
	var err error
	if r.peerDrive {
		resp, err = exchangeDrivedByPeer(b.SessionNet, req)
	} else {
		resp, err = b.SessionNet.Exchange(req)
	}

	if debug {
		log.Printf("receivePayload %p resp=%v\n", b, resp)
	}

	return resp, err
}

func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp Message) (Message, error) {
	/*
		if resp.Code() != r.expectedCode {
			return nil, ErrInvalidRequest
		}
	*/

	if respBlock, ok := resp.Option(r.blockType).(uint32); ok {
		szx, num, more, err := UnmarshalBlockOption(respBlock)
		if err != nil {
			return nil, err
		}
		if !b.blockWiseIsValid(szx) {
			return nil, ErrInvalidBlockSzx
		}
		startOffset := calcStartOffset(num, szx)
		if len(r.payload) < startOffset {
			return nil, ErrRequestEntityIncomplete
		}
		if more == true && len(resp.Payload())%SZXVal[szx] != 0 {
			if r.peerDrive {
				return nil, ErrInvalidRequest
			}
			//reagain
			r.nextNum = num
		} else {
			r.payload = append(r.payload[:startOffset], resp.Payload()...)
			if r.peerDrive {
				r.nextNum = num
			} else {
				r.nextNum = calcNextNum(num, szx, len(resp.Payload()))
			}
		}

		if more == false {
			if r.payloadSize != 0 && int(r.payloadSize) != len(r.payload) {
				return nil, ErrInvalidPayloadSize
			}
			resp.SetPayload(r.payload)
			// remove block used by blockWise
			resp.RemoveOption(r.sizeType())
			resp.RemoveOption(r.blockType)
			if !r.peerDrive {
				resp.SetMessageID(r.origin.MessageID())
			}
			return resp, nil
		}
		if r.peerDrive {
			req.SetMessageID(resp.MessageID())
		} else {
			req.SetMessageID(GenerateMessageID())
		}
		if debug {
			log.Printf("receivePayload szx=%v num=%v more=%v\n", szx, r.nextNum, more)
		}
		block, err := MarshalBlockOption(szx, r.nextNum, more)
		if err != nil {
			return nil, err
		}
		req.SetOption(r.blockType, block)
	} else {
		switch r.blockType {
		case Block1:
			return nil, ErrInvalidOptionBlock1
		default:
			return nil, ErrInvalidOptionBlock2
		}
	}
	return nil, nil
}

func (r *blockWiseReceiver) sendError(b *blockWiseSession, code COAPCode, resp Message) {
	var MessageID uint16
	var token []byte
	var typ COAPType
	if !r.peerDrive {
		MessageID = GenerateMessageID()
		token = r.origin.Token()
		typ = NonConfirmable
	} else {
		MessageID = r.origin.MessageID()
		typ = Acknowledgement
		if resp != nil {
			token = resp.Token()
		} else {
			token = r.origin.Token()
		}
	}
	b.sendErrorMsg(code, typ, token, MessageID)
}

func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode, typ COAPType, expectedCode COAPCode) (Message, error) {
	r, resp, err := b.receivePayloadInit(peerDrive, msg, resp, blockType, code, expectedCode)
	if err != nil {
		r.sendError(b, BadRequest, resp)
		return nil, err
	}
	if resp != nil {
		return resp, nil
	}

	req, err := r.createReq(b, resp)
	if err != nil {
		r.sendError(b, BadRequest, resp)
		return nil, err
	}

	for {
		bwResp, err := r.exchange(b, req)

		if err != nil {
			r.sendError(b, BadRequest, resp)
			return nil, err
		}

		resp, err := r.processResp(b, req, bwResp)

		if err != nil {
			errCode := BadRequest
			switch err {
			case ErrRequestEntityIncomplete:
				errCode = RequestEntityIncomplete
			}
			r.sendError(b, errCode, resp)
			return nil, err
		}

		if resp != nil {
			return resp, nil
		}
	}
}

func invalidBlockWise(w ResponseWriter, r *Request, err error) {
	resp := r.SessionNet.NewMessage(MessageParams{
		Code:      BadRequest,
		Type:      Acknowledgement,
		MessageID: r.Msg.MessageID(),
		Token:     r.Msg.Token(),
		Payload:   []byte(err.Error()),
	})
	resp.SetOption(ContentFormat, TextPlain)
	w.Write(resp)
}

func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter, r *Request)) {
	if r.Msg.Token() != nil && (r.Msg.Code() == PUT || r.Msg.Code() == POST) {
		if b, ok := r.SessionNet.(*blockWiseSession); ok {
			msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue, Acknowledgement, r.Msg.Code())

			if err != nil {
				return
			}
			next(w, &Request{SessionNet: r.SessionNet, Msg: msg})
			return
		}
	}
	next(w, r)
}

type blockWiseResponseWriter struct {
	responseWriter
}

func (w *blockWiseResponseWriter) Write(msg Message) error {
	suggestedSzx := w.req.SessionNet.blockWiseSzx()
	if respBlock2, ok := w.req.Msg.Option(Block2).(uint32); ok {
		szx, _, _, err := UnmarshalBlockOption(respBlock2)
		if err != nil {
			return err
		}
		//BERT is supported only via TCP
		if szx == BlockSzxBERT && !w.req.SessionNet.IsTCP() {
			return ErrInvalidBlockSzx
		}
		suggestedSzx = szx
	}

	//resp is less them szx then just write msg without blockWise
	if len(msg.Payload()) < SZXVal[suggestedSzx] {
		return w.responseWriter.Write(msg)
	}

	if b, ok := w.req.SessionNet.(*blockWiseSession); ok {
		_, err := b.sendPayload(true, Block2, suggestedSzx, w.req.Msg.Code(), msg)
		return err
	}

	return ErrNotSupported
}

A blockwise_test.go => blockwise_test.go +265 -0
@@ 0,0 1,265 @@
package coap

import (
	"fmt"
	"testing"

	"github.com/ugorji/go/codec"
)

func testMarshal(t *testing.T, szx BlockSzx, blockNumber uint, moreBlocksFollowing bool, expectedBlock uint32) {
	fmt.Printf("testMarshal szx=%v, num=%v more=%v\n", szx, blockNumber, moreBlocksFollowing)
	block, err := MarshalBlockOption(szx, blockNumber, moreBlocksFollowing)
	if err != nil {
		t.Fatalf("unexpected error %v", err)
	}
	if block != expectedBlock {
		t.Fatalf("unexpected value of block %v, expected %v", block, expectedBlock)
	}
}

func testUnmarshal(t *testing.T, block uint32, expectedSzx BlockSzx, expectedNum uint, expectedMoreBlocksFollowing bool) {
	fmt.Printf("testUnmarshal %v\n", block)
	szx, num, more, err := UnmarshalBlockOption(block)
	if err != nil {
		t.Fatalf("unexpected error %v", err)
	}
	if szx != expectedSzx {
		t.Fatalf("unexpected szx of block %v, expected %v", szx, expectedSzx)
	}
	if num != expectedNum {
		t.Fatalf("unexpected num of block %v, expected %v", num, expectedNum)
	}
	if more != expectedMoreBlocksFollowing {
		t.Fatalf("unexpected more of block %v, expected %v", more, expectedMoreBlocksFollowing)
	}
}

func TestBlockWiseBlockMarshal(t *testing.T) {
	testMarshal(t, BlockSzx16, 0, false, uint32(0))
	testMarshal(t, BlockSzx16, 0, true, uint32(8))
	testMarshal(t, BlockSzx32, 0, false, uint32(1))
	testMarshal(t, BlockSzx32, 0, true, uint32(9))
	testMarshal(t, BlockSzx64, 0, false, uint32(2))
	testMarshal(t, BlockSzx64, 0, true, uint32(10))
	testMarshal(t, BlockSzx128, 0, false, uint32(3))
	testMarshal(t, BlockSzx128, 0, true, uint32(11))
	testMarshal(t, BlockSzx256, 0, false, uint32(4))
	testMarshal(t, BlockSzx256, 0, true, uint32(12))
	testMarshal(t, BlockSzx512, 0, false, uint32(5))
	testMarshal(t, BlockSzx512, 0, true, uint32(13))
	testMarshal(t, BlockSzx1024, 0, false, uint32(6))
	testMarshal(t, BlockSzx1024, 0, true, uint32(14))
	testMarshal(t, BlockSzxBERT, 0, false, uint32(7))
	testMarshal(t, BlockSzxBERT, 0, true, uint32(15))

	val, err := MarshalBlockOption(BlockSzx16, maxBlockNumber+1, false)
	if err == nil {
		t.Fatalf("expected error, block %v", val)
	}
}

func TestBlockWiseBlockUnmarshal(t *testing.T) {
	testUnmarshal(t, uint32(0), BlockSzx16, 0, false)
	testUnmarshal(t, uint32(8), BlockSzx16, 0, true)
	testUnmarshal(t, uint32(1), BlockSzx32, 0, false)
	testUnmarshal(t, uint32(9), BlockSzx32, 0, true)
	testUnmarshal(t, uint32(2), BlockSzx64, 0, false)
	testUnmarshal(t, uint32(10), BlockSzx64, 0, true)
	testUnmarshal(t, uint32(3), BlockSzx128, 0, false)
	testUnmarshal(t, uint32(11), BlockSzx128, 0, true)
	testUnmarshal(t, uint32(4), BlockSzx256, 0, false)
	testUnmarshal(t, uint32(12), BlockSzx256, 0, true)
	testUnmarshal(t, uint32(5), BlockSzx512, 0, false)
	testUnmarshal(t, uint32(13), BlockSzx512, 0, true)
	testUnmarshal(t, uint32(6), BlockSzx1024, 0, false)
	testUnmarshal(t, uint32(14), BlockSzx1024, 0, true)
	testUnmarshal(t, uint32(7), BlockSzxBERT, 0, false)
	testUnmarshal(t, uint32(15), BlockSzxBERT, 0, true)
	szx, num, m, err := UnmarshalBlockOption(0x1000000)
	if err == nil {
		t.Fatalf("expected error, szx %v, num %v, m %v", szx, num, m)
	}
}

var path = "/allAttributeTypes"
var server = "127.0.0.1:33643"

func decodeMsg(resp Message) {
	var m map[string]interface{}
	fmt.Printf("--------------------------------------\n")
	fmt.Printf("Path: %v\n", resp.PathString())
	fmt.Printf("Payload: %x\n", resp.Payload())
	err := codec.NewDecoderBytes(resp.Payload(), new(codec.CborHandle)).Decode(&m)
	if err != nil {
		fmt.Printf("cannot decode payload: %v!!!\n", err)

	} else {
		fmt.Printf("decoded type : %T\n", m)
		fmt.Printf("decoded value: %v\n", m)

		if val, ok := m["binaryAttribute"]; ok {
			fmt.Printf("binaryAttribute: %x\n", val)
		}
	}
}

func observe(w ResponseWriter, req *Request) {
	fmt.Printf("RemoteAddr : %v\n", req.SessionNet.RemoteAddr())
	decodeMsg(req.Msg)
}

func TestServingUDPBlockSzx16(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx16, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzx32(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx32, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzx64(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx64, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzx128(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx128, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzx256(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx256, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzx512(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx512, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzx1024(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx1024, make([]byte, 128), simpleMsg)
}

func TestServingUDPBlockSzxBERT(t *testing.T) {
	_, addr, _, err := RunLocalUDPServer("udp", ":0", true, BlockSzx1024)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzxBERT
	c := Client{Net: "udp", BlockWiseTransfer: &BlockWiseTransfer, BlockWiseTransferSzx: &BlockWiseTransferSzx}
	_, err = c.Dial(addr)
	if err != nil {
		if err.Error() != ErrInvalidBlockSzx.Error() {
			t.Fatalf("Expected error '%v', got '%v'", err, ErrInvalidBlockSzx)
		}
	} else {
		t.Fatalf("Expected error '%v'", ErrInvalidBlockSzx)
	}
}

func TestServingTCPBlockSzx16(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx16, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzx32(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx32, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzx64(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx64, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzx128(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx128, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzx256(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx256, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzx512(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx512, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzx1024(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx1024, make([]byte, 128), simpleMsg)
}

func TestServingTCPBlockSzxBERT(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzxBERT, make([]byte, 128), simpleMsg)
}

func TestServingTCPBigMsgBlockSzx1024(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzx1024, make([]byte, 10*1024*1024), simpleMsg)
}

func TestServingTCPBigMsgBlockSzxBERT(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzxBERT, make([]byte, 10*1024*1024), simpleMsg)
}

/*
func TestBlockWisePostBlock16(t *testing.T) {
	client := &Client{Net: "udp", ObserverFunc: observe}
	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}

	fmt.Fprintf(os.Stdout, "conn: %v\n", co.LocalAddr())

	payload := map[string]interface{}{
		"binaryAttribute": make([]byte, 17),
	}
	bw := new(bytes.Buffer)
	h := new(codec.CborHandle)
	enc := codec.NewEncoder(bw, h)
	err = enc.Encode(&payload)
	if err != nil {
		t.Fatalf("Cannot encode: %v", err)
	}

	token, err := GenerateToken(8)

	req := co.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      POST,
		MessageID: GenerateMessageID(),
		Token:     token,
		Payload:   bw.Bytes(),
	})

	req.SetPathString(path)
	req.SetOption(ContentFormat, AppCBOR)

	resp, err := co.Exchange(req)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}

func TestBlockWiseGetBlock16(t *testing.T) {
	client := &Client{Net: "udp", ObserverFunc: observe}

	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}

	token, err := GenerateToken(8)

	req := co.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	})

	req.SetPathString(path)

	resp, err := co.Exchange(req)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}
*/

M client.go => client.go +77 -18
@@ 14,7 14,7 @@ import (
type ClientConn struct {
	srv          *Server
	client       *Client
	session      Session
	session      SessionNet
	shutdownSync chan error
}



@@ 30,6 30,9 @@ type Client struct {

	ObserverFunc         HandlerFunc     // for handling observation messages from server
	NotifySessionEndFunc func(err error) // if NotifySessionEndFunc is set it is called when TCP/UDP session was ended.

	BlockWiseTransfer    *bool     // Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)
	BlockWiseTransferSzx *BlockSzx // Set maximal block size of payload that will be send in fragment
}

func (c *Client) readTimeout() time.Duration {


@@ 61,6 64,8 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
	var sessionUPDData *SessionUDPData

	dialer := &net.Dialer{Timeout: c.DialTimeout}
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockSzx1024

	switch c.Net {
	case "tcp-tls", "tcp4-tls", "tcp6-tls":


@@ 69,12 74,14 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		if err != nil {
			return nil, err
		}
		BlockWiseTransferSzx = BlockSzxBERT
	case "tcp", "tcp4", "tcp6":
		network = c.Net
		conn, err = dialer.Dial(c.Net, address)
		if err != nil {
			return nil, err
		}
		BlockWiseTransferSzx = BlockSzxBERT
	case "udp", "udp4", "udp6", "":
		network = c.Net
		if network == "" {


@@ 84,6 91,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
			return nil, err
		}
		sessionUPDData = &SessionUDPData{raddr: conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr)}
		BlockWiseTransfer = true
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		network = strings.TrimSuffix(c.Net, "-mcast")
		var a *net.UDPAddr


@@ 99,12 107,29 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		}
		sessionUPDData = &SessionUDPData{raddr: a}
		conn = udpConn
		BlockWiseTransfer = true
	default:
		return nil, ErrInvalidNetParameter
	}

	if c.BlockWiseTransfer != nil {
		BlockWiseTransfer = *c.BlockWiseTransfer
	}

	if c.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *c.BlockWiseTransferSzx
	}

	sync := make(chan bool)
	clientConn = &ClientConn{srv: &Server{Net: network, TLSConfig: c.TLSConfig, Conn: conn, ReadTimeout: c.readTimeout(), WriteTimeout: c.writeTimeout(), MaxMessageSize: c.MaxMessageSize,
	clientConn = &ClientConn{srv: &Server{
		Net:                  network,
		TLSConfig:            c.TLSConfig,
		Conn:                 conn,
		ReadTimeout:          c.readTimeout(),
		WriteTimeout:         c.writeTimeout(),
		MaxMessageSize:       c.MaxMessageSize,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
		NotifyStartedFunc: func() {
			timeout := c.syncTimeout()
			select {


@@ 113,38 138,62 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
				log.Fatal("Client cannot send start: Timeout")
			}
		},
		NotifySessionEndFunc: func(s Session, err error) {
		NotifySessionEndFunc: func(s SessionNet, err error) {
			if c.NotifySessionEndFunc != nil {
				c.NotifySessionEndFunc(err)
			}
		},
		createSessionTCPFunc: func(connection Conn, srv *Server) (Session, error) {
		createSessionTCPFunc: func(connection Conn, srv *Server) (SessionNet, error) {
			return clientConn.session, nil
		},
		createSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (Session, error) {
		createSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
			if sessionUDPData.RemoteAddr().String() == clientConn.session.RemoteAddr().String() {
				clientConn.session.(*sessionUDP).sessionUDPData = sessionUDPData
				if s, ok := clientConn.session.(*blockWiseSession); ok {
					s.SessionNet.(*sessionUDP).sessionUDPData = sessionUDPData
				} else {
					clientConn.session.(*sessionUDP).sessionUDPData = sessionUDPData
				}
				return clientConn.session, nil
			}
			return newSessionUDP(connection, srv, sessionUDPData)
			session, err := newSessionUDP(connection, srv, sessionUDPData)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{SessionNet: session}, nil
			}
			return session, nil
		}, Handler: c.ObserverFunc},
		shutdownSync: make(chan error)}

	switch clientConn.srv.Conn.(type) {
	case *net.TCPConn, *tls.Conn:
		clientConn.session, err = newSessionTCP(newConnectionTCP(clientConn.srv.Conn, clientConn.srv), clientConn.srv)
		session, err := newSessionTCP(newConnectionTCP(clientConn.srv.Conn, clientConn.srv), clientConn.srv)
		if err != nil {
			return nil, err
		}
		if session.blockWiseEnabled() {
			clientConn.session = &blockWiseSession{SessionNet: session}
		} else {
			clientConn.session = session
		}
	case *net.UDPConn:
		// WriteMsgUDP returns error when addr is filled in SessionUDPData for connected socket
		setUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		clientConn.session, err = newSessionUDP(newConnectionUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv), clientConn.srv, sessionUPDData)
		session, err := newSessionUDP(newConnectionUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv), clientConn.srv, sessionUPDData)
		if err != nil {
			return nil, err
		}
		if session.blockWiseEnabled() {
			clientConn.session = &blockWiseSession{SessionNet: session}
		} else {
			clientConn.session = session
		}
	}

	clientConn.session.SetReadDeadline(c.readTimeout())
	clientConn.session.SetWriteDeadline(c.writeTimeout())

	go func() {
		timeout := c.syncTimeout()
		err := clientConn.srv.ActivateAndServe()


@@ 166,12 215,12 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
	return clientConn, nil
}

// LocalAddr implements the Session.LocalAddr method.
// LocalAddr implements the SessionNet.LocalAddr method.
func (co *ClientConn) LocalAddr() net.Addr {
	return co.session.LocalAddr()
}

// RemoteAddr implements the Session.RemoteAddr method.
// RemoteAddr implements the SessionNet.RemoteAddr method.
func (co *ClientConn) RemoteAddr() net.Addr {
	return co.session.RemoteAddr()
}


@@ 183,8 232,8 @@ func (co *ClientConn) RemoteAddr() net.Addr {
// case of truncation.
// To specify a local address or a timeout, the caller has to set the `Client.Dialer`
// attribute appropriately
func (co *ClientConn) Exchange(m Message, timeout time.Duration) (r Message, err error) {
	return co.session.Exchange(m, timeout)
func (co *ClientConn) Exchange(m Message) (Message, error) {
	return co.session.Exchange(m)
}

// NewMessage Create message for request


@@ 193,12 242,22 @@ func (co *ClientConn) NewMessage(p MessageParams) Message {
}

// WriteMsg sends a message through the connection co.
func (co *ClientConn) WriteMsg(m Message, timeout time.Duration) (err error) {
	return co.session.WriteMsg(m, timeout)
func (co *ClientConn) Write(m Message) error {
	return co.session.Write(m)
}

// SetReadDeadline set read deadline for timeout for Exchange
func (co *ClientConn) SetReadDeadline(timeout time.Duration) {
	co.session.SetReadDeadline(timeout)
}

// SetWriteDeadline set write deadline for timeout for Exchange and Write
func (co *ClientConn) SetWriteDeadline(timeout time.Duration) {
	co.session.SetWriteDeadline(timeout)
}

// Ping send a ping message and wait for a pong response
func (co *ClientConn) Ping(timeout time.Duration) (err error) {
func (co *ClientConn) Ping(timeout time.Duration) error {
	return co.session.Ping(timeout)
}



@@ 213,13 272,13 @@ func (co *ClientConn) Close() {
}

// Dial connects to the address on the named network.
func Dial(network, address string) (conn *ClientConn, err error) {
func Dial(network, address string) (*ClientConn, error) {
	client := Client{Net: network}
	return client.Dial(address)
}

// DialTimeout acts like Dial but takes a timeout.
func DialTimeout(network, address string, timeout time.Duration) (conn *ClientConn, err error) {
func DialTimeout(network, address string, timeout time.Duration) (*ClientConn, error) {
	client := Client{Net: network, DialTimeout: timeout}
	return client.Dial(address)
}

M client_test.go => client_test.go +39 -29
@@ 13,21 13,21 @@ import (
	"golang.org/x/net/ipv6"
)

func periodicTransmitter(w Session, req Message) {
func periodicTransmitter(w ResponseWriter, r *Request) {
	subded := time.Now()

	for {
		msg := w.NewMessage(MessageParams{
		msg := r.SessionNet.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: req.MessageID(),
			MessageID: r.Msg.MessageID(),
			Payload:   []byte(fmt.Sprintf("Been running for %v", time.Since(subded))),
		})

		msg.SetOption(ContentFormat, TextPlain)
		msg.SetOption(LocationPath, req.Path())
		msg.SetOption(LocationPath, r.Msg.Path())

		err := w.WriteMsg(msg, coapTimeout)
		err := w.Write(msg)
		if err != nil {
			log.Printf("Error on transmitter, stopping: %v", err)
			return


@@ 37,13 37,18 @@ func periodicTransmitter(w Session, req Message) {
	}
}

func testServingObservation(t *testing.T, net string, addrstr string) {
func testServingObservation(t *testing.T, net string, addrstr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx) {
	sync := make(chan bool)

	client := &Client{ObserverFunc: func(s Session, m Message) {
		log.Printf("Gotaaa %s", m.Payload())
		sync <- true
	}, Net: net}
	client := &Client{
		ObserverFunc: func(w ResponseWriter, r *Request) {
			log.Printf("Gotaaa %s", r.Msg.Payload())
			sync <- true
		},
		Net:                  net,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}

	conn, err := client.Dial(addrstr)
	if err != nil {


@@ 61,7 66,7 @@ func testServingObservation(t *testing.T, net string, addrstr string) {
	req.AddOption(Observe, 1)
	req.SetPathString("/some/path")

	err = conn.WriteMsg(req, coapTimeout)
	err = conn.Write(req)
	if err != nil {
		t.Fatalf("Error sending request: %v", err)
	}


@@ 71,7 76,7 @@ func testServingObservation(t *testing.T, net string, addrstr string) {
}

func TestServingUDPObservation(t *testing.T) {
	s, addrstr, fin, err := RunLocalServerUDPWithHandler("udp", ":0", periodicTransmitter)
	s, addrstr, fin, err := RunLocalServerUDPWithHandler("udp", ":0", false, BlockSzx16, periodicTransmitter)
	if err != nil {
		t.Fatalf("unable to run test server: %v", err)
	}


@@ 79,11 84,11 @@ func TestServingUDPObservation(t *testing.T) {
		s.Shutdown()
		<-fin
	}()
	testServingObservation(t, "udp", addrstr)
	testServingObservation(t, "udp", addrstr, false, BlockSzx16)
}

func TestServingTCPObservation(t *testing.T) {
	s, addrstr, fin, err := RunLocalServerTCPWithHandler(":0", periodicTransmitter)
	s, addrstr, fin, err := RunLocalServerTCPWithHandler(":0", false, BlockSzx16, periodicTransmitter)
	if err != nil {
		t.Fatalf("unable to run test server: %v", err)
	}


@@ 91,22 96,27 @@ func TestServingTCPObservation(t *testing.T) {
		s.Shutdown()
		<-fin
	}()
	testServingObservation(t, "tcp", addrstr)
	testServingObservation(t, "tcp", addrstr, false, BlockSzx16)
}

func testServingMCastByClient(t *testing.T, lnet, laddr string, ifis []net.Interface) {
func testServingMCastByClient(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx, ifis []net.Interface) {
	payload := []byte("mcast payload")
	addrMcast := laddr
	ansArrived := make(chan bool)

	c := Client{Net: lnet, ObserverFunc: func(s Session, m Message) {
		if bytes.Equal(m.Payload(), payload) {
			log.Printf("mcast %v -> %v", s.RemoteAddr(), s.LocalAddr())
			ansArrived <- true
		} else {
			t.Fatalf("unknown payload %v arrived from %v", m.Payload(), s.RemoteAddr())
		}
	}}
	c := Client{
		Net: lnet,
		ObserverFunc: func(w ResponseWriter, r *Request) {
			if bytes.Equal(r.Msg.Payload(), payload) {
				log.Printf("mcast %v -> %v", r.SessionNet.RemoteAddr(), r.SessionNet.LocalAddr())
				ansArrived <- true
			} else {
				t.Fatalf("unknown payload %v arrived from %v", r.Msg.Payload(), r.SessionNet.RemoteAddr())
			}
		},
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}
	var a *net.UDPAddr
	var err error
	if a, err = net.ResolveUDPAddr(strings.TrimSuffix(lnet, "-mcast"), addrMcast); err != nil {


@@ 140,17 150,17 @@ func testServingMCastByClient(t *testing.T, lnet, laddr string, ifis []net.Inter
	req.SetOption(ContentFormat, TextPlain)
	req.SetPathString("/test")

	co.WriteMsg(req, time.Second)
	co.Write(req)

	<-ansArrived
}

func TestServingIPv4MCastByClient(t *testing.T) {
	testServingMCastByClient(t, "udp4-mcast", "225.0.1.187:11111", []net.Interface{})
	testServingMCastByClient(t, "udp4-mcast", "225.0.1.187:11111", false, BlockSzx16, []net.Interface{})
}

func TestServingIPv6MCastByClient(t *testing.T) {
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", []net.Interface{})
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", false, BlockSzx16, []net.Interface{})
}

func TestServingIPv4AllInterfacesMCastByClient(t *testing.T) {


@@ 158,7 168,7 @@ func TestServingIPv4AllInterfacesMCastByClient(t *testing.T) {
	if err != nil {
		t.Fatalf("unable to get interfaces: %v", err)
	}
	testServingMCastByClient(t, "udp4-mcast", "225.0.1.187:11111", ifis)
	testServingMCastByClient(t, "udp4-mcast", "225.0.1.187:11111", false, BlockSzx16, ifis)
}

func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {


@@ 166,5 176,5 @@ func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {
	if err != nil {
		t.Fatalf("unable to get interfaces: %v", err)
	}
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", ifis)
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", false, BlockSzx16, ifis)
}

M error.go => error.go +20 -2
@@ 17,8 17,8 @@ const ErrConnectionClosed = Error("connection closed")
// ErrTokenAlreadyExist Token in request is not unique for session
const ErrTokenAlreadyExist = Error("token is not unique for session")

// ErrTokenNotSet Token in request is not set
const ErrTokenNotSet = Error("token in request is not set")
// ErrTokenNotExist Token in request is not exist
const ErrTokenNotExist = Error("token is not exist")

// ErrInvalidTokenLen invalid token length in Message
const ErrInvalidTokenLen = Error("invalid token length")


@@ 64,3 64,21 @@ const ErrInvalidResponse = Error("invalid response")

// ErrNotSupported invalid response received for certain token
const ErrNotSupported = Error("not supported")

const ErrBlockNumberExceedLimit = Error("block number exceed limit 1,048,576")

const ErrBlockInvalidSize = Error("block has invalid size")

const ErrInvalidOptionBlock2 = Error("message has invalid value of Block2")

const ErrInvalidOptionBlock1 = Error("message has invalid value of Block1")

const ErrInvalidReponseCode = Error("response code has invalid value")

const ErrInvalidPayloadSize = Error("invalid payload size")

const ErrInvalidBlockSzx = Error("invalid block-wise transfer szx")

const ErrRequestEntityIncomplete = Error("payload comes in bad order")

const ErrInvalidRequest = Error("invalid request")

M examples/mcast/client/main.go => examples/mcast/client/main.go +1 -1
@@ 7,7 7,7 @@ import (
	coap "github.com/go-ocf/go-coap"
)

func mcastResp(s coap.Session, m coap.Message) {
func mcastResp(s coap.SessionNet, m coap.Message) {
	log.Printf("Got message: %#v from %v", m, s.RemoteAddr())
}


M examples/mcast/server/main.go => examples/mcast/server/main.go +1 -1
@@ 7,7 7,7 @@ import (
	coap "github.com/go-ocf/go-coap"
)

func handleMcast(w coap.Session, req coap.Message) {
func handleMcast(w coap.SessionNet, req coap.Message) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
	res := w.NewMessage(coap.MessageParams{
		Type:      coap.Acknowledgement,

M examples/observe/client/main.go => examples/observe/client/main.go +1 -1
@@ 7,7 7,7 @@ import (
	coap "github.com/go-ocf/go-coap"
)

func observe(s coap.Session, m coap.Message) {
func observe(s coap.SessionNet, m coap.Message) {
	log.Printf("Got %s", m.Payload())
}


M examples/observe/server/main.go => examples/observe/server/main.go +2 -2
@@ 8,7 8,7 @@ import (
	coap "github.com/go-ocf/go-coap"
)

func periodicTransmitter(w coap.Session, req coap.Message) {
func periodicTransmitter(w coap.SessionNet, req coap.Message) {
	subded := time.Now()

	for {


@@ 36,7 36,7 @@ func periodicTransmitter(w coap.Session, req coap.Message) {

func main() {
	log.Fatal(coap.ListenAndServe(":5688", "udp",
		coap.HandlerFunc(func(w coap.Session, req coap.Message) {
		coap.HandlerFunc(func(w coap.SessionNet, req coap.Message) {
			log.Printf("Got message path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
			if req.Code() == coap.GET && req.Option(coap.Observe) != nil {
				value := req.Option(coap.Observe)

M examples/simple/server/main.go => examples/simple/server/main.go +2 -2
@@ 7,7 7,7 @@ import (
	coap "github.com/go-ocf/go-coap"
)

func handleA(w coap.Session, req coap.Message) {
func handleA(w coap.SessionNet, req coap.Message) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
	if req.IsConfirmable() {
		res := w.NewMessage(coap.MessageParams{


@@ 24,7 24,7 @@ func handleA(w coap.Session, req coap.Message) {
	}
}

func handleB(w coap.Session, req coap.Message) {
func handleB(w coap.SessionNet, req coap.Message) {
	log.Printf("Got message in handleB: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
	if req.IsConfirmable() {
		res := w.NewMessage(coap.MessageParams{

M message.go => message.go +25 -22
@@ 58,28 58,30 @@ const (

// Response Codes
const (
	Empty                 COAPCode = 0
	Created               COAPCode = 65
	Deleted               COAPCode = 66
	Valid                 COAPCode = 67
	Changed               COAPCode = 68
	Content               COAPCode = 69
	BadRequest            COAPCode = 128
	Unauthorized          COAPCode = 129
	BadOption             COAPCode = 130
	Forbidden             COAPCode = 131
	NotFound              COAPCode = 132
	MethodNotAllowed      COAPCode = 133
	NotAcceptable         COAPCode = 134
	PreconditionFailed    COAPCode = 140
	RequestEntityTooLarge COAPCode = 141
	UnsupportedMediaType  COAPCode = 143
	InternalServerError   COAPCode = 160
	NotImplemented        COAPCode = 161
	BadGateway            COAPCode = 162
	ServiceUnavailable    COAPCode = 163
	GatewayTimeout        COAPCode = 164
	ProxyingNotSupported  COAPCode = 165
	Empty                   COAPCode = 0
	Created                 COAPCode = 65
	Deleted                 COAPCode = 66
	Valid                   COAPCode = 67
	Changed                 COAPCode = 68
	Content                 COAPCode = 69
	Continue                COAPCode = 95
	BadRequest              COAPCode = 128
	Unauthorized            COAPCode = 129
	BadOption               COAPCode = 130
	Forbidden               COAPCode = 131
	NotFound                COAPCode = 132
	MethodNotAllowed        COAPCode = 133
	NotAcceptable           COAPCode = 134
	RequestEntityIncomplete COAPCode = 136
	PreconditionFailed      COAPCode = 140
	RequestEntityTooLarge   COAPCode = 141
	UnsupportedMediaType    COAPCode = 143
	InternalServerError     COAPCode = 160
	NotImplemented          COAPCode = 161
	BadGateway              COAPCode = 162
	ServiceUnavailable      COAPCode = 163
	GatewayTimeout          COAPCode = 164
	ProxyingNotSupported    COAPCode = 165
)

//Signaling Codes for TCP


@@ 396,6 398,7 @@ type Message interface {
	MarshalBinary() ([]byte, error)
	UnmarshalBinary(data []byte) error
	SetToken(t []byte)
	SetMessageID(messageID uint16)
}

// MessageParams params to create COAP message

M messagedgram.go => messagedgram.go +5 -0
@@ 23,6 23,11 @@ func NewDgramMessage(p MessageParams) *DgramMessage {
	}
}

// SetMessageID
func (m *DgramMessage) SetMessageID(messageID uint16) {
	m.messageID = messageID
}

// MarshalBinary produces the binary form of this DgramMessage.
func (m *DgramMessage) MarshalBinary() ([]byte, error) {
	tmpbuf := []byte{0, 0}

M messagetcp.go => messagetcp.go +10 -5
@@ 100,15 100,20 @@ type TcpMessage struct {
func NewTcpMessage(p MessageParams) *TcpMessage {
	return &TcpMessage{
		MessageBase{
			typ:       p.Type,
			code:      p.Code,
			messageID: p.MessageID,
			token:     p.Token,
			payload:   p.Payload,
			//typ:       p.Type, not used by COAP over TCP
			code: p.Code,
			//messageID: p.MessageID,  not used by COAP over TCP
			token:   p.Token,
			payload: p.Payload,
		},
	}
}

// SetMessageID
func (m *TcpMessage) SetMessageID(messageID uint16) {
	//not used by COAP over TCP
}

func (m *TcpMessage) MarshalBinary() ([]byte, error) {
	/*
	   A CoAP TCP message looks like:

A request.go => request.go +6 -0
@@ 0,0 1,6 @@
package coap

type Request struct {
	Msg        Message
	SessionNet SessionNet
}

A responsewriter.go => responsewriter.go +19 -0
@@ 0,0 1,19 @@
package coap

//A ResponseWriter interface is used by an CAOP handler to construct an COAP response.
//A ResponseWriter may not be used after the Handler.ServeCOAP method has returned.
type ResponseWriter interface {
	Write(Message) error
}

type responseWriter struct {
	req *Request
}

func (r *responseWriter) Write(msg Message) error {
	switch msg.Code() {
	case GET, POST, PUT, DELETE:
		return ErrInvalidReponseCode
	}
	return r.req.SessionNet.Write(msg)
}

M server.go => server.go +62 -40
@@ 33,35 33,30 @@ const (

// Handler is implemented by any value that implements ServeCOAP.
type Handler interface {
	ServeCOAP(w Session, r Message)
}

type requestCtx struct {
	request Message
	session Session
	ServeCOAP(w ResponseWriter, r *Request)
}

// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as COAP handlers.  If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler object that calls f.
type HandlerFunc func(Session, Message)
type HandlerFunc func(ResponseWriter, *Request)

// ServeCOAP calls f(w, r).
func (f HandlerFunc) ServeCOAP(w Session, r Message) {
func (f HandlerFunc) ServeCOAP(w ResponseWriter, r *Request) {
	f(w, r)
}

// HandleFailed returns a HandlerFunc that returns NotFound for every request it gets.
func HandleFailed(w Session, req Message) {
func HandleFailed(w ResponseWriter, req *Request) {
	typ := Reset
	msg := w.NewMessage(MessageParams{
	msg := req.SessionNet.NewMessage(MessageParams{
		Type:      typ,
		Code:      NotFound,
		MessageID: req.MessageID(),
		Token:     req.Token(),
		MessageID: req.Msg.MessageID(),
		Token:     req.Msg.Token(),
	})
	w.WriteMsg(msg, coapTimeout)
	w.Write(msg)
}

func failedHandler() Handler { return HandlerFunc(HandleFailed) }


@@ 130,15 125,19 @@ type Server struct {
	// The maximum of time for synchronization go-routines, defaults to 30 seconds, if it occurs, then it call log.Fatal
	SyncTimeout time.Duration
	// If createSessionUDPFunc is set it is called when session UDP want to be created
	createSessionUDPFunc func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (Session, error)
	createSessionUDPFunc func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error)
	// If createSessionUDPFunc is set it is called when session TCP want to be created
	createSessionTCPFunc func(connection Conn, srv *Server) (Session, error)
	createSessionTCPFunc func(connection Conn, srv *Server) (SessionNet, error)
	// If NotifyNewSession is set it is called when new TCP/UDP session was created.
	NotifySessionNewFunc func(w Session)
	NotifySessionNewFunc func(w SessionNet)
	// If NotifyNewSession is set it is called when TCP/UDP session was ended.
	NotifySessionEndFunc func(w Session, err error)
	NotifySessionEndFunc func(w SessionNet, err error)
	// The interfaces that will be used for udp-mcast (default uses the system assigned for multicast)
	UDPMcastInterfaces []net.Interface
	// Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)
	BlockWiseTransfer *bool
	// Set maximal block size of payload that will be send in fragment
	BlockWiseTransferSzx *BlockSzx

	TCPReadBufferSize  int
	TCPWriteBufferSize int


@@ 146,7 145,7 @@ type Server struct {
	readerPool sync.Pool
	writerPool sync.Pool
	// UDP packet or TCP connection queue
	queue chan *requestCtx
	queue chan *Request
	// Workers count
	workersCount int32
	// Shutdown handling


@@ 154,7 153,7 @@ type Server struct {
	started bool

	sessionUDPMapLock sync.Mutex
	sessionUDPMap     map[string]Session
	sessionUDPMap     map[string]SessionNet
}

func (srv *Server) workerChannelHandler(inUse bool, timeout *time.Timer) bool {


@@ 175,7 174,7 @@ func (srv *Server) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
	return true
}

func (srv *Server) worker(w *requestCtx) {
func (srv *Server) worker(w *Request) {
	srv.serve(w)

	for {


@@ 197,7 196,7 @@ func (srv *Server) worker(w *requestCtx) {
	}
}

func (srv *Server) spawnWorker(w *requestCtx) {
func (srv *Server) spawnWorker(w *Request) {
	select {
	case srv.queue <- w:
	default:


@@ 309,25 308,43 @@ func (srv *Server) ActivateAndServe() error {
	pConn := srv.Conn
	l := srv.Listener

	srv.sessionUDPMap = make(map[string]Session)
	srv.sessionUDPMap = make(map[string]SessionNet)

	srv.queue = make(chan *requestCtx)
	srv.queue = make(chan *Request)
	defer close(srv.queue)

	if srv.createSessionTCPFunc == nil {
		srv.createSessionTCPFunc = newSessionTCP
		srv.createSessionTCPFunc = func(connection Conn, srv *Server) (SessionNet, error) {
			session, err := newSessionTCP(connection, srv)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{SessionNet: session}, nil
			}
			return session, nil
		}
	}

	if srv.createSessionUDPFunc == nil {
		srv.createSessionUDPFunc = newSessionUDP
		srv.createSessionUDPFunc = func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
			session, err := newSessionUDP(connection, srv, sessionUDPData)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{SessionNet: session}, nil
			}
			return session, nil
		}
	}

	if srv.NotifySessionNewFunc == nil {
		srv.NotifySessionNewFunc = func(w Session) {}
		srv.NotifySessionNewFunc = func(w SessionNet) {}
	}

	if srv.NotifySessionEndFunc == nil {
		srv.NotifySessionEndFunc = func(w Session, err error) {}
		srv.NotifySessionEndFunc = func(w SessionNet, err error) {}
	}

	if pConn != nil {


@@ 423,7 440,7 @@ func (srv *Server) serveTCPconnection(conn net.Conn) error {

		// We will block poller wait loop when
		// all pool workers are busy.
		srv.spawnWorker(&requestCtx{session: session, request: msg})
		srv.spawnWorker(&Request{SessionNet: session, Msg: msg})
	}
}



@@ 482,7 499,7 @@ func (srv *Server) serveTCP(l net.Listener) error {
func (srv *Server) closeSessions(err error) {
	srv.sessionUDPMapLock.Lock()
	tmp := srv.sessionUDPMap
	srv.sessionUDPMap = make(map[string]Session)
	srv.sessionUDPMap = make(map[string]SessionNet)
	srv.sessionUDPMapLock.Unlock()
	for _, v := range tmp {
		srv.NotifySessionEndFunc(v, err)


@@ 545,28 562,33 @@ func (srv *Server) serveUDP(conn *net.UDPConn) error {
		if err != nil {
			continue
		}
		srv.spawnWorker(&requestCtx{request: msg, session: session})
		srv.spawnWorker(&Request{Msg: msg, SessionNet: session})
	}
}

func (srv *Server) serve(w *requestCtx) {
	if w.session.handlePairMsg(w.request) {
		return
func (srv *Server) serve(r *Request) {
	var w ResponseWriter
	if r.SessionNet.blockWiseEnabled() {
		w = &blockWiseResponseWriter{responseWriter{req: r}}
	} else {
		w = &responseWriter{req: r}
	}

	if w.session.handleSignals(w.request) {
		return
	}

	srv.serveCOAP(w)
	handlePairMsg(w, r, func(w ResponseWriter, r *Request) {
		handleSignalMsg(w, r, func(w ResponseWriter, r *Request) {
			handleBySessionHandler(w, r, func(w ResponseWriter, r *Request) {
				handleBlockWiseMsg(w, r, srv.serveCOAP)
			})
		})
	})
}

func (srv *Server) serveCOAP(w *requestCtx) {
func (srv *Server) serveCOAP(w ResponseWriter, r *Request) {
	handler := srv.Handler
	if handler == nil || reflect.ValueOf(handler).IsNil() {
		handler = DefaultServeMux
	}
	handler.ServeCOAP(w.session, w.request) // Writes back to the client
	handler.ServeCOAP(w, r) // Writes back to the client
}

func (srv *Server) acquireReader(tcp net.Conn) *bufio.Reader {

M server_test.go => server_test.go +98 -76
@@ 17,9 17,9 @@ func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {
	if isTCP {
		resp := &TcpMessage{
			MessageBase{
				//typ:       Acknowledgement, elided by COAP over TCP
				//typ:       Acknowledgement, not used by COAP over TCP
				code: Valid,
				//messageID: req.MessageID(), , elided by COAP over TCP
				//messageID: req.MessageID(), , not used by COAP over TCP
				payload: req.Payload(),
				token:   req.Token(),
			},


@@ 42,19 42,22 @@ func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {
	return resp
}

func EchoServer(w Session, req Message) {
	if req.IsConfirmable() {
		w.WriteMsg(CreateRespMessageByReq(w.IsTCP(), Valid, req), coapTimeout)
func EchoServer(w ResponseWriter, r *Request) {
	if r.Msg.IsConfirmable() {
		err := w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), Valid, r.Msg))
		if err != nil {
			log.Printf("Cannot write echo %v", err)
		}
	}
}

func EchoServerBadID(w Session, req Message) {
	if req.IsConfirmable() {
		w.WriteMsg(CreateRespMessageByReq(w.IsTCP(), BadRequest, req), coapTimeout)
func EchoServerBadID(w ResponseWriter, r *Request) {
	if r.Msg.IsConfirmable() {
		w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), BadRequest, r.Msg))
	}
}

func RunLocalServerUDPWithHandler(lnet, laddr string, handler HandlerFunc) (*Server, string, chan error, error) {
func RunLocalServerUDPWithHandler(lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx, handler HandlerFunc) (*Server, string, chan error, error) {
	network := strings.TrimSuffix(lnet, "-mcast")

	a, err := net.ResolveUDPAddr(network, laddr)


@@ 71,11 74,16 @@ func RunLocalServerUDPWithHandler(lnet, laddr string, handler HandlerFunc) (*Ser
		return nil, "", nil, err
	}
	server := &Server{Conn: pc, ReadTimeout: time.Hour, WriteTimeout: time.Hour,
		NotifySessionNewFunc: func(s Session) {
			fmt.Printf("Session start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w Session, err error) {
			fmt.Printf("Session end %v: %v\n", w.RemoteAddr(), err)
		}, Handler: handler}
		NotifySessionNewFunc: func(s SessionNet) {
			fmt.Printf("SessionNet start %v\n", s.RemoteAddr())
		},
		NotifySessionEndFunc: func(w SessionNet, err error) {
			fmt.Printf("SessionNet end %v: %v\n", w.RemoteAddr(), err)
		},
		Handler:              handler,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}

	waitLock := sync.Mutex{}
	waitLock.Lock()


@@ 95,22 103,25 @@ func RunLocalServerUDPWithHandler(lnet, laddr string, handler HandlerFunc) (*Ser
	return server, pc.LocalAddr().String(), fin, nil
}

func RunLocalUDPServer(net, laddr string) (*Server, string, chan error, error) {
	return RunLocalServerUDPWithHandler(net, laddr, nil)
func RunLocalUDPServer(net, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx) (*Server, string, chan error, error) {
	return RunLocalServerUDPWithHandler(net, laddr, BlockWiseTransfer, BlockWiseTransferSzx, nil)
}

func RunLocalServerTCPWithHandler(laddr string, handler HandlerFunc) (*Server, string, chan error, error) {
func RunLocalServerTCPWithHandler(laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx, handler HandlerFunc) (*Server, string, chan error, error) {
	l, err := net.Listen("tcp", laddr)
	if err != nil {
		return nil, "", nil, err
	}

	server := &Server{Listener: l, ReadTimeout: time.Second * 3600, WriteTimeout: time.Second * 3600,
		NotifySessionNewFunc: func(s Session) {
			fmt.Printf("Session start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w Session, err error) {
			fmt.Printf("Session end %v: %v\n", w.RemoteAddr(), err)
		}, Handler: handler}
		NotifySessionNewFunc: func(s SessionNet) {
			fmt.Printf("SessionNet start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w SessionNet, err error) {
			fmt.Printf("SessionNet end %v: %v\n", w.RemoteAddr(), err)
		}, Handler: handler,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}

	waitLock := sync.Mutex{}
	waitLock.Lock()


@@ 129,8 140,8 @@ func RunLocalServerTCPWithHandler(laddr string, handler HandlerFunc) (*Server, s
	return server, l.Addr().String(), fin, nil
}

func RunLocalTCPServer(laddr string) (*Server, string, chan error, error) {
	return RunLocalServerTCPWithHandler(laddr, nil)
func RunLocalTCPServer(laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx) (*Server, string, chan error, error) {
	return RunLocalServerTCPWithHandler(laddr, BlockWiseTransfer, BlockWiseTransferSzx, nil)
}

func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan error, error) {


@@ 140,10 151,10 @@ func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan 
	}

	server := &Server{Listener: l, ReadTimeout: time.Hour, WriteTimeout: time.Hour,
		NotifySessionNewFunc: func(s Session) {
			fmt.Printf("Session start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w Session, err error) {
			fmt.Printf("Session end %v: %v\n", w.RemoteAddr(), err)
		NotifySessionNewFunc: func(s SessionNet) {
			fmt.Printf("SessionNet start %v\n", s.RemoteAddr())
		}, NotifySessionEndFunc: func(w SessionNet, err error) {
			fmt.Printf("SessionNet end %v: %v\n", w.RemoteAddr(), err)
		}}

	waitLock := sync.Mutex{}


@@ 166,20 177,25 @@ func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan 

type clientHandler func(t *testing.T, payload []byte, co *ClientConn)

func testServingTCPWithMsgWithObserver(t *testing.T, net string, payload []byte, ch clientHandler, observeFunc HandlerFunc) {
func testServingTCPWithMsgWithObserver(t *testing.T, net string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx, payload []byte, ch clientHandler, observeFunc HandlerFunc) {
	HandleFunc("/test", EchoServer)
	defer HandleRemove("/test")

	var s *Server
	var addrstr string
	var err error
	c := &Client{Net: net, ObserverFunc: observeFunc}
	c := &Client{
		Net:                  net,
		ObserverFunc:         observeFunc,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}
	var fin chan error
	switch net {
	case "tcp", "tcp4", "tcp6":
		s, addrstr, fin, err = RunLocalTCPServer(":0")
		s, addrstr, fin, err = RunLocalTCPServer(":0", BlockWiseTransfer, BlockWiseTransferSzx)
	case "udp", "udp4", "udp6":
		s, addrstr, fin, err = RunLocalUDPServer(net, ":0")
		s, addrstr, fin, err = RunLocalUDPServer(net, ":0", BlockWiseTransfer, BlockWiseTransferSzx)
	case "tcp-tls", "tcp4-tls", "tcp6-tls":
		cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock)
		if err != nil {


@@ 226,7 242,7 @@ func simpleMsg(t *testing.T, payload []byte, co *ClientConn) {

	res := CreateRespMessageByReq(co.session.IsTCP(), Valid, req)

	m, err := co.Exchange(req, 1*time.Second)
	m, err := co.Exchange(req)
	if err != nil {
		t.Fatal("failed to exchange", err)
	}


@@ 237,80 253,80 @@ func simpleMsg(t *testing.T, payload []byte, co *ClientConn) {
}

func pingMsg(t *testing.T, payload []byte, co *ClientConn) {
	err := co.Ping(1 * time.Second)
	err := co.Ping(3600 * time.Second)
	if err != nil {
		t.Fatal("failed to exchange", err)
	}
}

func testServingTCPWithMsg(t *testing.T, net string, payload []byte, ch clientHandler) {
	testServingTCPWithMsgWithObserver(t, net, payload, ch, nil)
func testServingTCPWithMsg(t *testing.T, net string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx, payload []byte, ch clientHandler) {
	testServingTCPWithMsgWithObserver(t, net, BlockWiseTransfer, BlockWiseTransferSzx, payload, ch, nil)
}

func TestServingUDP(t *testing.T) {
	testServingTCPWithMsg(t, "udp", make([]byte, 128), simpleMsg)
	testServingTCPWithMsg(t, "udp", false, BlockSzx16, make([]byte, 128), simpleMsg)
}

func TestServingUDPPing(t *testing.T) {

	testServingTCPWithMsg(t, "udp", nil, pingMsg)
	testServingTCPWithMsg(t, "udp", false, BlockSzx16, nil, pingMsg)
}

func TestServingTCPPing(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", nil, pingMsg)
	testServingTCPWithMsg(t, "tcp", false, BlockSzx16, nil, pingMsg)
}

func TestServingUDPBigMsg(t *testing.T) {
	testServingTCPWithMsg(t, "udp", make([]byte, 1000), simpleMsg)
	testServingTCPWithMsg(t, "udp", false, BlockSzx16, make([]byte, 1024), simpleMsg)
}

func TestServingTCP(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", make([]byte, 128), simpleMsg)
	testServingTCPWithMsg(t, "tcp", false, BlockSzx16, make([]byte, 128), simpleMsg)
}

func TestServingTCPBigMsg(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", make([]byte, 10*1024*1024), simpleMsg)
	testServingTCPWithMsg(t, "tcp", false, BlockSzx16, make([]byte, 10*1024*1024), simpleMsg)
}

func TestServingTLS(t *testing.T) {
	testServingTCPWithMsg(t, "tcp-tls", make([]byte, 128), simpleMsg)
	testServingTCPWithMsg(t, "tcp-tls", false, BlockSzx16, make([]byte, 128), simpleMsg)
}

func TestServingTLSBigMsg(t *testing.T) {
	testServingTCPWithMsg(t, "tcp-tls", make([]byte, 10*1024*1024), simpleMsg)
	testServingTCPWithMsg(t, "tcp-tls", false, BlockSzx16, make([]byte, 10*1024*1024), simpleMsg)
}

func ChallegingServer(w Session, req Message) {
	r := w.NewMessage(MessageParams{
func ChallegingServer(w ResponseWriter, r *Request) {
	req := r.SessionNet.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: 12345,
		Payload:   []byte("hello, world!"),
		Token:     []byte("abcd"),
	})
	_, err := w.Exchange(r, time.Second)
	_, err := r.SessionNet.Exchange(req)
	if err != nil {
		panic(err.Error())
	}

	w.WriteMsg(CreateRespMessageByReq(w.IsTCP(), Valid, req), coapTimeout)
	w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), Valid, r.Msg))
}

func ChallegingServerTimeout(w Session, req Message) {
	r := w.NewMessage(MessageParams{
func ChallegingServerTimeout(w ResponseWriter, r *Request) {
	req := r.SessionNet.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: 12345,
		Payload:   []byte("hello, world!"),
		Token:     []byte("abcd"),
	})
	r.SetOption(ContentFormat, TextPlain)
	_, err := w.Exchange(r, time.Second)
	req.SetOption(ContentFormat, TextPlain)
	_, err := r.SessionNet.exchangeTimeout(req, time.Second, time.Second)
	if err == nil {
		panic("Error: expected timeout")
	}

	w.WriteMsg(CreateRespMessageByReq(w.IsTCP(), Valid, req), coapTimeout)
	w.Write(CreateRespMessageByReq(r.SessionNet.IsTCP(), Valid, r.Msg))
}

func simpleChallengingMsg(t *testing.T, payload []byte, co *ClientConn) {


@@ 333,7 349,7 @@ func simpleChallengingPathMsg(t *testing.T, payload []byte, co *ClientConn, path
	req0.SetOption(ContentFormat, TextPlain)
	req0.SetPathString(path)

	resp0, err := co.Exchange(req0, coapTimeout)
	resp0, err := co.Exchange(req0)
	if err != nil {
		t.Fatalf("unable to read msg from server: %v", err)
	}


@@ 345,25 361,25 @@ func simpleChallengingPathMsg(t *testing.T, payload []byte, co *ClientConn, path
func TestServingChallengingClient(t *testing.T) {
	HandleFunc("/challenging", ChallegingServer)
	defer HandleRemove("/challenging")
	testServingTCPWithMsg(t, "udp", make([]byte, 128), simpleChallengingMsg)
	testServingTCPWithMsg(t, "udp", false, BlockSzx16, make([]byte, 128), simpleChallengingMsg)
}

func TestServingChallengingClientTCP(t *testing.T) {
	HandleFunc("/challenging", ChallegingServer)
	defer HandleRemove("/challenging")
	testServingTCPWithMsg(t, "tcp", make([]byte, 128), simpleChallengingMsg)
	testServingTCPWithMsg(t, "tcp", false, BlockSzx16, make([]byte, 128), simpleChallengingMsg)
}

func TestServingChallengingClientTLS(t *testing.T) {
	HandleFunc("/challenging", ChallegingServer)
	defer HandleRemove("/challenging")
	testServingTCPWithMsg(t, "tcp-tls", make([]byte, 128), simpleChallengingMsg)
	testServingTCPWithMsg(t, "tcp-tls", false, BlockSzx16, make([]byte, 128), simpleChallengingMsg)
}

func TestServingChallengingTimeoutClient(t *testing.T) {
	HandleFunc("/challengingTimeout", ChallegingServerTimeout)
	defer HandleRemove("/challengingTimeout")
	testServingTCPWithMsgWithObserver(t, "udp", make([]byte, 128), simpleChallengingTimeoutMsg, func(Session, Message) {
	testServingTCPWithMsgWithObserver(t, "udp", false, BlockSzx16, make([]byte, 128), simpleChallengingTimeoutMsg, func(w ResponseWriter, r *Request) {
		//for timeout
		time.Sleep(2 * time.Second)
	})


@@ 372,7 388,7 @@ func TestServingChallengingTimeoutClient(t *testing.T) {
func TestServingChallengingTimeoutClientTCP(t *testing.T) {
	HandleFunc("/challengingTimeout", ChallegingServerTimeout)
	defer HandleRemove("/challengingTimeout")
	testServingTCPWithMsgWithObserver(t, "tcp", make([]byte, 128), simpleChallengingTimeoutMsg, func(Session, Message) {
	testServingTCPWithMsgWithObserver(t, "tcp", false, BlockSzx16, make([]byte, 128), simpleChallengingTimeoutMsg, func(w ResponseWriter, r *Request) {
		//for timeout
		time.Sleep(2 * time.Second)
	})


@@ 381,28 397,28 @@ func TestServingChallengingTimeoutClientTCP(t *testing.T) {
func TestServingChallengingTimeoutClientTLS(t *testing.T) {
	HandleFunc("/challengingTimeout", ChallegingServerTimeout)
	defer HandleRemove("/challengingTimeout")
	testServingTCPWithMsgWithObserver(t, "tcp-tls", make([]byte, 128), simpleChallengingTimeoutMsg, func(Session, Message) {
	testServingTCPWithMsgWithObserver(t, "tcp-tls", false, BlockSzx16, make([]byte, 128), simpleChallengingTimeoutMsg, func(w ResponseWriter, r *Request) {
		//for timeout
		time.Sleep(2 * time.Second)
	})
}

func testServingMCast(t *testing.T, lnet, laddr string) {
func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx) {
	payload := []byte("mcast payload")
	addrMcast := laddr
	ansArrived := make(chan bool)

	HandleFunc("/test", func(s Session, m Message) {
		if bytes.Equal(m.Payload(), payload) {
			log.Printf("mcast %v -> %v", s.RemoteAddr(), s.LocalAddr())
	HandleFunc("/test", func(w ResponseWriter, r *Request) {
		if bytes.Equal(r.Msg.Payload(), payload) {
			log.Printf("mcast %v -> %v", r.SessionNet.RemoteAddr(), r.SessionNet.LocalAddr())
			ansArrived <- true
		} else {
			t.Fatalf("unknown payload %v arrived from %v", m.Payload(), s.RemoteAddr())
			t.Fatalf("unknown payload %v arrived from %v", r.Msg.Payload(), r.SessionNet.RemoteAddr())
		}
	})
	defer HandleRemove("/test")

	s, _, fin, err := RunLocalUDPServer(lnet, addrMcast)
	s, _, fin, err := RunLocalUDPServer(lnet, addrMcast, BlockWiseTransfer, BlockWiseTransferSzx)
	if err != nil {
		t.Fatalf("unable to run test server: %v", err)
	}


@@ 411,7 427,13 @@ func testServingMCast(t *testing.T, lnet, laddr string) {
		<-fin
	}()

	co, err := Dial(strings.TrimSuffix(lnet, "-mcast"), addrMcast)
	c := Client{
		Net:                  strings.TrimSuffix(lnet, "-mcast"),
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}

	co, err := c.Dial(addrMcast)
	if err != nil {
		t.Fatalf("unable to dialing: %v", err)
	}


@@ 427,17 449,17 @@ func testServingMCast(t *testing.T, lnet, laddr string) {
	req.SetOption(ContentFormat, TextPlain)
	req.SetPathString("/test")

	co.WriteMsg(req, time.Second)
	co.Write(req)

	<-ansArrived
}

func TestServingIPv4MCast(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111")
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", false, BlockSzx16)
}

func TestServingIPv6MCast(t *testing.T) {
	testServingMCast(t, "udp6-mcast", "[ff03::158]:11111")
	testServingMCast(t, "udp6-mcast", "[ff03::158]:11111", false, BlockSzx16)
}

func BenchmarkServe(b *testing.B) {


@@ 445,7 467,7 @@ func BenchmarkServe(b *testing.B) {
	HandleFunc("/test", EchoServer)
	defer HandleRemove("/test")

	s, addrstr, fin, err := RunLocalUDPServer("udp", ":0")
	s, addrstr, fin, err := RunLocalUDPServer("udp", ":0", false, BlockSzx16)
	if err != nil {
		b.Fatalf("unable to run test server: %v", err)
	}


@@ 476,7 498,7 @@ func BenchmarkServe(b *testing.B) {
		token := make([]byte, 8)
		binary.LittleEndian.PutUint32(token, i)
		abc.SetToken(token)
		_, err = co.Exchange(&abc, 5*time.Second)
		_, err = co.Exchange(&abc)
		if err != nil {
			b.Fatalf("unable to read msg from server: %v", err)
		}


@@ 488,7 510,7 @@ func BenchmarkServeTCP(b *testing.B) {
	HandleFunc("/test", EchoServer)
	defer HandleRemove("/test")

	s, addrstr, fin, err := RunLocalTCPServer(":0")
	s, addrstr, fin, err := RunLocalTCPServer(":0", false, BlockSzx16)
	if err != nil {
		b.Fatalf("unable to run test server: %v", err)
	}


@@ 519,7 541,7 @@ func BenchmarkServeTCP(b *testing.B) {
		token := make([]byte, 8)
		binary.LittleEndian.PutUint32(token, i)
		abc.SetToken(token)
		_, err = co.Exchange(&abc, 1*time.Second)
		_, err = co.Exchange(&abc)
		if err != nil {
			b.Fatalf("unable to read msg from server: %v", err)
		}


@@ 531,7 553,7 @@ func benchmarkServeTCPStreamWithMsg(b *testing.B, req *TcpMessage) {
	HandleFunc("/test", EchoServer)
	defer HandleRemove("/test")

	s, addrstr, fin, err := RunLocalTCPServer(":0")
	s, addrstr, fin, err := RunLocalTCPServer(":0", false, BlockSzx16)
	if err != nil {
		b.Fatalf("unable to run test server: %v", err)
	}


@@ 556,7 578,7 @@ func benchmarkServeTCPStreamWithMsg(b *testing.B, req *TcpMessage) {
			token := make([]byte, 8)
			binary.LittleEndian.PutUint32(token, t)
			abc.SetToken(token)
			resp, err := co.Exchange(&abc, 5*time.Second)
			resp, err := co.Exchange(&abc)
			if err != nil {
				b.Fatalf("unable to read msg from server: %v", err)
			}

M servermux.go => servermux.go +7 -7
@@ 85,12 85,12 @@ func (mux *ServeMux) DefaultHandle(handler Handler) {
}

// HandleFunc adds a handler function to the ServeMux for pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(Session, Message)) {
func (mux *ServeMux) HandleFunc(pattern string, handler func(w ResponseWriter, r *Request)) {
	mux.Handle(pattern, HandlerFunc(handler))
}

// DefaultHandleFunc set a default handler function to the ServeMux.
func (mux *ServeMux) DefaultHandleFunc(handler func(Session, Message)) {
func (mux *ServeMux) DefaultHandleFunc(handler func(w ResponseWriter, r *Request)) {
	mux.DefaultHandle(HandlerFunc(handler))
}



@@ 109,15 109,15 @@ func (mux *ServeMux) HandleRemove(pattern string) {
// is used the correct thing for DS queries is done: a possible parent
// is sought.
// If no handler is found a standard NotFound message is returned
func (mux *ServeMux) ServeCOAP(w Session, request Message) {
	h, _ := mux.match(request.PathString())
func (mux *ServeMux) ServeCOAP(w ResponseWriter, r *Request) {
	h, _ := mux.match(r.Msg.PathString())
	if h == nil {
		h = mux.defaultHandler
		if h == nil {
			h = failedHandler()
		}
	}
	h.ServeCOAP(w, request)
	h.ServeCOAP(w, r)
}

// Handle registers the handler with the given pattern


@@ 127,7 127,7 @@ func Handle(pattern string, handler Handler) { DefaultServeMux.Handle(pattern, h

// HandleFunc registers the handler function with the given pattern
// in the DefaultServeMux.
func HandleFunc(pattern string, handler func(Session, Message)) {
func HandleFunc(pattern string, handler func(w ResponseWriter, r *Request)) {
	DefaultServeMux.HandleFunc(pattern, handler)
}



@@ 139,6 139,6 @@ func HandleRemove(pattern string) { DefaultServeMux.HandleRemove(pattern) }
func DefaultHandle(handler Handler) { DefaultServeMux.DefaultHandle(handler) }

// DefaultHandleFunc set the default handler in the DefaultServeMux.
func DefaultHandleFunc(handler func(Session, Message)) {
func DefaultHandleFunc(handler func(w ResponseWriter, r *Request)) {
	DefaultServeMux.DefaultHandleFunc(handler)
}

M session.go => session.go +364 -118
@@ 1,7 1,6 @@
package coap

import (
	"fmt"
	"log"
	"net"
	"sync"


@@ 9,15 8,15 @@ import (
	"time"
)

// A Session interface is used by an COAP handler to
// A SessionNet interface is used by an COAP handler to
// server data in session.
type Session interface {
type SessionNet interface {
	// LocalAddr returns the net.Addr of the server
	LocalAddr() net.Addr
	// RemoteAddr returns the net.Addr of the client that sent the current request.
	RemoteAddr() net.Addr
	// WriteMsg writes a reply back to the client.
	WriteMsg(resp Message, timeout time.Duration) error
	Write(resp Message) error
	// Close closes the connection.
	Close() error
	// Return type of network


@@ 26,83 25,220 @@ type Session interface {
	NewMessage(params MessageParams) Message
	// Exchange writes message and wait for response - paired by token and msgid
	// it is safe to use in goroutines
	Exchange(req Message, timeout time.Duration) (Message, error)
	Exchange(req Message) (Message, error)
	// Send ping to peer and wait for pong
	Ping(timeout time.Duration) error
	// SetReadDeadline set read deadline for timeout for Exchange
	SetReadDeadline(timeout time.Duration)
	// SetWriteDeadline set write deadline for timeout for Exchange and Write
	SetWriteDeadline(timeout time.Duration)
	// ReadDeadline get read deadline
	ReadDeadline() time.Duration
	// WriteDeadline get read writeline
	WriteDeadline() time.Duration

	// handlePairMsg Message was handled by pair
	handlePairMsg(req Message) bool
	handlePairMsg(w ResponseWriter, r *Request) bool

	// handleSignals Message below to signals
	handleSignals(req Message) bool
	handleSignals(w ResponseWriter, r *Request) bool

	// sendPong create pong by m and send it
	sendPong(m Message) error
	sendPong(w ResponseWriter, r *Request) error

	// close session with error
	closeWithError(err error) error

	exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error)

	sessionHandler() *sessionHandler

	// BlockWiseTransferEnabled
	blockWiseEnabled() bool
	// BlockWiseTransferSzx
	blockWiseSzx() BlockSzx
	// MaxPayloadSize
	blockWiseMaxPayloadSize(peer BlockSzx) int

	blockWiseIsValid(szx BlockSzx) bool
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (Session, error) {
	s := &sessionUDP{sessionBase: sessionBase{srv: srv, mapPairs: make(map[[8]byte](*sessionResp)), connection: connection}, sessionUDPData: sessionUDPData, mapMsgIdPairs: make(map[uint16](*sessionResp))}
func newSessionUDP(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx1024
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}

	if BlockWiseTransfer && BlockWiseTransferSzx == BlockSzxBERT {
		return nil, ErrInvalidBlockSzx
	}

	s := &sessionUDP{
		sessionBase: sessionBase{
			srv:                  srv,
			connection:           connection,
			readDeadline:         30 * time.Second,
			writeDeadline:        30 * time.Second,
			handler:              &sessionHandler{tokenHandlers: make(map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc))},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: BlockWiseTransferSzx,
		},
		sessionUDPData: sessionUDPData,
		mapPairs:       make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
	}
	return s, nil
}

// NewSessionTCP create new session for TCP connection
func newSessionTCP(connection Conn, srv *Server) (Session, error) {
	s := &sessionTCP{sessionBase: sessionBase{srv: srv, mapPairs: make(map[[8]byte](*sessionResp)), connection: connection}}
	if err := s.sendCSM(); err != nil {
		return nil, err
// newSessionTCP create new session for TCP connection
func newSessionTCP(connection Conn, srv *Server) (SessionNet, error) {
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockSzxBERT
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}
	s := &sessionTCP{
		mapPairs:           make(map[[MaxTokenSize]byte](*sessionResp)),
		peerMaxMessageSize: uint32(srv.MaxMessageSize),
		sessionBase: sessionBase{
			srv:                  srv,
			connection:           connection,
			readDeadline:         30 * time.Second,
			writeDeadline:        30 * time.Second,
			handler:              &sessionHandler{tokenHandlers: make(map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc))},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: BlockWiseTransferSzx,
		},
	}
	/*
		if err := s.sendCSM(); err != nil {
			return nil, err
		}
	*/

	return s, nil
}

type sessionResp struct {
	ch chan Message // channel must have size 1 for non-blocking write to channel
	ch chan *Request // channel must have size 1 for non-blocking write to channel
}

type sessionBase struct {
	srv        *Server
	connection Conn
	srv           *Server
	connection    Conn
	readDeadline  time.Duration
	writeDeadline time.Duration
	handler       *sessionHandler

	mapPairs     map[[MaxTokenSize]byte]*sessionResp //storage of channel Message
	mapPairsLock sync.Mutex                          //to sync add remove token
	blockWiseTransfer    bool
	blockWiseTransferSzx BlockSzx
}

type sessionUDP struct {
	sessionBase
	sessionUDPData    *SessionUDPData         // oob data to get egress interface right
	mapMsgIdPairs     map[uint16]*sessionResp //storage of channel Message
	mapMsgIdPairsLock sync.Mutex              //to sync add remove token
	sessionUDPData *SessionUDPData                                // oob data to get egress interface right
	mapPairs       map[[MaxTokenSize]byte]map[uint16]*sessionResp //storage of channel Message
	mapPairsLock   sync.Mutex                                     //to sync add remove token
}

type sessionTCP struct {
	sessionBase
	peerMaxMessageSize    uint32 // SCM from peer set peerMaxMessageSize
	peerBlockWiseTransfer uint32 // SCM from peer inform us that it supports blockwise (+ BERT when peerMaxMessageSize > 1152 )

	mapPairs     map[[MaxTokenSize]byte]*sessionResp //storage of channel Message
	mapPairsLock sync.Mutex                          //to sync add remove token

	peerBlockWiseTransfer uint32
	peerMaxMessageSize    uint32
}

// LocalAddr implements the Session.LocalAddr method.
// LocalAddr implements the SessionNet.LocalAddr method.
func (s *sessionUDP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// LocalAddr implements the Session.LocalAddr method.
// LocalAddr implements the SessionNet.LocalAddr method.
func (s *sessionTCP) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the Session.RemoteAddr method.
// RemoteAddr implements the SessionNet.RemoteAddr method.
func (s *sessionUDP) RemoteAddr() net.Addr {
	return s.sessionUDPData.RemoteAddr()
}

// RemoteAddr implements the Session.RemoteAddr method.
// RemoteAddr implements the SessionNet.RemoteAddr method.
func (s *sessionTCP) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}

func (s *sessionBase) SetReadDeadline(timeout time.Duration) {
	s.readDeadline = timeout
}

func (s *sessionBase) SetWriteDeadline(timeout time.Duration) {
	s.writeDeadline = timeout
}

func (s *sessionBase) ReadDeadline() time.Duration {
	return s.readDeadline
}

// WriteDeadline get read writeline
func (s *sessionBase) WriteDeadline() time.Duration {
	return s.writeDeadline
}

// BlockWiseTransferEnabled
func (s *sessionUDP) blockWiseEnabled() bool {
	return s.blockWiseTransfer
}

func (s *sessionTCP) blockWiseEnabled() bool {
	return s.blockWiseTransfer /*&& atomic.LoadUint32(&s.peerBlockWiseTransfer) != 0*/
}

func (s *sessionBase) blockWiseSzx() BlockSzx {
	return s.blockWiseTransferSzx
}

func (s *sessionBase) blockWiseMaxPayloadSize(peer BlockSzx) int {
	if peer < s.blockWiseTransferSzx {
		return SZXVal[peer]
	}
	return SZXVal[s.blockWiseTransferSzx]
}

func (s *sessionTCP) blockWiseMaxPayloadSize(peer BlockSzx) int {
	if s.blockWiseTransferSzx == BlockSzxBERT && peer == BlockSzxBERT {
		m := atomic.LoadUint32(&s.peerMaxMessageSize)
		if m == 0 {
			m = maxMessageSize
		}
		return int(m - (m % 1024))
	}
	return s.sessionBase.blockWiseMaxPayloadSize(peer)
}

func (s *sessionUDP) blockWiseIsValid(szx BlockSzx) bool {
	return szx <= BlockSzx1024
}

func (s *sessionTCP) blockWiseIsValid(szx BlockSzx) bool {
	return true
}

func (s *sessionBase) sessionHandler() *sessionHandler {
	return s.handler
}

func (s *sessionUDP) closeWithError(err error) error {
	s.srv.sessionUDPMapLock.Lock()
	delete(s.srv.sessionUDPMap, s.sessionUDPData.Key())


@@ 123,9 259,9 @@ func (s *sessionUDP) Ping(timeout time.Duration) error {
	req := s.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      Empty,
		MessageID: GenerateMessageId(),
		MessageID: GenerateMessageID(),
	})
	resp, err := s.Exchange(req, timeout)
	resp, err := s.exchangeTimeout(req, timeout, timeout)
	if err != nil {
		return err
	}


@@ 145,7 281,7 @@ func (s *sessionTCP) Ping(timeout time.Duration) error {
		Code:  Ping,
		Token: []byte(token),
	})
	resp, err := s.Exchange(req, timeout)
	resp, err := s.exchangeTimeout(req, timeout, timeout)
	if err != nil {
		return err
	}


@@ 155,7 291,7 @@ func (s *sessionTCP) Ping(timeout time.Duration) error {
	return ErrInvalidResponse
}

// Close implements the Session.Close method
// Close implements the SessionNet.Close method
func (s *sessionUDP) Close() error {
	return s.closeWithError(nil)
}


@@ 173,7 309,7 @@ func (s *sessionTCP) closeWithError(err error) error {
	return err
}

// Close implements the Session.Close method
// Close implements the SessionNet.Close method
func (s *sessionTCP) Close() error {
	return s.closeWithError(nil)
}


@@ 188,111 324,142 @@ func (s *sessionTCP) NewMessage(p MessageParams) Message {
	return NewTcpMessage(p)
}

// Close implements the Session.Close method
// Close implements the SessionNet.Close method
func (s *sessionUDP) IsTCP() bool {
	return false
}

// Close implements the Session.Close method
// Close implements the SessionNet.Close method
func (s *sessionTCP) IsTCP() bool {
	return true
}

func (s *sessionBase) exchangeFunc(req Message, timeout time.Duration, pairChan *sessionResp, write func(msg Message, timeout time.Duration) error) (Message, error) {
	var pairToken [MaxTokenSize]byte
	if req.Token() != nil {
		copy(pairToken[:], req.Token())
		s.mapPairsLock.Lock()
		for s.mapPairs[pairToken] != nil {
			return nil, ErrTokenAlreadyExist
		}

		s.mapPairs[pairToken] = pairChan
		s.mapPairsLock.Unlock()
	}

	defer func() {
		if req.Token() != nil {
			s.mapPairsLock.Lock()
			s.mapPairs[pairToken] = nil
			s.mapPairsLock.Unlock()
		}
	}()
func (s *sessionBase) exchangeFunc(req Message, writeTimeout, readTimeout time.Duration, pairChan *sessionResp, write func(msg Message, timeout time.Duration) error) (Message, error) {

	err := write(req, timeout)
	err := write(req, writeTimeout)
	if err != nil {
		return nil, err
	}

	select {
	case resp := <-pairChan.ch:
		return resp, nil
	case <-time.After(timeout):
		return resp.Msg, nil
	case <-time.After(readTimeout):
		return nil, ErrTimeout
	}
}

func (s *sessionTCP) Exchange(req Message, timeout time.Duration) (Message, error) {
// Write implements the SessionNet.Write method.
func (s *sessionTCP) Exchange(m Message) (Message, error) {
	return s.exchangeTimeout(m, s.writeDeadline, s.readDeadline)
}

// Write implements the SessionNet.Write method.
func (s *sessionUDP) Exchange(m Message) (Message, error) {
	return s.exchangeTimeout(m, s.writeDeadline, s.readDeadline)
}

func (s *sessionTCP) exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error) {
	if req.Token() == nil {
		return nil, ErrTokenNotSet
		return nil, ErrTokenNotExist
	}

	pairChan := &sessionResp{make(chan *Request, 1)}

	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], req.Token())
	s.mapPairsLock.Lock()
	if s.mapPairs[pairToken] != nil {
		return nil, ErrTokenAlreadyExist
	}
	return s.exchangeFunc(req, timeout, &sessionResp{make(chan Message, 1)}, s.WriteMsg)

	s.mapPairs[pairToken] = pairChan
	s.mapPairsLock.Unlock()

	defer func() {
		if req.Token() != nil {
			s.mapPairsLock.Lock()
			delete(s.mapPairs, pairToken)
			s.mapPairsLock.Unlock()
		}
	}()

	return s.exchangeFunc(req, writeDeadline, readDeadline, pairChan, s.writeTimeout)
}

func (s *sessionUDP) Exchange(req Message, timeout time.Duration) (Message, error) {
	pairChan := &sessionResp{make(chan Message, 1)}
	s.mapMsgIdPairsLock.Lock()
	for s.mapMsgIdPairs[req.MessageID()] != nil {
func (s *sessionUDP) exchangeTimeout(req Message, writeDeadline, readDeadline time.Duration) (Message, error) {
	//register msgid to token
	pairChan := &sessionResp{make(chan *Request, 1)}
	var pairToken [MaxTokenSize]byte
	copy(pairToken[:], req.Token())
	s.mapPairsLock.Lock()
	if s.mapPairs[pairToken] == nil {
		s.mapPairs[pairToken] = make(map[uint16]*sessionResp)
	}
	if s.mapPairs[pairToken][req.MessageID()] != nil {
		s.mapPairsLock.Unlock()
		return nil, ErrTokenAlreadyExist
	}
	s.mapMsgIdPairs[req.MessageID()] = pairChan
	s.mapMsgIdPairsLock.Unlock()
	s.mapPairs[pairToken][req.MessageID()] = pairChan
	s.mapPairsLock.Unlock()

	defer func() {
		s.mapMsgIdPairsLock.Lock()
		s.mapMsgIdPairs[req.MessageID()] = nil
		s.mapMsgIdPairsLock.Unlock()
		s.mapPairsLock.Lock()
		delete(s.mapPairs[pairToken], req.MessageID())
		if len(s.mapPairs[pairToken]) == 0 {
			delete(s.mapPairs, pairToken)
		}
		s.mapPairsLock.Unlock()
	}()
	return s.exchangeFunc(req, timeout, pairChan, s.WriteMsg)

	return s.exchangeFunc(req, writeDeadline, readDeadline, pairChan, s.writeTimeout)
}

// Write implements the SessionNet.Write method.
func (s *sessionTCP) Write(m Message) error {
	return s.writeTimeout(m, s.writeDeadline)
}

func (s *sessionUDP) Write(m Message) error {
	return s.writeTimeout(m, s.writeDeadline)
}

// WriteMsg implements the Session.WriteMsg method.
func (s *sessionTCP) WriteMsg(m Message, timeout time.Duration) error {
func (s *sessionTCP) writeTimeout(m Message, timeout time.Duration) error {
	req, err := m.MarshalBinary()
	if err != nil {
		return err
	}
	peerMaxMessageSize := int(atomic.LoadUint32(&s.peerMaxMessageSize))
	if peerMaxMessageSize != 0 && len(req) > peerMaxMessageSize {
		//TODO blockwise transfer + BERT to device
		//TODO blockWise transfer + BERT to device
		return ErrMsgTooLarge
	}
	return s.connection.write(&writeReqTCP{writeReqBase{req: req, respChan: make(chan error, 1)}}, timeout)
}

// WriteMsg implements the Session.WriteMsg method.
func (s *sessionUDP) WriteMsg(m Message, timeout time.Duration) error {
// WriteMsg implements the SessionNet.WriteMsg method.
func (s *sessionUDP) writeTimeout(m Message, timeout time.Duration) error {
	req, err := m.MarshalBinary()
	if err != nil {
		return err
	}

	//TODO blockwise transfer to device
	//TODO blockWise transfer to device
	if len(req) > int(s.srv.MaxMessageSize) {
		return ErrMsgTooLarge
	}
	return s.connection.write(&writeReqUDP{writeReqBase{req: req, respChan: make(chan error, 1)}, s.sessionUDPData}, timeout)
}

func (s *sessionBase) handleBasePairMsg(m Message) bool {
func (s *sessionTCP) handlePairMsg(w ResponseWriter, r *Request) bool {
	var token [MaxTokenSize]byte
	copy(token[:], m.Token())
	copy(token[:], r.Msg.Token())
	s.mapPairsLock.Lock()
	pair := s.mapPairs[token]
	s.mapPairsLock.Unlock()
	if pair != nil {
		select {
		case pair.ch <- m:
		case pair.ch <- r:
		default:
			log.Fatal("Exactly one message can be send to pair. This is second message.")
		}


@@ 302,20 469,17 @@ func (s *sessionBase) handleBasePairMsg(m Message) bool {
	return false
}

func (s *sessionTCP) handlePairMsg(m Message) bool {
	return s.handleBasePairMsg(m)
}
func (s *sessionUDP) handlePairMsg(w ResponseWriter, r *Request) bool {
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())
	//validate token

func (s *sessionUDP) handlePairMsg(m Message) bool {
	if s.handleBasePairMsg(m) {
		return true
	}
	s.mapMsgIdPairsLock.Lock()
	pair := s.mapMsgIdPairs[m.MessageID()]
	s.mapMsgIdPairsLock.Unlock()
	s.mapPairsLock.Lock()
	pair := s.mapPairs[token][r.Msg.MessageID()]
	s.mapPairsLock.Unlock()
	if pair != nil {
		select {
		case pair.ch <- m:
		case pair.ch <- r:
		default:
			log.Fatal("Exactly one message can be send to pair. This is second message.")
		}


@@ 335,12 499,13 @@ func (s *sessionTCP) sendCSM() error {
		Token: []byte(token),
	})
	req.AddOption(MaxMessageSize, uint32(s.srv.MaxMessageSize))
	//TODO blockwise
	//req.AddOption(coap.BlockWiseTransfer)
	return s.WriteMsg(req, 1*time.Second)
	if s.blockWiseEnabled() {
		req.AddOption(BlockWiseTransfer, nil)
	}
	return s.Write(req)
}

func (s *sessionTCP) setPeerMaxMeesageSize(val uint32) {
func (s *sessionTCP) setPeerMaxMessageSize(val uint32) {
	atomic.StoreUint32(&s.peerMaxMessageSize, val)
}



@@ 352,48 517,68 @@ func (s *sessionTCP) setPeerBlockWiseTransfer(val bool) {
	atomic.StoreUint32(&s.peerBlockWiseTransfer, v)
}

func (s *sessionUDP) sendPong(m Message) error {
	fmt.Printf("sendPong\n")
	req := s.NewMessage(MessageParams{
func (s *sessionUDP) sendPong(w ResponseWriter, r *Request) error {
	resp := r.SessionNet.NewMessage(MessageParams{
		Type:      Reset,
		Code:      GET,
		MessageID: m.MessageID(),
		Code:      Empty,
		MessageID: r.Msg.MessageID(),
	})
	return s.WriteMsg(req, 1*time.Second)
	return w.Write(resp)
}

func (s *sessionTCP) sendPong(m Message) error {
func (s *sessionTCP) sendPong(w ResponseWriter, r *Request) error {
	req := s.NewMessage(MessageParams{
		Type:  NonConfirmable,
		Code:  Pong,
		Token: m.Token(),
		Token: r.Msg.Token(),
	})
	return s.WriteMsg(req, 1*time.Second)
	return w.Write(req)
}

func (s *sessionTCP) handleSignals(m Message) bool {
	switch m.Code() {
func (s *sessionTCP) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	case CSM:
		if size, ok := m.Option(MaxMessageSize).(uint32); ok {
			s.setPeerMaxMeesageSize(size)
		maxmsgsize := uint32(maxMessageSize)
		if size, ok := r.Msg.Option(MaxMessageSize).(uint32); ok {
			s.setPeerMaxMessageSize(size)
			maxmsgsize = size
		}
		if m.Option(BlockWiseTransfer) != nil {
		if r.Msg.Option(BlockWiseTransfer) != nil {
			s.setPeerBlockWiseTransfer(true)
			switch s.blockWiseSzx() {
			case BlockSzxBERT:
				if SZXVal[BlockSzx1024] < int(maxmsgsize) {
					s.sessionBase.blockWiseTransferSzx = BlockSzxBERT
				}
				for i := BlockSzx512; i > BlockSzx16; i-- {
					if SZXVal[i] < int(maxmsgsize) {
						s.sessionBase.blockWiseTransferSzx = i
					}
				}
				s.sessionBase.blockWiseTransferSzx = BlockSzx16
			default:
				for i := s.blockWiseSzx(); i > BlockSzx16; i-- {
					if SZXVal[i] < int(maxmsgsize) {
						s.sessionBase.blockWiseTransferSzx = i
					}
				}
				s.sessionBase.blockWiseTransferSzx = BlockSzx16
			}
		}
		return true
	case Ping:
		if m.Option(Custody) != nil {
		if r.Msg.Option(Custody) != nil {
			//TODO
		}
		s.sendPong(m)
		s.sendPong(w, r)
		return true
	case Release:
		if _, ok := m.Option(AlternativeAddress).(string); ok {
		if _, ok := r.Msg.Option(AlternativeAddress).(string); ok {
			//TODO
		}
		return true
	case Abort:
		if _, ok := m.Option(BadCSMOption).(uint32); ok {
		if _, ok := r.Msg.Option(BadCSMOption).(uint32); ok {
			//TODO
		}
		return true


@@ 401,14 586,75 @@ func (s *sessionTCP) handleSignals(m Message) bool {
	return false
}

func (s *sessionUDP) handleSignals(req Message) bool {
	switch req.Code() {
func (s *sessionUDP) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	// handle of udp ping
	case Empty:
		if req.Type() == Confirmable && req.AllOptions().Len() == 0 && (req.Payload() == nil || len(req.Payload()) == 0) {
			s.sendPong(req)
		if r.Msg.Type() == Confirmable && r.Msg.AllOptions().Len() == 0 && (r.Msg.Payload() == nil || len(r.Msg.Payload()) == 0) {
			s.sendPong(w, r)
			return true
		}
	}
	return false
}

func handleSignalMsg(w ResponseWriter, r *Request, next HandlerFunc) {
	if !r.SessionNet.handleSignals(w, r) {
		next(w, r)
	}
}

func handlePairMsg(w ResponseWriter, r *Request, next HandlerFunc) {
	if !r.SessionNet.handlePairMsg(w, r) {
		next(w, r)
	}
}

func handleBySessionHandler(w ResponseWriter, r *Request, next HandlerFunc) {
	r.SessionNet.sessionHandler().handle(w, r, next)
}

type sessionHandler struct {
	tokenHandlers     map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc)
	tokenHandlersLock sync.Mutex
}

func (s *sessionHandler) handle(w ResponseWriter, r *Request, next HandlerFunc) {
	//validate token
	var token [MaxTokenSize]byte
	copy(token[:], r.Msg.Token())
	s.tokenHandlersLock.Lock()
	h := s.tokenHandlers[token]
	s.tokenHandlersLock.Unlock()
	if h != nil {
		h(w, r, next)
		return
	}
	if next != nil {
		next(w, r)
	}
}

func (s *sessionHandler) add(t []byte, h func(w ResponseWriter, r *Request, next HandlerFunc)) error {
	var token [MaxTokenSize]byte
	copy(token[:], t)
	s.tokenHandlersLock.Lock()
	defer s.tokenHandlersLock.Unlock()
	if s.tokenHandlers[token] != nil {
		return ErrTokenAlreadyExist
	}
	s.tokenHandlers[token] = h
	return nil
}

func (s *sessionHandler) remove(t []byte) error {
	var token [MaxTokenSize]byte
	copy(token[:], t)
	s.tokenHandlersLock.Lock()
	defer s.tokenHandlersLock.Unlock()
	if s.tokenHandlers[token] == nil {
		return ErrTokenNotExist
	}
	delete(s.tokenHandlers, token)
	return nil
}

M utils.go => utils.go +2 -2
@@ 22,7 22,7 @@ func GenerateToken(n int) ([]byte, error) {

var msgIdIter = uint32(0)

// GenerateMessageId generates a message id for UDP-coap
func GenerateMessageId() uint16 {
// GenerateMessageID generates a message id for UDP-coap
func GenerateMessageID() uint16 {
	return uint16(atomic.AddUint32(&msgIdIter, 1) % 0xffff)
}