~fnux/yggdrasil-go-coap

9662f3fa57f5b5dbcfda24c940fb9be6aaa90d92 — Jozef Kralik 2 years ago bfbd2fd
add MulticastClient for send multicast request
M blockwise.go => blockwise.go +40 -29
@@ 65,7 65,22 @@ func UnmarshalBlockOption(blockVal uint32) (szx BlockSzx, blockNumber uint, more
	return
}

func exchangeDrivedByPeer(session SessionNet, req Message) (Message, error) {
func exchangeDrivedByPeer(session SessionNet, req Message, blockType OptionID) (Message, error) {
	if block, ok := req.Option(blockType).(uint32); ok {
		_, _, more, err := UnmarshalBlockOption(block)
		if err != nil {
			return nil, err
		}
		if more == false {
			// we send all datas to peer -> create empty response
			err := session.Write(req)
			if err != nil {
				return nil, err
			}
			return session.NewMessage(MessageParams{}), nil
		}
	}

	pair := make(chan *Request, 1)
	session.sessionHandler().add(req.Token(), func(w ResponseWriter, r *Request, next HandlerFunc) {
		select {


@@ 163,7 178,7 @@ func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, e
		log.Printf("sendPayload %p req=%v\n", b, req)
	}
	if s.peerDrive {
		resp, err = exchangeDrivedByPeer(b.SessionNet, req)
		resp, err = exchangeDrivedByPeer(b.SessionNet, req, s.blockType)
	} else {
		resp, err = b.SessionNet.Exchange(req)
	}


@@ 178,7 193,7 @@ func (s *blockWiseSender) exchange(b *blockWiseSession, req Message) (Message, e

func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Message) (Message, error) {
	if s.currentMore == false {
		if s.blockType == Block1 /*&& resp.Code() == Changed*/ {
		if s.blockType == Block1 {
			if respBlock2, ok := resp.Option(Block2).(uint32); ok {
				szx, num, _, err := UnmarshalBlockOption(respBlock2)
				if err != nil {


@@ 189,7 204,7 @@ func (s *blockWiseSender) processResp(b *blockWiseSession, req Message, resp Mes
				}
				if num == 0 {
					resp.RemoveOption(s.sizeType())
					return b.receivePayload(s.peerDrive, s.origin, resp, Block2, s.origin.Code(), Changed)
					return b.receivePayload(s.peerDrive, s.origin, resp, Block2, s.origin.Code())
				}
			}
		}


@@ 279,12 294,14 @@ type blockWiseSession struct {

func (b *blockWiseSession) Exchange(msg Message) (Message, error) {
	switch msg.Code() {
	//these methods doesn't need to be handled by blockwise
	case CSM, Ping, Pong, Release, Abort, Empty:
		return b.SessionNet.Exchange(msg)
	case GET, DELETE:
		return b.receivePayload(false, msg, nil, Block2, msg.Code(), Content)
		return b.receivePayload(false, msg, nil, Block2, msg.Code())
	case POST, PUT:
		return b.sendPayload(false, Block1, b.SessionNet.blockWiseSzx(), Continue, msg)
	// for response code
	default:
		return b.sendPayload(true, Block2, b.SessionNet.blockWiseSzx(), Continue, msg)
	}


@@ 382,15 399,14 @@ func (r *blockWiseReceiver) createReq(b *blockWiseSession, resp Message) (Messag
	return req, nil
}

func newReceiver(b *blockWiseSession, peerDrive bool, origin Message, resp Message, blockType OptionID, code COAPCode, expectedCode COAPCode) (r *blockWiseReceiver, res Message, err error) {
func newReceiver(b *blockWiseSession, peerDrive bool, origin Message, resp Message, blockType OptionID, code COAPCode) (r *blockWiseReceiver, res Message, err error) {
	r = &blockWiseReceiver{
		peerDrive:    peerDrive,
		code:         code,
		expectedCode: expectedCode,
		origin:       origin,
		blockType:    blockType,
		currentSzx:   b.SessionNet.blockWiseSzx(),
		payload:      []byte{},
		peerDrive:  peerDrive,
		code:       code,
		origin:     origin,
		blockType:  blockType,
		currentSzx: b.SessionNet.blockWiseSzx(),
		payload:    []byte{},
	}

	if resp != nil {


@@ 460,7 476,7 @@ func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message,
	var resp Message
	var err error
	if r.peerDrive {
		resp, err = exchangeDrivedByPeer(b.SessionNet, req)
		resp, err = exchangeDrivedByPeer(b.SessionNet, req, r.blockType)
	} else {
		resp, err = b.SessionNet.Exchange(req)
	}


@@ 473,12 489,6 @@ func (r *blockWiseReceiver) exchange(b *blockWiseSession, req Message) (Message,
}

func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp Message) (Message, error) {
	/*
		if resp.Code() != r.expectedCode {
			return nil, ErrInvalidRequest
		}
	*/

	if respBlock, ok := resp.Option(r.blockType).(uint32); ok {
		szx, num, more, err := UnmarshalBlockOption(respBlock)
		if err != nil {


@@ 516,7 526,9 @@ func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp M
			if r.payloadSize != 0 && int(r.payloadSize) != len(r.payload) {
				return nil, ErrInvalidPayloadSize
			}
			resp.SetPayload(r.payload)
			if len(r.payload) > 0 {
				resp.SetPayload(r.payload)
			}
			// remove block used by blockWise
			resp.RemoveOption(r.sizeType())
			resp.RemoveOption(r.blockType)


@@ 539,12 551,11 @@ func (r *blockWiseReceiver) processResp(b *blockWiseSession, req Message, resp M
		}
		req.SetOption(r.blockType, block)
	} else {
		switch r.blockType {
		case Block1:
			return nil, ErrInvalidOptionBlock1
		default:
			return nil, ErrInvalidOptionBlock2
		if r.payloadSize != 0 && int(r.payloadSize) != len(resp.Payload()) {
			return nil, ErrInvalidPayloadSize
		}
		//response is whole doesn't need to use blockwise
		return resp, nil
	}
	return nil, nil
}


@@ 569,8 580,8 @@ func (r *blockWiseReceiver) sendError(b *blockWiseSession, code COAPCode, resp M
	b.sendErrorMsg(code, typ, token, MessageID)
}

func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode, expectedCode COAPCode) (Message, error) {
	r, resp, err := newReceiver(b, peerDrive, msg, resp, blockType, code, expectedCode)
func (b *blockWiseSession) receivePayload(peerDrive bool, msg Message, resp Message, blockType OptionID, code COAPCode) (Message, error) {
	r, resp, err := newReceiver(b, peerDrive, msg, resp, blockType, code)
	if err != nil {
		r.sendError(b, BadRequest, resp)
		return nil, err


@@ 619,7 630,7 @@ func handleBlockWiseMsg(w ResponseWriter, r *Request, next func(w ResponseWriter
		switch r.Msg.Code() {
		case PUT, POST:
			if b, ok := r.SessionNet.(*blockWiseSession); ok {
				msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue, r.Msg.Code())
				msg, err := b.receivePayload(true, r.Msg, nil, Block1, Continue)

				if err != nil {
					return

M blockwise_test.go => blockwise_test.go +0 -98
@@ 166,101 166,3 @@ func TestServingTCPBigMsgBlockSzx1024(t *testing.T) {
func TestServingTCPBigMsgBlockSzxBERT(t *testing.T) {
	testServingTCPWithMsg(t, "tcp", true, BlockSzxBERT, make([]byte, 10*1024*1024), simpleMsg)
}

/* test against IoTivity
import (
	"bytes"
	"github.com/ugorji/go/codec"
)
*/

/*
var path = "/allAttributeTypes"
var server = "127.0.0.1:43609"

func decodeMsg(resp Message) {
	var m map[string]interface{}
	fmt.Printf("--------------------------------------\n")
	fmt.Printf("Path: %v\n", resp.PathString())
	fmt.Printf("Payload: %x\n", resp.Payload())
	err := codec.NewDecoderBytes(resp.Payload(), new(codec.CborHandle)).Decode(&m)
	if err != nil {
		fmt.Printf("cannot decode payload: %v!!!\n", err)

	} else {
		fmt.Printf("decoded type : %T\n", m)
		fmt.Printf("decoded value: %v\n", m)

		if val, ok := m["binaryAttribute"]; ok {
			fmt.Printf("binaryAttribute: %v\n", val)
		}
	}
}

func observe(w ResponseWriter, req *Request) {
	fmt.Printf("RemoteAddr : %v\n", req.SessionNet.RemoteAddr())
	decodeMsg(req.Msg)
}

func TestBlockWisePostBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &Client{Net: "udp", ObserverFunc: observe, BlockWiseTransferSzx: &szx}
	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}

	fmt.Printf("conn: %v\n", co.LocalAddr())

	payload := map[string]interface{}{
		"binaryAttribute": make([]byte, 33),
	}
	bw := new(bytes.Buffer)
	h := new(codec.CborHandle)
	enc := codec.NewEncoder(bw, h)
	err = enc.Encode(&payload)
	if err != nil {
		t.Fatalf("Cannot encode: %v", err)
	}

	resp, err := co.Post(path, AppCBOR, bw)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}

func TestBlockWiseGetBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &Client{Net: "udp", ObserverFunc: observe, BlockWiseTransferSzx: &szx}

	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	resp, err := co.Get(path)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}

func TestBlockWiseObserveBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &Client{Net: "udp", ObserverFunc: observe, BlockWiseTransferSzx: &szx}

	co, err := client.Dial(server)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	sync := make(chan bool)
	_, err = co.Observe(path, func(req Message) {
		decodeMsg(req)
		sync <- true
	})
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	<-sync
}
*/

M client.go => client.go +114 -83
@@ 19,6 19,7 @@ type ClientConn struct {
	client       *Client
	session      SessionNet
	shutdownSync chan error
	multicast    bool
}

// A Client defines parameters for a COAP client.


@@ 31,7 32,7 @@ type Client struct {
	WriteTimeout   time.Duration // net.ClientConn.SetWriteTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	SyncTimeout    time.Duration // The maximum of time for synchronization go-routines, defaults to 30 seconds - overridden by Timeout when that value is non-zero if it occurs, then it call log.Fatal

	ObserverFunc         HandlerFunc     // for handling observation messages from server
	Handler              HandlerFunc     // default handler for handling messages from server
	NotifySessionEndFunc func(err error) // if NotifySessionEndFunc is set it is called when TCP/UDP session was ended.

	BlockWiseTransfer    *bool     // Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)


@@ 69,6 70,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
	dialer := &net.Dialer{Timeout: c.DialTimeout}
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockSzx1024
	multicast := false

	switch c.Net {
	case "tcp-tls", "tcp4-tls", "tcp6-tls":


@@ 111,6 113,7 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
		sessionUPDData = &SessionUDPData{raddr: a}
		conn = udpConn
		BlockWiseTransfer = true
		multicast = true
	default:
		return nil, ErrInvalidNetParameter
	}


@@ 124,50 127,55 @@ func (c *Client) Dial(address string) (clientConn *ClientConn, err error) {
	}

	sync := make(chan bool)
	clientConn = &ClientConn{srv: &Server{
		Net:                  network,
		TLSConfig:            c.TLSConfig,
		Conn:                 conn,
		ReadTimeout:          c.readTimeout(),
		WriteTimeout:         c.writeTimeout(),
		MaxMessageSize:       c.MaxMessageSize,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
		NotifyStartedFunc: func() {
			timeout := c.syncTimeout()
			select {
			case sync <- true:
			case <-time.After(timeout):
				log.Fatal("Client cannot send start: Timeout")
			}
		},
		NotifySessionEndFunc: func(s SessionNet, err error) {
			if c.NotifySessionEndFunc != nil {
				c.NotifySessionEndFunc(err)
			}
		},
		createSessionTCPFunc: func(connection Conn, srv *Server) (SessionNet, error) {
			return clientConn.session, nil
		},
		createSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
			if sessionUDPData.RemoteAddr().String() == clientConn.session.RemoteAddr().String() {
				if s, ok := clientConn.session.(*blockWiseSession); ok {
					s.SessionNet.(*sessionUDP).sessionUDPData = sessionUDPData
				} else {
					clientConn.session.(*sessionUDP).sessionUDPData = sessionUDPData
	clientConn = &ClientConn{
		srv: &Server{
			Net:                  network,
			TLSConfig:            c.TLSConfig,
			Conn:                 conn,
			ReadTimeout:          c.readTimeout(),
			WriteTimeout:         c.writeTimeout(),
			MaxMessageSize:       c.MaxMessageSize,
			BlockWiseTransfer:    &BlockWiseTransfer,
			BlockWiseTransferSzx: &BlockWiseTransferSzx,
			NotifyStartedFunc: func() {
				timeout := c.syncTimeout()
				select {
				case sync <- true:
				case <-time.After(timeout):
					log.Fatal("Client cannot send start: Timeout")
				}
			},
			NotifySessionEndFunc: func(s SessionNet, err error) {
				if c.NotifySessionEndFunc != nil {
					c.NotifySessionEndFunc(err)
				}
			},
			createSessionTCPFunc: func(connection Conn, srv *Server) (SessionNet, error) {
				return clientConn.session, nil
			}
			session, err := newSessionUDP(connection, srv, sessionUDPData)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{SessionNet: session}, nil
			}
			return session, nil
		}, Handler: c.ObserverFunc},
		shutdownSync: make(chan error)}
			},
			createSessionUDPFunc: func(connection Conn, srv *Server, sessionUDPData *SessionUDPData) (SessionNet, error) {
				if sessionUDPData.RemoteAddr().String() == clientConn.session.RemoteAddr().String() {
					if s, ok := clientConn.session.(*blockWiseSession); ok {
						s.SessionNet.(*sessionUDP).sessionUDPData = sessionUDPData
					} else {
						clientConn.session.(*sessionUDP).sessionUDPData = sessionUDPData
					}
					return clientConn.session, nil
				}
				session, err := newSessionUDP(connection, srv, sessionUDPData)
				if err != nil {
					return nil, err
				}
				if session.blockWiseEnabled() {
					return &blockWiseSession{SessionNet: session}, nil
				}
				return session, nil
			},
			Handler: c.Handler,
		},
		shutdownSync: make(chan error),
		multicast:    multicast,
	}

	switch clientConn.srv.Conn.(type) {
	case *net.TCPConn, *tls.Conn:


@@ 236,6 244,9 @@ func (co *ClientConn) RemoteAddr() net.Addr {
// To specify a local address or a timeout, the caller has to set the `Client.Dialer`
// attribute appropriately
func (co *ClientConn) Exchange(m Message) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	return co.session.Exchange(m)
}



@@ 266,6 277,9 @@ func (co *ClientConn) Ping(timeout time.Duration) error {

// Get retrieve the resource identified by the request path
func (co *ClientConn) Get(path string) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err


@@ 281,6 295,9 @@ func (co *ClientConn) Get(path string) (Message, error) {
}

func (co *ClientConn) putPostHelper(code COAPCode, path string, contentType MediaType, body io.Reader) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err


@@ 313,6 330,9 @@ func (co *ClientConn) Put(path string, contentType MediaType, body io.Reader) (M

// Delete delete the resource identified by the request path
func (co *ClientConn) Delete(path string) (Message, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err


@@ 356,6 376,9 @@ func (o *Observation) Cancel() error {
// Observe subscribe to severon path. After subscription and every change on path,
// server sends immediately response
func (co *ClientConn) Observe(path string, observeFunc func(req Message)) (*Observation, error) {
	if co.multicast {
		return nil, ErrNotSupported
	}
	token, err := GenerateToken()
	if err != nil {
		return nil, err


@@ 368,63 391,71 @@ func (co *ClientConn) Observe(path string, observeFunc func(req Message)) (*Obse
	})
	req.SetPathString(path)
	req.SetOption(Observe, 0)
	block, err := MarshalBlockOption(co.session.blockWiseSzx(), 0, false)
	if err != nil {
		return nil, err
	}
	req.SetOption(Block1, block)
	/*
		IoTivity doesn't support Block2 in first request for GET
		block, err := MarshalBlockOption(co.session.blockWiseSzx(), 0, false)
		if err != nil {
			return nil, err
		}
		req.SetOption(Block2, block)
	*/
	o := &Observation{
		token:     token,
		path:      path,
		obsSeqNum: 0,
		s:         co.session,
	}
	co.session.sessionHandler().add(token, func(w ResponseWriter, r *Request, next HandlerFunc) {
		if r.Msg.Option(Observe) != nil && (r.Msg.Code() == Content || r.Msg.Code() == Valid) {
			obsSeqNum := r.Msg.Option(Observe).(uint32)
			//obs starts with 0, after that check obsSeqNum
			if obsSeqNum != 0 && o.obsSeqNum > obsSeqNum {
				return
			}
			needGet := false
			resp := r.Msg
			if r.Msg.Option(Size2) != nil {
				if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
					needGet = true
				}
	err = co.session.sessionHandler().add(token, func(w ResponseWriter, r *Request, next HandlerFunc) {
		needGet := false
		resp := r.Msg
		if r.Msg.Option(Size2) != nil {
			if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
				needGet = true
			}
			if !needGet {
				if block, ok := r.Msg.Option(Block2).(uint32); ok {
					_, _, more, err := UnmarshalBlockOption(block)
					if err != nil {
						return
					}
					needGet = more
		}
		if !needGet {
			if block, ok := r.Msg.Option(Block2).(uint32); ok {
				_, _, more, err := UnmarshalBlockOption(block)
				if err != nil {
					return
				}
				needGet = more
			}
		}

			if needGet {
				resp, err = co.Get(path)
				if err != nil {
		if needGet {
			resp, err = co.Get(path)
			if err != nil {
				return
			}
		}
		setObsSeqNum := func() {
			if r.Msg.Option(Observe) != nil {
				obsSeqNum := r.Msg.Option(Observe).(uint32)
				//obs starts with 0, after that check obsSeqNum
				if obsSeqNum != 0 && o.obsSeqNum > obsSeqNum {
					return
				}
				o.obsSeqNum = obsSeqNum
			}
			switch {
			case r.Msg.Option(ETag) != nil && resp.Option(ETag) != nil:
				//during processing observation, check if notification is still valid
				if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
					o.obsSeqNum = r.Msg.Option(Observe).(uint32)
					observeFunc(resp)
				}
			default:
				o.obsSeqNum = r.Msg.Option(Observe).(uint32)
		}

		switch {
		case r.Msg.Option(ETag) != nil && resp.Option(ETag) != nil:
			//during processing observation, check if notification is still valid
			if bytes.Equal(resp.Option(ETag).([]byte), r.Msg.Option(ETag).([]byte)) {
				setObsSeqNum()
				observeFunc(resp)
			}
			return
		default:
			setObsSeqNum()
			observeFunc(resp)
		}
		next(w, r)
		return
	})

	if err != nil {
		return nil, err
	}
	err = co.session.Write(req)
	if err != nil {
		co.session.sessionHandler().remove(o.token)

M client_test.go => client_test.go +2 -2
@@ 44,7 44,7 @@ func testServingObservation(t *testing.T, net string, addrstr string, BlockWiseT
	sync := make(chan bool)

	client := &Client{
		ObserverFunc: func(w ResponseWriter, r *Request) {
		Handler: func(w ResponseWriter, r *Request) {
			log.Printf("Gotaaa %s", r.Msg.Payload())
			sync <- true
		},


@@ 110,7 110,7 @@ func testServingMCastByClient(t *testing.T, lnet, laddr string, BlockWiseTransfe

	c := Client{
		Net: lnet,
		ObserverFunc: func(w ResponseWriter, r *Request) {
		Handler: func(w ResponseWriter, r *Request) {
			if bytes.Equal(r.Msg.Payload(), payload) {
				log.Printf("mcast %v -> %v", r.SessionNet.RemoteAddr(), r.SessionNet.LocalAddr())
				ansArrived <- true

M examples/mcast/client/main.go => examples/mcast/client/main.go +6 -20
@@ 2,39 2,25 @@ package main

import (
	"log"
	"time"

	coap "github.com/go-ocf/go-coap"
)

func mcastResp(s coap.SessionNet, m coap.Message) {
	log.Printf("Got message: %#v from %v", m, s.RemoteAddr())
}

func main() {
	client := &coap.Client{Net: "udp-mcast", ObserverFunc: mcastResp}
	client := &coap.MulticastClient{}

	conn, err := client.Dial("224.0.1.187:5683")
	if err != nil {
		log.Fatalf("Error dialing: %v", err)
	}

	req := conn.NewMessage(coap.MessageParams{
		Type:      coap.NonConfirmable,
		Code:      coap.GET,
		MessageID: 12345,
		Payload:   []byte("mcast payload"),
	sync := make(chan bool)
	_, err = conn.Publish("/oic/res", func(req coap.Message) {
		log.Printf("Got message: %#v from %v", req, req)
		sync <- true
	})
	req.SetOption(coap.ContentFormat, coap.TextPlain)
	req.SetPathString("/oic/res")

	err = conn.WriteMsg(req, time.Hour)
	if err != nil {
		log.Fatalf("Error sending request: %v", err)
	}

	// waiting for messages that arrives in 20seconds
	time.Sleep(20 * time.Second)
	log.Printf("Done...\n")

	<-sync
}

M examples/mcast/server/main.go => examples/mcast/server/main.go +6 -7
@@ 2,23 2,22 @@ package main

import (
	"log"
	"time"

	coap "github.com/go-ocf/go-coap"
)

func handleMcast(w coap.SessionNet, req coap.Message) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
	res := w.NewMessage(coap.MessageParams{
func handleMcast(w coap.ResponseWriter, r *coap.Request) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", r.Msg.Path(), r.Msg, r.SessionNet.RemoteAddr())
	res := r.SessionNet.NewMessage(coap.MessageParams{
		Type:      coap.Acknowledgement,
		Code:      coap.Content,
		MessageID: req.MessageID(),
		Token:     req.Token(),
		MessageID: r.Msg.MessageID(),
		Token:     r.Msg.Token(),
		Payload:   []byte("hello to you!"),
	})
	res.SetOption(coap.ContentFormat, coap.TextPlain)

	if err := w.WriteMsg(res, time.Hour); err != nil {
	if err := w.Write(res); err != nil {
		log.Printf("Cannot write resp %v", err)
	}
}

M examples/observe/client/main.go => examples/observe/client/main.go +15 -20
@@ 2,39 2,34 @@ package main

import (
	"log"
	"time"

	coap "github.com/go-ocf/go-coap"
)

func observe(s coap.SessionNet, m coap.Message) {
	log.Printf("Got %s", m.Payload())
func observe(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got %s", req.Msg.Payload())
}

func main() {
	client := &coap.Client{ObserverFunc: observe}
	sync := make(chan bool)
	client := &coap.Client{}

	conn, err := client.Dial("localhost:5688")
	co, err := client.Dial("localhost:5688")
	if err != nil {
		log.Fatalf("Error dialing: %v", err)
	}

	req := conn.NewMessage(coap.MessageParams{
		Type:      coap.NonConfirmable,
		Code:      coap.GET,
		MessageID: 12345,
	num := 0
	obs, err := co.Observe("/some/path", func(req coap.Message) {
		log.Printf("Got %s", req.Payload())
		num++
		if num >= 10 {
			sync <- true
		}
	})

	req.AddOption(coap.Observe, 1)
	req.SetPathString("/some/path")

	err = conn.WriteMsg(req, time.Hour)
	if err != nil {
		log.Fatalf("Error sending request: %v", err)
		log.Fatalf("Unexpected error '%v'", err)
	}

	// waiting for messages that arrives in 20seconds
	time.Sleep(20 * time.Second)
	log.Printf("Done...\n")
	<-sync
	obs.Cancel()

}

M examples/observe/server/main.go => examples/observe/server/main.go +25 -24
@@ 8,42 8,43 @@ import (
	coap "github.com/go-ocf/go-coap"
)

func periodicTransmitter(w coap.SessionNet, req coap.Message) {
	subded := time.Now()
func sendResponse(w coap.ResponseWriter, req *coap.Request, subded time.Time) error {
	resp := req.SessionNet.NewMessage(coap.MessageParams{
		Type:      coap.Acknowledgement,
		Code:      coap.Content,
		MessageID: req.Msg.MessageID(),
		Payload:   []byte(fmt.Sprintf("Been running for %v", time.Since(subded))),
		Token:     req.Msg.Token(),
	})

	resp.SetOption(coap.ContentFormat, coap.TextPlain)
	return w.Write(resp)
}

func periodicTransmitter(w coap.ResponseWriter, req *coap.Request) {
	subded := time.Now()
	for {
		msg := w.NewMessage(coap.MessageParams{
			Type:      coap.Acknowledgement,
			Code:      coap.Content,
			MessageID: req.MessageID(),
			Payload:   []byte(fmt.Sprintf("Been running for %v", time.Since(subded))),
			Token:     []byte("123"),
		})

		msg.SetOption(coap.ContentFormat, coap.TextPlain)
		msg.SetOption(coap.LocationPath, req.Path())

		log.Printf("Transmitting %v", msg)
		err := w.WriteMsg(msg, time.Hour)
		err := sendResponse(w, req, subded)
		if err != nil {
			log.Printf("Error on transmitter, stopping: %v", err)
			return
		}

		time.Sleep(time.Second)
	}
}

func main() {
	log.Fatal(coap.ListenAndServe(":5688", "udp",
		coap.HandlerFunc(func(w coap.SessionNet, req coap.Message) {
			log.Printf("Got message path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
			if req.Code() == coap.GET && req.Option(coap.Observe) != nil {
				value := req.Option(coap.Observe)
				if value.(uint32) > 0 {
					go periodicTransmitter(w, req)
				} else {
					log.Printf("coap.Observe value=%v", value)
		coap.HandlerFunc(func(w coap.ResponseWriter, req *coap.Request) {
			log.Printf("Got message path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.SessionNet.RemoteAddr())
			switch {
			case req.Msg.Code() == coap.GET && req.Msg.Option(coap.Observe) != nil && req.Msg.Option(coap.Observe).(uint32) == 0:
				go periodicTransmitter(w, req)
			case req.Msg.Code() == coap.GET:
				subded := time.Now()
				err := sendResponse(w, req, subded)
				if err != nil {
					log.Printf("Error on transmitter: %v", err)
				}
			}
		})))

M examples/simple/client/main.go => examples/simple/client/main.go +2 -19
@@ 3,7 3,6 @@ package main
import (
	"log"
	"os"
	"time"

	coap "github.com/go-ocf/go-coap"
)


@@ 13,31 12,15 @@ func main() {
	if err != nil {
		log.Fatalf("Error dialing: %v", err)
	}

	req := co.NewMessage(coap.MessageParams{
		Type:      coap.Confirmable,
		Code:      coap.GET,
		MessageID: 12345,
		Payload:   []byte("hello, world!"),
		Token:     []byte("1234"),
	})

	path := "/a"
	if len(os.Args) > 1 {
		path = os.Args[1]
	}
	resp, err := co.Get(path)

	req.SetOption(coap.ETag, "weetag")
	req.SetOption(coap.MaxAge, 3)
	req.SetPathString(path)

	rv, err := co.Exchange(req, 1*time.Second)
	if err != nil {
		log.Fatalf("Error sending request: %v", err)
	}

	if rv != nil {
		log.Printf("Response payload: %v", rv.Payload())
	}

	log.Printf("Response payload: %v", resp.Payload())
}

M examples/simple/server/main.go => examples/simple/server/main.go +14 -15
@@ 2,42 2,41 @@ package main

import (
	"log"
	"time"

	coap "github.com/go-ocf/go-coap"
)

func handleA(w coap.SessionNet, req coap.Message) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
	if req.IsConfirmable() {
		res := w.NewMessage(coap.MessageParams{
func handleA(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got message in handleA: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.SessionNet.RemoteAddr())
	if req.Msg.IsConfirmable() {
		res := req.SessionNet.NewMessage(coap.MessageParams{
			Type:      coap.Acknowledgement,
			Code:      coap.Content,
			MessageID: req.MessageID(),
			Token:     req.Token(),
			MessageID: req.Msg.MessageID(),
			Token:     req.Msg.Token(),
			Payload:   []byte("hello to you!"),
		})
		res.SetOption(coap.ContentFormat, coap.TextPlain)

		log.Printf("Transmitting from A %#v", res)
		w.WriteMsg(res, time.Hour)
		w.Write(res)
	}
}

func handleB(w coap.SessionNet, req coap.Message) {
	log.Printf("Got message in handleB: path=%q: %#v from %v", req.Path(), req, w.RemoteAddr())
	if req.IsConfirmable() {
		res := w.NewMessage(coap.MessageParams{
func handleB(w coap.ResponseWriter, req *coap.Request) {
	log.Printf("Got message in handleB: path=%q: %#v from %v", req.Msg.Path(), req.Msg, req.SessionNet.RemoteAddr())
	if req.Msg.IsConfirmable() {
		res := req.SessionNet.NewMessage(coap.MessageParams{
			Type:      coap.Acknowledgement,
			Code:      coap.Content,
			MessageID: req.MessageID(),
			Token:     req.Token(),
			MessageID: req.Msg.MessageID(),
			Token:     req.Msg.Token(),
			Payload:   []byte("good bye!"),
		})
		res.SetOption(coap.ContentFormat, coap.TextPlain)

		log.Printf("Transmitting from B %#v", res)
		w.WriteMsg(res, time.Hour)
		w.Write(res)
	}
}


A iotivity_test.go => iotivity_test.go +156 -0
@@ 0,0 1,156 @@
package coap

/*
import (
	"bytes"
	"fmt"
	"testing"

	"github.com/ugorji/go/codec"
)

var path = "/oic/d"
var udpServer = "127.0.0.1:49629"
var tcpServer = "127.0.0.1:42635"

func decodeMsg(resp Message) {
	var m interface{}
	fmt.Printf("--------------------------------------\n")
	fmt.Printf("path: %v\n", resp.PathString())
	fmt.Printf("code: %v\n", resp.Code())
	fmt.Printf("type: %v\n", resp.Type())
	contentFormat := TextPlain
	if resp.Option(ContentFormat) != nil {
		contentFormat = resp.Option(ContentFormat).(MediaType)
		fmt.Printf("content format: %v\n", contentFormat)
	}
	if resp.Payload() != nil && len(resp.Payload()) > 0 {
		switch contentFormat {
		case AppCBOR:
			err := codec.NewDecoderBytes(resp.Payload(), new(codec.CborHandle)).Decode(&m)
			if err != nil {
				fmt.Printf("cannot decode payload: %v!!!\n", err)
			} else {
				fmt.Printf("payload type: %T\n", m)
				fmt.Printf("payload value: %v\n", m)
			}
		case AppJSON:
			err := codec.NewDecoderBytes(resp.Payload(), new(codec.JsonHandle)).Decode(&m)
			if err != nil {
				fmt.Printf("cannot decode payload: %v!!!\n", err)
			} else {
				fmt.Printf("payload type: %T\n", m)
				fmt.Printf("payload value: %v\n", m)
			}
		}
	}
	fmt.Printf("resp raw: %v\n", resp)
}

func observe(w ResponseWriter, req *Request) {
	fmt.Printf("OBSERVE : %v\n", req.SessionNet.RemoteAddr())
	decodeMsg(req.Msg)
}

func TestBlockWisePostBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &Client{Net: "udp", Handler: observe, BlockWiseTransferSzx: &szx}
	co, err := client.Dial(udpServer)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}

	fmt.Printf("conn: %v\n", co.LocalAddr())

	payload := map[string]interface{}{
		"binaryAttribute": make([]byte, 33),
	}
	bw := new(bytes.Buffer)
	h := new(codec.CborHandle)
	enc := codec.NewEncoder(bw, h)
	err = enc.Encode(&payload)
	if err != nil {
		t.Fatalf("Cannot encode: %v", err)
	}

	resp, err := co.Post(path, AppCBOR, bw)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}

func TestBlockWiseGetBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &Client{Net: "udp", Handler: observe, BlockWiseTransferSzx: &szx}

	co, err := client.Dial(udpServer)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	resp, err := co.Get(path)
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}

func TestBlockWiseObserveBlock16(t *testing.T) {
	szx := BlockSzx16
	sync := make(chan bool)
	client := &Client{Net: "udp", Handler: func(w ResponseWriter, req *Request) {
		observe(w, req)
		t.Fatalf("unexpected  called handler")
		sync <- true
	}, BlockWiseTransferSzx: &szx}

	co, err := client.Dial(udpServer)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	_, err = co.Observe(path, func(req Message) {
		decodeMsg(req)
		sync <- true
	})
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	<-sync
	co.Close()
}

func TestBlockWiseMulticastBlock16(t *testing.T) {
	szx := BlockSzx16
	client := &MulticastClient{Net: "udp", Handler: observe, BlockWiseTransferSzx: &szx}

	co, err := client.Dial("224.0.1.187:5683")
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	sync := make(chan bool)
	_, err = co.Publish("/oic/res", func(req Message) {
		decodeMsg(req)
		sync <- true
	})
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	<-sync
}

func TestGetBlock16(t *testing.T) {
	szx := BlockSzx16
	bw := false
	client := &Client{Net: "tcp", Handler: observe, BlockWiseTransfer: &bw, BlockWiseTransferSzx: &szx}

	co, err := client.Dial(tcpServer)
	if err != nil {
		t.Fatalf("Error dialing: %v", err)
	}
	resp, err := co.Get("/oic/res")
	if err != nil {
		t.Fatalf("Cannot post exchange")
	}
	decodeMsg(resp)
}
*/

M message.go => message.go +51 -0
@@ 6,6 6,7 @@ import (
	"io"
	"log"
	"reflect"
	"strconv"
	"strings"
)



@@ 260,6 261,56 @@ const (
	AppLwm2mJSON      MediaType = 11543 //application/vnd.oma.lwm2m+json
)

func (c MediaType) String() string {
	switch c {
	case TextPlain:
		return "text/plain;charset=utf-8"
	case AppCoseEncrypt0:
		return "application/cose; cose-type=\"cose-encrypt0\" (RFC 8152)"
	case AppCoseMac0:
		return "application/cose; cose-type=\"cose-mac0\" (RFC 8152)"
	case AppCoseSign1:
		return "application/cose; cose-type=\"cose-sign1\" (RFC 8152)"
	case AppLinkFormat:
		return "application/link-format"
	case AppXML:
		return "application/xml"
	case AppOctets:
		return "application/octet-stream"
	case AppExi:
		return "application/exi"
	case AppJSON:
		return "application/json"
	case AppJsonPatch:
		return "application/json-patch+json (RFC6902)"
	case AppJsonMergePatch:
		return "application/merge-patch+json (RFC7396)"
	case AppCBOR:
		return "application/cbor (RFC 7049)"
	case AppCWT:
		return "application/cwt"
	case AppCoseEncrypt:
		return "application/cose; cose-type=\"cose-encrypt\" (RFC 8152)"
	case AppCoseMac:
		return "application/cose; cose-type=\"cose-mac\" (RFC 8152)"
	case AppCoseSign:
		return "application/cose; cose-type=\"cose-sign\" (RFC 8152)"
	case AppCoseKey:
		return "application/cose-key (RFC 8152)"
	case AppCoseKeySet:
		return "application/cose-key-set (RFC 8152)"
	case AppCoapGroup:
		return "coap-group+json (RFC 7390)"
	case AppOcfCbor:
		return "application/vnd.ocf+cbor"
	case AppLwm2mTLV:
		return "application/vnd.oma.lwm2m+tlv"
	case AppLwm2mJSON:
		return "application/vnd.oma.lwm2m+json"
	}
	return "Unknown media type: 0x" + strconv.FormatInt(int64(c), 16)
}

type option struct {
	ID    OptionID
	Value interface{}

A multicastClient.go => multicastClient.go +200 -0
@@ 0,0 1,200 @@
package coap

// A client implementation.

import (
	"net"
	"time"
)

// A ClientConn represents a connection to a COAP server.
type MulticastClientConn struct {
	conn   *ClientConn
	client *MulticastClient
}

// A MulticastClient defines parameters for a COAP client.
type MulticastClient struct {
	Net            string        // "udp" / "udp4" / "udp6"
	MaxMessageSize uint16        // Max message size that could be received from peer. If not set it defaults to 1152 B.
	DialTimeout    time.Duration // set Timeout for dialer
	ReadTimeout    time.Duration // net.ClientConn.SetReadTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	WriteTimeout   time.Duration // net.ClientConn.SetWriteTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
	SyncTimeout    time.Duration // The maximum of time for synchronization go-routines, defaults to 30 seconds - overridden by Timeout when that value is non-zero if it occurs, then it call log.Fatal

	Handler              HandlerFunc     // default handler for handling messages from server
	NotifySessionEndFunc func(err error) // if NotifySessionEndFunc is set it is called when TCP/UDP session was ended.

	BlockWiseTransfer    *bool     // Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)
	BlockWiseTransferSzx *BlockSzx // Set maximal block size of payload that will be send in fragment

	multicastHandler *sessionHandler
}

// Dial connects to the address on the named network.
func (c *MulticastClient) dialNet(net, address string) (*ClientConn, error) {
	if c.multicastHandler == nil {
		c.multicastHandler = &sessionHandler{tokenHandlers: make(map[[MaxTokenSize]byte]func(w ResponseWriter, r *Request, next HandlerFunc))}
	}
	client := &Client{
		Net:            net,
		MaxMessageSize: c.MaxMessageSize,
		DialTimeout:    c.DialTimeout,
		ReadTimeout:    c.ReadTimeout,
		WriteTimeout:   c.WriteTimeout,
		SyncTimeout:    c.SyncTimeout,
		Handler: func(w ResponseWriter, r *Request) {
			handler := c.Handler
			if handler == nil {
				handler = HandleFailed
			}
			c.multicastHandler.handle(w, r, handler)
		},
		NotifySessionEndFunc: c.NotifySessionEndFunc,
		BlockWiseTransfer:    c.BlockWiseTransfer,
		BlockWiseTransferSzx: c.BlockWiseTransferSzx,
	}

	return client.Dial(address)
}

// Dial connects to the address on the named network.
func (c *MulticastClient) Dial(address string) (*MulticastClientConn, error) {
	var net string
	switch c.Net {
	case "udp", "udp4", "udp6":
		net = c.Net + "-mcast"
	case "":
		net = "udp-mcast"
	default:
		return nil, ErrInvalidNetParameter
	}
	conn, err := c.dialNet(net, address)
	if err != nil {
		return nil, err
	}
	return &MulticastClientConn{
		conn:   conn,
		client: c,
	}, nil
}

// LocalAddr implements the SessionNet.LocalAddr method.
func (mconn *MulticastClientConn) LocalAddr() net.Addr {
	return mconn.conn.LocalAddr()
}

// RemoteAddr implements the SessionNet.RemoteAddr method.
func (mconn *MulticastClientConn) RemoteAddr() net.Addr {
	return mconn.conn.RemoteAddr()
}

// NewMessage Create message for request
func (mconn *MulticastClientConn) NewMessage(p MessageParams) Message {
	return mconn.conn.NewMessage(p)
}

// WriteMsg sends a message through the connection co.
func (mconn *MulticastClientConn) Write(m Message) error {
	return mconn.conn.Write(m)
}

// SetReadDeadline set read deadline for timeout for Exchange
func (mconn *MulticastClientConn) SetReadDeadline(timeout time.Duration) {
	mconn.conn.SetReadDeadline(timeout)
}

// SetWriteDeadline set write deadline for timeout for Exchange and Write
func (mconn *MulticastClientConn) SetWriteDeadline(timeout time.Duration) {
	mconn.conn.SetWriteDeadline(timeout)
}

func (mconn *MulticastClientConn) Close() {
	mconn.conn.Close()
}

//Observation represents subscription to resource on the server
type ResponseWaiter struct {
	token []byte
	path  string
	conn  *MulticastClientConn
}

// Cancel remove observation from server. For recreate observation use Observe.
func (r *ResponseWaiter) Cancel() error {
	return r.conn.client.multicastHandler.remove(r.token)
}

type messageNewer interface {
	NewMessage(MessageParams) Message
}

func createGetReq(m messageNewer, path string) (Message, error) {
	token, err := GenerateToken()
	if err != nil {
		return nil, err
	}
	req := m.NewMessage(MessageParams{
		Type:      NonConfirmable,
		Code:      GET,
		MessageID: GenerateMessageID(),
		Token:     token,
	})
	req.SetPathString(path)
	return req, nil
}

// Publish subscribe to sever on path. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) Publish(path string, responseHandler func(resp Message)) (*ResponseWaiter, error) {
	req, err := createGetReq(mconn, path)
	if err != nil {
		return nil, err
	}
	r := &ResponseWaiter{
		token: req.Token(),
		path:  path,
		conn:  mconn,
	}
	err = mconn.client.multicastHandler.add(req.Token(), func(w ResponseWriter, r *Request, next HandlerFunc) {
		needGet := false
		resp := r.Msg
		if r.Msg.Option(Size2) != nil {
			if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
				needGet = true
			}
		}
		if !needGet {
			if block, ok := r.Msg.Option(Block2).(uint32); ok {
				_, _, more, err := UnmarshalBlockOption(block)
				if err != nil {
					return
				}
				needGet = more
			}
		}

		if needGet {
			getReq, err := createGetReq(mconn, path)
			if err != nil {
				return
			}
			resp, err = r.SessionNet.Exchange(getReq)
			if err != nil {
				return
			}
		}
		responseHandler(resp)
	})
	if err != nil {
		return nil, err
	}

	err = mconn.Write(req)
	if err != nil {
		mconn.client.multicastHandler.remove(r.token)
		return nil, err
	}

	return r, nil
}

A multicastClient_test.go => multicastClient_test.go +33 -0
@@ 0,0 1,33 @@
package coap

import (
	"testing"
)

func TestServingIPv4MCastBlockSzx16(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx16, 1033)
}

func TestServingIPv4MCastBlockSzx32(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx32, 1033)
}

func TestServingIPv4MCastBlockSzx64(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx64, 1033)
}

func TestServingIPv4MCastBlockSzx128(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx128, 1033)
}

func TestServingIPv4MCastBlockSzx256(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx256, 1033)
}

func TestServingIPv4MCastBlockSzx512(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx512, 1033)
}

func TestServingIPv4MCastBlockSzx1024(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockSzx1024, 1033)
}

M server.go => server.go +1 -2
@@ 49,9 49,8 @@ func (f HandlerFunc) ServeCOAP(w ResponseWriter, r *Request) {

// HandleFailed returns a HandlerFunc that returns NotFound for every request it gets.
func HandleFailed(w ResponseWriter, req *Request) {
	typ := Reset
	msg := req.SessionNet.NewMessage(MessageParams{
		Type:      typ,
		Type:      Acknowledgement,
		Code:      NotFound,
		MessageID: req.Msg.MessageID(),
		Token:     req.Msg.Token(),

M server_test.go => server_test.go +56 -26
@@ 186,7 186,7 @@ func testServingTCPWithMsgWithObserver(t *testing.T, net string, BlockWiseTransf
	var err error
	c := &Client{
		Net:                  net,
		ObserverFunc:         observeFunc,
		Handler:              observeFunc,
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}


@@ 403,31 403,65 @@ func TestServingChallengingTimeoutClientTLS(t *testing.T) {
	})
}

func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx) {
	payload := []byte("mcast payload")
func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockSzx, payloadLen int) {
	addrMcast := laddr
	ansArrived := make(chan bool)

	HandleFunc("/test", func(w ResponseWriter, r *Request) {
		if bytes.Equal(r.Msg.Payload(), payload) {
			log.Printf("mcast %v -> %v", r.SessionNet.RemoteAddr(), r.SessionNet.LocalAddr())
			ansArrived <- true
		} else {
			t.Fatalf("unknown payload %v arrived from %v", r.Msg.Payload(), r.SessionNet.RemoteAddr())
	responseServerConn := make([]*ClientConn, 0)
	var lockResponseServerConn sync.Mutex
	responseServer := Client{
		Net:                  strings.Trim(lnet, "-mcast"),
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
		Handler: func(w ResponseWriter, r *Request) {
			resp := r.SessionNet.NewMessage(MessageParams{
				Type:      Acknowledgement,
				Code:      Content,
				MessageID: r.Msg.MessageID(),
				Payload:   make([]byte, payloadLen),
				Token:     r.Msg.Token(),
			})
			err := w.Write(resp)
			if err != nil {
				t.Fatalf("cannot send response %v", err)
			}
		},
	}

	s, _, fin, err := RunLocalServerUDPWithHandler(lnet, addrMcast, BlockWiseTransfer, BlockWiseTransferSzx, func(w ResponseWriter, r *Request) {
		resp := r.SessionNet.NewMessage(MessageParams{
			Type:      Acknowledgement,
			Code:      Content,
			MessageID: r.Msg.MessageID(),
			Payload:   make([]byte, payloadLen),
			Token:     r.Msg.Token(),
		})
		conn, err := responseServer.Dial(r.SessionNet.RemoteAddr().String())
		if err != nil {
			t.Fatalf("cannot create connection %v", err)
		}
		err = conn.Write(resp)
		if err != nil {
			t.Fatalf("cannot send response %v", err)
		}
		lockResponseServerConn.Lock()
		responseServerConn = append(responseServerConn, conn)
		lockResponseServerConn.Unlock()
	})
	defer HandleRemove("/test")

	s, _, fin, err := RunLocalUDPServer(lnet, addrMcast, BlockWiseTransfer, BlockWiseTransferSzx)
	if err != nil {
		t.Fatalf("unable to run test server: %v", err)
	}
	defer func() {
		s.Shutdown()
		lockResponseServerConn.Lock()
		for _, conn := range responseServerConn {
			conn.Close()
		}
		lockResponseServerConn.Unlock()
		<-fin
	}()

	c := Client{
	c := MulticastClient{
		Net:                  strings.TrimSuffix(lnet, "-mcast"),
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,


@@ 439,27 473,23 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
	}
	defer co.Close()

	req := &DgramMessage{
		MessageBase{
			typ:       NonConfirmable,
			code:      GET,
			messageID: 1234,
			payload:   payload,
		}}
	req.SetOption(ContentFormat, TextPlain)
	req.SetPathString("/test")

	co.Write(req)
	rp, err := co.Publish("/test", func(Msg Message) {
		ansArrived <- true
	})
	if err != nil {
		t.Fatalf("unable to publishing: %v", err)
	}
	defer rp.Cancel()

	<-ansArrived
}

func TestServingIPv4MCast(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", false, BlockSzx16)
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", false, BlockSzx16, 16)
}

func TestServingIPv6MCast(t *testing.T) {
	testServingMCast(t, "udp6-mcast", "[ff03::158]:11111", false, BlockSzx16)
	testServingMCast(t, "udp6-mcast", "[ff03::158]:11111", false, BlockSzx16, 16)
}

func BenchmarkServe(b *testing.B) {

M udp.go => udp.go +4 -7
@@ 3,7 3,7 @@
package coap

import (
	"bytes"
	"encoding/base64"
	"net"

	"golang.org/x/net/ipv4"


@@ 38,10 38,8 @@ func (s *SessionUDPData) RemoteAddr() net.Addr { return s.raddr }

// Key returns the key session for the map using
func (s *SessionUDPData) Key() string {
	var sessionKey bytes.Buffer
	sessionKey.WriteString(s.RemoteAddr().String())
	sessionKey.Write(s.context)
	return sessionKey.String()
	key := s.RemoteAddr().String() + "-" + base64.StdEncoding.EncodeToString(s.context)
	return key
}

// ReadFromSessionUDP acts just like net.UDPConn.ReadFrom(), but returns a session object instead of a


@@ 113,7 111,6 @@ func correctSource(oob []byte) []byte {
func joinGroup(conn *net.UDPConn, ifi *net.Interface, gaddr *net.UDPAddr) error {
	if ip4 := conn.LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {
		return ipv4.NewPacketConn(conn).JoinGroup(ifi, gaddr)
	} else {
		return ipv6.NewPacketConn(conn).JoinGroup(ifi, gaddr)
	}
	return ipv6.NewPacketConn(conn).JoinGroup(ifi, gaddr)
}