~fnux/yggdrasil-go-coap

10f1de20dee49222295c6b92948476da77c87995 — Timothée Floure 1 year, 8 months ago aa153d0
Blindly implement Yggdrasil session and listener/connection handler
2 files changed, 225 insertions(+), 0 deletions(-)

M server.go
A sessionyggdrasil.go
M server.go => server.go +76 -0
@@ 168,6 168,8 @@ type Server struct {
	newSessionTCPFunc func(connection *coapNet.Conn, srv *Server) (networkSession, error)
	// If newSessionUDPFunc is set it is called when session DTLS want to be created
	newSessionDTLSFunc func(connection *coapNet.Conn, srv *Server) (networkSession, error)
	// If newSessionYggdrasilFunc is set it is called when session Yggdrasil want to be created
	newSessionYggdrasilFunc func(connection *coapNet.Conn, srv *Server) (networkSession, error)
	// If NotifyNewSession is set it is called when new TCP/UDP session was created.
	NotifySessionNewFunc func(w *ClientConn)
	// If NotifyNewSession is set it is called when TCP/UDP session was ended.


@@ 446,6 448,19 @@ func (srv *Server) activateAndServe(listener Listener, conn *coapNet.Conn, connU
		}
	}

	if srv.newSessionYggdrasilFunc == nil {
		srv.newSessionYggdrasilFunc = func(connection *coapNet.Conn, srv *Server) (networkSession, error) {
			session, err := newSessionYggdrasil(connection, srv)
			if err != nil {
				return nil, err
			}
			if session.blockWiseEnabled() {
				return &blockWiseSession{networkSession: session}, nil
			}
			return session, nil
		}
	}

	if srv.NotifySessionNewFunc == nil {
		srv.NotifySessionNewFunc = func(w *ClientConn) {}
	}


@@ 459,6 474,11 @@ func (srv *Server) activateAndServe(listener Listener, conn *coapNet.Conn, connU
		if _, ok := listener.(*coapNet.DTLSListener); ok {
			return srv.serveDTLSListener(listener)
		}

		if _, ok := listener.(*coapNet.YggdrasilListener); ok {
			return srv.serveYggdrasilListener(listener)
		}

		return srv.serveTCPListener(listener)
	case conn != nil:
		if strings.HasSuffix(srv.Net, "-dtls") {


@@ 634,6 654,62 @@ func (srv *Server) serveTCPListener(l Listener) error {
	}
}

func (srv *Server) serveYggdrasilConnection(ctx *shutdownContext, conn *coapNet.Conn) error {
	session, err := srv.newSessionYggdrasilFunc(conn, srv)
	if err != nil {
		return err
	}
	c := ClientConn{commander: &ClientCommander{session}}
	srv.NotifySessionNewFunc(&c)

	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		m := make([]byte, ^uint16(0))
		n, err := conn.ReadWithContext(ctx, m)
		if err != nil {
			err := fmt.Errorf("cannot serve Yggdrasil connection %v", err)
			srv.closeSessions(err)
			return err
		}
		msg, err := ParseDgramMessage(m[:n])
		if err != nil {
			continue
		}

		// We will block poller wait loop when
		// all pool workers are busy.
		c := ClientConn{commander: &ClientCommander{session}}
		srv.spawnWorker(&Request{Client: &c, Msg: msg, Ctx: sessCtx, Sequence: c.Sequence()})
	}
}

// serveYggdrasilListener starts an Yggdrasil listener for the server.
func (srv *Server) serveYggdrasilListener(l Listener) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}

	var wg sync.WaitGroup
	ctx := newShutdownWithContext(srv.doneChan)

	for {
		rw, err := l.AcceptWithContext(ctx)
		if err != nil {
			wg.Wait()
			return fmt.Errorf("cannot serve yggdrasil: %v", err)
		}
		if rw != nil {
			wg.Add(1)
			go func() {
				defer wg.Done()
				srv.serveYggdrasilConnection(ctx, coapNet.NewConn(rw, srv.heartBeat()))
			}()
		}
	}
}

func (srv *Server) closeSessions(err error) {
	srv.sessionUDPMapLock.Lock()
	tmp := srv.sessionUDPMap

A sessionyggdrasil.go => sessionyggdrasil.go +149 -0
@@ 0,0 1,149 @@
package coap

import (
	"bytes"
	"context"
	"fmt"
	"net"

	coapNet "./net"
)

type sessionYggdrasil struct {
	sessionBase
	connection *coapNet.Conn
}

// newSessionYggdrasil create new session for Yggdrasil connection
func newSessionYggdrasil(connection *coapNet.Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {
		BlockWiseTransfer = *srv.BlockWiseTransfer
	}
	if srv.BlockWiseTransferSzx != nil {
		BlockWiseTransferSzx = *srv.BlockWiseTransferSzx
	}

	if BlockWiseTransfer && BlockWiseTransferSzx == BlockWiseSzxBERT {
		return nil, ErrInvalidBlockWiseSzx
	}

	s := sessionYggdrasil{
		connection: connection,
		sessionBase: sessionBase{
			srv:                  srv,
			handler:              &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)},
			blockWiseTransfer:    BlockWiseTransfer,
			blockWiseTransferSzx: uint32(BlockWiseTransferSzx),
			mapPairs:             make(map[[MaxTokenSize]byte]map[uint16](*sessionResp)),
		},
	}

	return &s, nil
}

// LocalAddr implements the networkSession.LocalAddr method.
func (s *sessionYggdrasil) LocalAddr() net.Addr {
	return s.connection.LocalAddr()
}

// RemoteAddr implements the networkSession.RemoteAddr method.
func (s *sessionYggdrasil) RemoteAddr() net.Addr {
	return s.connection.RemoteAddr()
}

// BlockWiseTransferEnabled
func (s *sessionYggdrasil) blockWiseEnabled() bool {
	return s.blockWiseTransfer
}

func (s *sessionYggdrasil) blockWiseIsValid(szx BlockWiseSzx) bool {
	return szx <= BlockWiseSzx1024
}

// Ping send ping over udp(unicast) and wait for response.
func (s *sessionYggdrasil) PingWithContext(ctx context.Context) error {
  // TODO: check if relevant for Yggdrasil.
	//provoking to get a reset message - "CoAP ping" in RFC-7252
	//https://tools.ietf.org/html/rfc7252#section-4.2
	//https://tools.ietf.org/html/rfc7252#section-4.3
	//https://tools.ietf.org/html/rfc7252#section-1.2 "Reset Message"
	// BUG of iotivity: https://jira.iotivity.org/browse/IOT-3149
	req := s.NewMessage(MessageParams{
		Type:      Confirmable,
		Code:      Empty,
		MessageID: GenerateMessageID(),
	})
	resp, err := s.ExchangeWithContext(ctx, req)
	if err != nil {
		return err
	}
	if resp.Type() == Reset {
		return nil
	}
	return ErrInvalidResponse
}

func (s *sessionYggdrasil) closeWithError(err error) error {
	if s.connection != nil {
		c := ClientConn{commander: &ClientCommander{s}}
		s.srv.NotifySessionEndFunc(&c, err)
		e := s.connection.Close()
		//s.connection = nil
		if e == nil {
			e = err
		}
		return e
	}
	return err
}

// Close implements the networkSession.Close method
func (s *sessionYggdrasil) Close() error {
	return s.closeWithError(nil)
}

// NewMessage Create message for response
func (s *sessionYggdrasil) NewMessage(p MessageParams) Message {
	return NewDgramMessage(p)
}

func (s *sessionYggdrasil) IsTCP() bool {
	return false
}

func (s *sessionYggdrasil) ExchangeWithContext(ctx context.Context, req Message) (Message, error) {
	return s.exchangeWithContext(ctx, req, s.WriteMsgWithContext)
}

// Write implements the networkSession.Write method.
func (s *sessionYggdrasil) WriteMsgWithContext(ctx context.Context, req Message) error {
	buffer := bytes.NewBuffer(make([]byte, 0, 1500))
	err := req.MarshalBinary(buffer)
	if err != nil {
		return fmt.Errorf("cannot write msg to tcp connection %v", err)
	}
	return s.connection.WriteWithContext(ctx, buffer.Bytes())
}

func (s *sessionYggdrasil) sendPong(w ResponseWriter, r *Request) error {
	resp := r.Client.NewMessage(MessageParams{
		Type:      Reset,
		Code:      Empty,
		MessageID: r.Msg.MessageID(),
	})
	return w.WriteMsgWithContext(r.Ctx, resp)
}

func (s *sessionYggdrasil) handleSignals(w ResponseWriter, r *Request) bool {
	switch r.Msg.Code() {
	// handle of udp ping
	case Empty:
		if r.Msg.Type() == Confirmable && r.Msg.AllOptions().Len() == 0 && (r.Msg.Payload() == nil || len(r.Msg.Payload()) == 0) {
			s.sendPong(w, r)
			return true
		}
	}
	return false
}