~fnux/yggdrasil-go-coap

bfbd2fd990d67e4fb51b92560c620de7615b247c — Jozef Kralik 2 years ago dfd0cb9
add method OBSERVE to client
10 files changed, 543 insertions(+), 122 deletions(-)

M blockwise.go
M blockwise_test.go
M client.go
M client_test.go
M error.go
A getresponsewriter.go
M responsewriter.go
M server.go
R session.go => sessionnet.go
M utils.go
M blockwise.go => blockwise.go +105 -36
@@ 1,13 1,14 @@
package coap

import (
	"fmt"
	"log"
	"time"
)

const (
	maxBlockNumber = uint(1048575)
	debug          = false
	blockWiseDebug = false
)

type BlockSzx uint8


@@ 158,7 159,7 @@ func (s *blockWiseSender) createReq(b *blockWiseSession) (Message, error) {
func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, error) {
	var resp Message
	var err error
	if debug {
	if blockWiseDebug {
		log.Printf("sendPayload %p req=%v\n", b, req)
	}
	if s.peerDrive {


@@ 169,7 170,7 @@ func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, e
	if err != nil {
		return nil, err
	}
	if debug {
	if blockWiseDebug {
		log.Printf("sendPayload %p resp=%v\n", b, resp)
	}
	return resp, nil


@@ 188,7 189,7 @@ func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Mes
				}
				if num == 0 {
					resp.RemoveOption(s.sizeType())
					return b.receivePayload(s.peerDrive, s.origin, resp, Block2, s.origin.Code(), Confirmable, Changed)
					return b.receivePayload(s.peerDrive, s.origin, resp, Block2, s.origin.Code(), Changed)
				}
			}
		}


@@ 213,6 214,8 @@ func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Mes
		if !b.blockWiseIsValid(szx) {
			return nil, ErrInvalidBlockSzx
		}

		s.currentSzx = szx
		if s.peerDrive {
			s.currentNum = num
			req.SetMessageID(resp.MessageID())


@@ 228,7 231,7 @@ func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Mes
		}
		req.SetPayload(s.origin.Payload()[startOffset:endOffset])
		//must be unique for evey msg via UDP
		if debug {
		if blockWiseDebug {
			log.Printf("sendPayload szx=%v num=%v more=%v\n", s.currentSzx, s.currentNum, s.currentMore)
		}
		block, err := MarshalBlockOption(s.currentSzx, s.currentNum, s.currentMore)


@@ 276,21 279,26 @@ type blockWiseSession struct {

func (b *blockWiseSession) Exchange(msg Message) (Message, error) {
	switch msg.Code() {
	case GET:
		return b.receivePayload(false, msg, nil, Block2, msg.Code(), Confirmable, Content)
	case CSM, Ping, Pong, Release, Abort, Empty:
		return b.SessionNet.Exchange(msg)
	case GET, DELETE:
		return b.receivePayload(false, msg, nil, Block2, msg.Code(), Content)
	case POST, PUT:
		return b.sendPayload(false, Block1, b.SessionNet.blockWiseSzx(), Continue, msg)
	default:
		return b.sendPayload(true, Block2, b.SessionNet.blockWiseSzx(), Continue, msg)
	}
	return b.SessionNet.Exchange(msg)

}

func (b *blockWiseSession) Write(msg Message) error {
	switch msg.Code() {
	case GET, POST, PUT:
	case CSM, Ping, Pong, Release, Abort, Empty, GET:
		return b.SessionNet.Write(msg)
	default:
		_, err := b.Exchange(msg)
		return err
	}
	return b.SessionNet.Write(msg)
}

func calcNextNum(num uint, szx BlockSzx, payloadSize int) uint {


@@ 374,7 382,7 @@ func (r *blockWiseReceiver) createReq(b *blockWiseSession, resp Message) (Messag
	return req, nil
}

func (b *blockWiseSession) receivePayloadInit(peerDrive bool, origin Message, resp Message, blockType OptionID, code COAPCode, expectedCode COAPCode) (r *blockWiseReceiver, res Message, err error) {
func newReceiver(b *blockWiseSession, peerDrive bool, origin Message, resp Message, blockType OptionID, code COAPCode, expectedCode COAPCode) (r *blockWiseReceiver, res Message, err error) {
	r = &blockWiseReceiver{
		peerDrive:    peerDrive,
		code:         code,


@@ 446,7 454,7 @@ func (b *blockWiseSession) receivePayloadInit(peerDrive bool, origin Message, re
}

func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message, error) {
	if debug {
	if blockWiseDebug {
		log.Printf("receivePayload %p req=%v\n", b, req)
	}
	var resp Message


@@ 457,7 465,7 @@ func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message,
		resp, err = b.SessionNet.Exchange(req)
	}

	if debug {
	if blockWiseDebug {
		log.Printf("receivePayload %p resp=%v\n", b, resp)
	}



@@ 494,7 502,13 @@ func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp M
			if r.peerDrive {
				r.nextNum = num
			} else {
				r.nextNum = calcNextNum(num, szx, len(resp.Payload()))
				if szx > b.blockWiseSzx() {
					num = 0
					szx = b.blockWiseSzx()
					r.nextNum = calcNextNum(num, szx, len(r.payload))
				} else {
					r.nextNum = calcNextNum(num, szx, len(resp.Payload()))
				}
			}
		}



@@ 516,7 530,7 @@ func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp M
		} else {
			req.SetMessageID(GenerateMessageID())
		}
		if debug {
		if blockWiseDebug {
			log.Printf("receivePayload szx=%v num=%v more=%v\n", szx, r.nextNum, more)
		}
		block, err := MarshalBlockOption(szx, r.nextNum, more)


@@ 555,8 569,8 @@ func (r *blockWiseReceiver) sendError(b *blockWiseSession, code COAPCode, resp M
	b.sendErrorMsg(code, typ, token, MessageID)
}

func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode, typ COAPType, expectedCode COAPCode) (Message, error) {
	r, resp, err := b.receivePayloadInit(peerDrive, msg, resp, blockType, code, expectedCode)
func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode, expectedCode COAPCode) (Message, error) {
	r, resp, err := newReceiver(b, peerDrive, msg, resp, blockType, code, expectedCode)
	if err != nil {
		r.sendError(b, BadRequest, resp)
		return nil, err


@@ 597,35 611,56 @@ func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Mess
	}
}

func invalidBlockWise(w ResponseWriter, r *Request, err error) {
	resp := r.SessionNet.NewMessage(MessageParams{
		Code:      BadRequest,
		Type:      Acknowledgement,
		MessageID: r.Msg.MessageID(),
		Token:     r.Msg.Token(),
		Payload:   []byte(err.Error()),
	})
	resp.SetOption(ContentFormat, TextPlain)
	w.Write(resp)
}

func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter, r *Request)) {
	if r.Msg.Token() != nil && (r.Msg.Code() == PUT || r.Msg.Code() == POST) {
		if b, ok := r.SessionNet.(*blockWiseSession); ok {
			msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue, Acknowledgement, r.Msg.Code())
	if blockWiseDebug {
		fmt.Printf("handleBlockWiseMsg r.msg=%v\n", r.Msg)
	}
	if r.Msg.Token() != nil {
		switch r.Msg.Code() {
		case PUT, POST:
			if b, ok := r.SessionNet.(*blockWiseSession); ok {
				msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue, r.Msg.Code())

			if err != nil {
				if err != nil {
					return
				}
				next(w, &Request{SessionNet: r.SessionNet, Msg: msg})
				return
			}
			next(w, &Request{SessionNet: r.SessionNet, Msg: msg})
			return
			/*
				//observe data
				case Content, Valid:
					if r.Msg.Option(Observe) != nil && r.Msg.Option(ETag) != nil {
						if b, ok := r.SessionNet.(*blockWiseSession); ok {
							token, err := GenerateToken(8)
							if err != nil {
								return
							}
							req := r.SessionNet.NewMessage(MessageParams{
								Code:      GET,
								Type:      Confirmable,
								MessageID: GenerateMessageID(),
								Token:     token,
							})
							req.AddOption(Block2, r.Msg.Option(Block2))
							req.AddOption(Size2, r.Msg.Option(Size2))

							msg, err := b.receivePayload(true, req, r.Msg, Block2, GET, r.Msg.Code())
							if err != nil {
								return
							}
							next(w, &Request{SessionNet: r.SessionNet, Msg: msg})
							return
						}
					}*/
		}

	}
	next(w, r)
}

type blockWiseResponseWriter struct {
	responseWriter
	*responseWriter
}

func (w *blockWiseResponseWriter) Write(msg Message) error {


@@ 654,3 689,37 @@ func (w *blockWiseResponseWriter) Write(msg Message) error {

	return ErrNotSupported
}

type blockWiseNoticeWriter struct {
	*responseWriter
}

func (w *blockWiseNoticeWriter) Write(msg Message) error {
	suggestedSzx := w.req.SessionNet.blockWiseSzx()
	if respBlock2, ok := w.req.Msg.Option(Block2).(uint32); ok {
		szx, _, _, err := UnmarshalBlockOption(respBlock2)
		if err != nil {
			return err
		}
		//BERT is supported only via TCP
		if szx == BlockSzxBERT && !w.req.SessionNet.IsTCP() {
			return ErrInvalidBlockSzx
		}
		suggestedSzx = szx
	}

	//resp is less them szx then just write msg without blockWise
	if len(msg.Payload()) < SZXVal[suggestedSzx] {
		return w.responseWriter.Write(msg)
	}

	if b, ok := w.req.SessionNet.(*blockWiseSession); ok {
		s := newSender(false, Block2, suggestedSzx, w.req.Msg.Code(), msg)
		req, err := s.createReq(b)
		if err != nil {
			return err
		}
		return b.SessionNet.Write(req)
	}
	return ErrNotSupported
}

M blockwise_test.go => blockwise_test.go +60 -59
@@ 3,8 3,6 @@ package coap
import (
	"fmt"
	"testing"

	"github.com/ugorji/go/codec"
)

func testMarshal(t *testing.T, szx BlockSzx, blockNumber uint, moreBlocksFollowing bool, expectedBlock uint32) {


@@ 82,33 80,6 @@ func TestBlockWiseBlockUnmarshal(t *testing.T) {
	}
}

var path = "/allAttributeTypes"
var server = "127.0.0.1:33643"

func decodeMsg(resp Message) {
	var m map[string]interface{}
	fmt.Printf("--------------------------------------\n")
	fmt.Printf("Path: %v\n", resp.PathString())
	fmt.Printf("Payload: %x\n", resp.Payload())
	err := codec.NewDecoderBytes(resp.Payload(), new(codec.CborHandle)).Decode(&m)
	if err != nil {
		fmt.Printf("cannot decode payload: %v!!!\n", err)

	} else {
		fmt.Printf("decoded type : %T\n", m)
		fmt.Printf("decoded value: %v\n", m)

		if val, ok := m["binaryAttribute"]; ok {
			fmt.Printf("binaryAttribute: %x\n", val)
		}
	}
}

func observe(w ResponseWriter, req *Request) {
	fmt.Printf("RemoteAddr : %v\n", req.SessionNet.RemoteAddr())
	decodeMsg(req.Msg)
}

func TestServingUDPBlockSzx16(t *testing.T) {
	testServingTCPWithMsg(t, "udp", true, BlockSzx16, make([]byte, 128), simpleMsg)
}


@@ 196,18 167,53 @@ func TestServingTCPBigMsgBlockSzxBERT(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzxBERT, make([]byte, 10*1024*1024), simpleMsg)
}

/* test against IoTivity
import (
	"bytes"
	"github.com/ugorji/go/codec"
)
*/

/*
var path = "/allAttributeTypes"
var server = "127.0.0.1:43609"

func decodeMsg(resp Message) {
	var m map[string]interface{}
	fmt.Printf("--------------------------------------\n")
	fmt.Printf("Path: %v\n", resp.PathString())
	fmt.Printf("Payload: %x\n", resp.Payload())
	err := codec.NewDecoderBytes(resp.Payload(), new(codec.CborHandle)).Decode(&m)
	if err != nil {
		fmt.Printf("cannot decode payload: %v!!!\n", err)

	} else {
		fmt.Printf("decoded type : %T\n", m)
		fmt.Printf("decoded value: %v\n", m)

		if val, ok := m["binaryAttribute"]; ok {
			fmt.Printf("binaryAttribute: %v\n", val)
		}
	}
}

func observe(w ResponseWriter, req *Request) {
	fmt.Printf("RemoteAddr : %v\n", req.SessionNet.RemoteAddr())
	decodeMsg(req.Msg)
}

func TestBlockWisePostBlock16(t *testing.T) {
	client := &Client{Net: "udp", ObserverFunc: observe}
	szx := BlockSzx16
	client := &Client{Net: "udp", ObserverFunc: observe, BlockWiseTransferSzx: &szx}
	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}

	fmt.Fprintf(os.Stdout, "conn: %v\n", co.LocalAddr())
	fmt.Printf("conn: %v\n", co.LocalAddr())

	payload := map[string]interface{}{
		"binaryAttribute": make([]byte, 17),
		"binaryAttribute": make([]byte, 33),
	}
	bw := new(bytes.Buffer)
	h := new(codec.CborHandle)


@@ 217,20 223,7 @@ func TestBlockWisePostBlock16(t *testing.T) {
		t.Fatalf("Cannot encode: %v", err)
	}

	token, err := GenerateToken(8)

	req := co.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      POST,
		MessageID: GenerateMessageID(),
		Token:     token,
		Payload:   bw.Bytes(),
	})

	req.SetPathString(path)
	req.SetOption(ContentFormat, AppCBOR)

	resp, err := co.Exchange(req)
	resp, err := co.Post(path, AppCBOR, bw)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}


@@ 238,28 231,36 @@ func TestBlockWisePostBlock16(t *testing.T) {
}

func TestBlockWiseGetBlock16(t *testing.T) {
	client := &Client{Net: "udp", ObserverFunc: observe}
	szx := BlockSzx16
	client := &Client{Net: "udp", ObserverFunc: observe, BlockWiseTransferSzx: &szx}

	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	resp, err := co.Get(path)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}

	token, err := GenerateToken(8)
func TestBlockWiseObserveBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &Client{Net: "udp", ObserverFunc: observe, BlockWiseTransferSzx: &szx}

	req := co.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	sync := make(chan bool)
	_, err = co.Observe(path, func(req Message) {
		decodeMsg(req)
		sync <- true
	})

	req.SetPathString(path)

	resp, err := co.Exchange(req)
	if err != nil {
		t.Fatalf("Cannot post exchange")
		t.Fatalf("Unexpected error '%v'", err)
	}
	decodeMsg(resp)
	<-sync
}
*/

M client.go => client.go +173 -0
@@ 3,7 3,10 @@ package coap
// A client implementation.

import (
	"bytes"
	"crypto/tls"
	"io"
	"io/ioutil"
	"log"
	"net"
	"strings"


@@ 261,6 264,176 @@ func (co *ClientConn) Ping(timeout time.Duration) error {
	return co.session.Ping(timeout)
}

// Get retrieve the resource identified by the request path
func (co *ClientConn) Get(path string) (Message, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	return co.session.Exchange(req)
}

func (co *ClientConn) putPostHelper(code COAPCode, path string, contentType MediaType, body io.Reader) (Message, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      POST,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	req.SetOption(ContentFormat, contentType)
	payload, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	req.SetPayload(payload)
	return co.session.Exchange(req)
}

// Post update the resource identified by the request path
func (co *ClientConn) Post(path string, contentType MediaType, body io.Reader) (Message, error) {
	return co.putPostHelper(POST, path, contentType, body)
}

// Put create the resource identified by the request path
func (co *ClientConn) Put(path string, contentType MediaType, body io.Reader) (Message, error) {
	return co.putPostHelper(PUT, path, contentType, body)
}

// Delete delete the resource identified by the request path
func (co *ClientConn) Delete(path string) (Message, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      DELETE,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	return co.session.Exchange(req)
}

//Observation represents subscription to resource on the server
type Observation struct {
	token     []byte
	path      string
	obsSeqNum uint32
	s         SessionNet
}

// Cancel remove observation from server. For recreate observation use Observe.
func (o *Observation) Cancel() error {
	req := o.s.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     o.token,
	})
	req.SetPathString(o.path)
	req.SetOption(Observe, 1)
	err1 := o.s.Write(req)
	err2 := o.s.sessionHandler().remove(o.token)
	if err1 != nil {
		return err1
	}
	return err2
}

// Observe subscribe to severon path. After subscription and every change on path,
// server sends immediately response
func (co *ClientConn) Observe(path string, observeFunc func(req Message)) (*Observation, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := co.session.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	req.SetOption(Observe, 0)
	block, err := MarshalBlockOption(co.session.blockWiseSzx(), 0, false)
	if err != nil {
		return nil, err
	}
	req.SetOption(Block1, block)
	o := &Observation{
		token:     token,
		path:      path,
		obsSeqNum: 0,
		s:         co.session,
	}
	co.session.sessionHandler().add(token, func(w ResponseWriter, r *Request, next HandlerFunc) {
		if r.Msg.Option(Observe) != nil && (r.Msg.Code() == Content || r.Msg.Code() == Valid) {
			obsSeqNum := r.Msg.Option(Observe).(uint32)
			//obs starts with 0, after that check obsSeqNum
			if obsSeqNum != 0 && o.obsSeqNum > obsSeqNum {
				return
			}
			needGet := false
			resp := r.Msg
			if r.Msg.Option(Size2) != nil {
				if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
					needGet = true
				}
			}
			if !needGet {
				if block, ok := r.Msg.Option(Block2).(uint32); ok {
					_, _, more, err := UnmarshalBlockOption(block)
					if err != nil {
						return
					}
					needGet = more
				}
			}

			if needGet {
				resp, err = co.Get(path)
				if err != nil {
					return
				}
			}
			switch {
			case r.Msg.Option(ETag) != nil && resp.Option(ETag) != nil:
				//during processing observation, check if notification is still valid
				if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
					o.obsSeqNum = r.Msg.Option(Observe).(uint32)
					observeFunc(resp)
				}
			default:
				o.obsSeqNum = r.Msg.Option(Observe).(uint32)
				observeFunc(resp)
			}
			return
		}
		next(w, r)
	})

	err = co.session.Write(req)
	if err != nil {
		co.session.sessionHandler().remove(o.token)
		return nil, err
	}

	return o, nil
}

// Close close connection
func (co *ClientConn) Close() {
	co.srv.Shutdown()

M client_test.go => client_test.go +160 -14
@@ 2,7 2,6 @@ package coap

import (
	"bytes"
	"fmt"
	"log"
	"net"
	"strings"


@@ 14,27 13,31 @@ import (
)

func periodicTransmitter(w ResponseWriter, r *Request) {
	subded := time.Now()
	msg := r.SessionNet.NewMessage(MessageParams{
		Type:      Acknowledgement,
		Code:      Content,
		MessageID: r.Msg.MessageID(),
		Payload:   make([]byte, 15),
		Token:     r.Msg.Token(),
	})

	for {
		msg := r.SessionNet.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),
			Payload:   []byte(fmt.Sprintf("Been running for %v", time.Since(subded))),
		})
	//msg.SetOption(ContentFormat, TextPlain)
	msg.SetOption(LocationPath, r.Msg.Path())

		msg.SetOption(ContentFormat, TextPlain)
		msg.SetOption(LocationPath, r.Msg.Path())
	err := w.Write(msg)
	if err != nil {
		log.Printf("Error on transmitter, stopping: %v", err)
		return
	}

	go func() {
		time.Sleep(time.Second)
		err := w.Write(msg)
		if err != nil {
			log.Printf("Error on transmitter, stopping: %v", err)
			return
		}

		time.Sleep(time.Second)
	}
	}()
}

func testServingObservation(t *testing.T, net string, addrstr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx) {


@@ 61,6 64,7 @@ func testServingObservation(t *testing.T, net string, addrstr string, BlockWiseT
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: 12345,
		Token:     []byte{123},
	})

	req.AddOption(Observe, 1)


@@ 178,3 182,145 @@ func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {
	}
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", false, BlockSzx16, ifis)
}

func setupServer(t *testing.T) (string, error) {
	_, addr, _, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockSzx1024, func(w ResponseWriter, r *Request) {
		msg := r.SessionNet.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),
			Payload:   make([]byte, 5000),
			Token:     r.Msg.Token(),
		})

		msg.SetOption(ContentFormat, TextPlain)
		msg.SetOption(LocationPath, r.Msg.Path())

		err := w.Write(msg)
		if err != nil {
			t.Fatalf("Error on transmitter, stopping: %v", err)
			return
		}
	})
	return addr, err
}

func TestServingUDPGet(t *testing.T) {

	addr, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx16
	c := Client{Net: "udp", BlockWiseTransfer: &BlockWiseTransfer, BlockWiseTransferSzx: &BlockWiseTransferSzx}
	con, err := c.Dial(addr)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	_, err = con.Get("/tmp/test")
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
}

func TestServingUDPPost(t *testing.T) {
	addr, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx1024
	c := Client{Net: "udp", BlockWiseTransfer: &BlockWiseTransfer, BlockWiseTransferSzx: &BlockWiseTransferSzx}
	con, err := c.Dial(addr)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	body := bytes.NewReader([]byte("Hello world"))
	_, err = con.Post("/tmp/test", TextPlain, body)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
}

func TestServingUDPPut(t *testing.T) {
	addr, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx1024
	c := Client{Net: "udp", BlockWiseTransfer: &BlockWiseTransfer, BlockWiseTransferSzx: &BlockWiseTransferSzx}
	con, err := c.Dial(addr)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	body := bytes.NewReader([]byte("Hello world"))
	_, err = con.Put("/tmp/test", TextPlain, body)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
}

func TestServingUDPDelete(t *testing.T) {
	addr, err := setupServer(t)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx1024
	c := Client{Net: "udp", BlockWiseTransfer: &BlockWiseTransfer, BlockWiseTransferSzx: &BlockWiseTransferSzx}
	con, err := c.Dial(addr)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	_, err = con.Delete("/tmp/test")
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
}

func TestServingUDPObserve(t *testing.T) {
	_, addr, _, err := RunLocalServerUDPWithHandler("udp", ":0", true, BlockSzx16, func(w ResponseWriter, r *Request) {
		msg := r.SessionNet.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),
			Payload:   make([]byte, 17),
			Token:     r.Msg.Token(),
		})

		msg.SetOption(ContentFormat, TextPlain)
		msg.SetOption(LocationPath, r.Msg.Path())
		msg.SetOption(Observe, 2)

		err := w.Write(msg)
		if err != nil {
			t.Fatalf("Error on transmitter, stopping: %v", err)
			return
		}
	})
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockSzx1024
	c := Client{Net: "udp", BlockWiseTransfer: &BlockWiseTransfer, BlockWiseTransferSzx: &BlockWiseTransferSzx}
	con, err := c.Dial(addr)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	sync := make(chan bool)
	_, err = con.Observe("/tmp/test", func(req Message) {
		sync <- true
	})
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	<-sync
}

M error.go => error.go +2 -0
@@ 82,3 82,5 @@ const ErrInvalidBlockSzx = Error("invalid block-wise transfer szx")
const ErrRequestEntityIncomplete = Error("payload comes in bad order")

const ErrInvalidRequest = Error("invalid request")

const ErrInvalidPayload = Error("invalid payload")

A getresponsewriter.go => getresponsewriter.go +13 -0
@@ 0,0 1,13 @@
package coap

type getResponseWriter struct {
	w ResponseWriter
}

func (r *getResponseWriter) Write(msg Message) error {
	if msg.Payload() != nil && msg.Option(ETag) == nil {
		msg.SetOption(ETag, CalcETag(msg.Payload()))
	}

	return r.w.Write(msg)
}

M responsewriter.go => responsewriter.go +0 -1
@@ 1,7 1,6 @@
package coap

//A ResponseWriter interface is used by an CAOP handler to construct an COAP response.
//A ResponseWriter may not be used after the Handler.ServeCOAP method has returned.
type ResponseWriter interface {
	Write(Message) error
}

M server.go => server.go +14 -5
@@ 567,11 567,20 @@ func (srv *Server) serveUDP(conn *net.UDPConn) error {
}

func (srv *Server) serve(r *Request) {
	var w ResponseWriter
	if r.SessionNet.blockWiseEnabled() {
		w = &blockWiseResponseWriter{responseWriter{req: r}}
	} else {
		w = &responseWriter{req: r}
	w := ResponseWriter(&responseWriter{req: r})
	switch {
	case r.Msg.Code() == GET:
		switch {
		// set blockwise notice writer for observe
		case r.SessionNet.blockWiseEnabled() && r.Msg.Option(Observe) != nil:
			w = &blockWiseNoticeWriter{responseWriter: w.(*responseWriter)}
		// set blockwise if it is enabled
		case r.SessionNet.blockWiseEnabled():
			w = &blockWiseResponseWriter{responseWriter: w.(*responseWriter)}
		}
		w = &getResponseWriter{w}
	case r.SessionNet.blockWiseEnabled():
		w = &blockWiseResponseWriter{responseWriter: w.(*responseWriter)}
	}

	handlePairMsg(w, r, func(w ResponseWriter, r *Request) {

R session.go => sessionnet.go +2 -2
@@ 272,7 272,7 @@ func (s *sessionUDP) Ping(timeout time.Duration) error {
}

func (s *sessionTCP) Ping(timeout time.Duration) error {
	token, err := GenerateToken(MaxTokenSize)
	token, err := GenerateToken()
	if err != nil {
		return err
	}


@@ 489,7 489,7 @@ func (s *sessionUDP) handlePairMsg(w ResponseWriter, r *Request) bool {
}

func (s *sessionTCP) sendCSM() error {
	token, err := GenerateToken(MaxTokenSize)
	token, err := GenerateToken()
	if err != nil {
		return err
	}

M utils.go => utils.go +14 -5
@@ 2,15 2,14 @@ package coap

import (
	"crypto/rand"
	"encoding/binary"
	"hash/crc64"
	"sync/atomic"
)

// GenerateToken generates a random token by a given length
func GenerateToken(n int) ([]byte, error) {
	if n == 0 || n > 8 {
		return nil, ErrInvalidTokenLen
	}
	b := make([]byte, n)
func GenerateToken() ([]byte, error) {
	b := make([]byte, MaxTokenSize)
	_, err := rand.Read(b)
	// Note that err == nil only if we read len(b) bytes.
	if err != nil {


@@ 26,3 25,13 @@ var msgIdIter = uint32(0)
func GenerateMessageID() uint16 {
	return uint16(atomic.AddUint32(&msgIdIter, 1) % 0xffff)
}

// Calculate ETag from payload via CRC64
func CalcETag(payload []byte) []byte {
	if payload != nil {
		b := make([]byte, 8)
		binary.LittleEndian.PutUint64(b, crc64.Checksum(payload, crc64.MakeTable(crc64.ISO)))
		return b
	}
	return nil
}