~fnux/yggdrasil-go-coap

6251d1201ea55633381bcccde2f47303b3736a40 — Jozef Kralik 2 years ago 37b6b19
move net package from kit
M Gopkg.lock => Gopkg.lock +40 -41
@@ 2,58 2,43 @@


[[projects]]
  branch = "master"
  digest = "1:23c4482f7db54097030431eb88c87af95d0e072582cea3f6fdbc81ae22cd11f5"
  name = "github.com/go-ocf/kit"
  packages = [
    "log",
    "net",
  ]
  pruneopts = "UT"
  revision = "6cb557ba9da5f1103d2807dfc6803d202952e8d7"

[[projects]]
  digest = "1:4887e9e89c80299aa520d718239809fdd2a47a9aa394909b169959bfbc424ddf"
  name = "github.com/ugorji/go"
  packages = ["codec"]
  digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"
  name = "github.com/davecgh/go-spew"
  packages = ["spew"]
  pruneopts = "UT"
  revision = "8fd0f8d918c8f0b52d0af210a812ba882cc31a1e"
  version = "v1.1.2"
  revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73"
  version = "v1.1.1"

[[projects]]
  digest = "1:3c1a69cdae3501bf75e76d0d86dc6f2b0a7421bc205c0cb7b96b19eed464a34d"
  name = "go.uber.org/atomic"
  packages = ["."]
  digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe"
  name = "github.com/pmezard/go-difflib"
  packages = ["difflib"]
  pruneopts = "UT"
  revision = "1ea20fb1cbb1cc08cbd0d913a96dead89aa18289"
  version = "v1.3.2"
  revision = "792786c7400a136282c1664665ae0a8db921c6c2"
  version = "v1.0.0"

[[projects]]
  digest = "1:60bf2a5e347af463c42ed31a493d817f8a72f102543060ed992754e689805d1a"
  name = "go.uber.org/multierr"
  packages = ["."]
  digest = "1:5da8ce674952566deae4dbc23d07c85caafc6cfa815b0b3e03e41979cedb8750"
  name = "github.com/stretchr/testify"
  packages = [
    "assert",
    "require",
  ]
  pruneopts = "UT"
  revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a"
  version = "v1.1.0"
  revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053"
  version = "v1.3.0"

[[projects]]
  digest = "1:c52caf7bd44f92e54627a31b85baf06a68333a196b3d8d241480a774733dcf8b"
  name = "go.uber.org/zap"
  packages = [
    ".",
    "buffer",
    "internal/bufferpool",
    "internal/color",
    "internal/exit",
    "zapcore",
  ]
  digest = "1:d0072748c62defde1ad99dde77f6ffce492a0e5aea9204077e497c7edfb86653"
  name = "github.com/ugorji/go"
  packages = ["codec"]
  pruneopts = "UT"
  revision = "ff33455a0e382e8a81d14dd7c922020b6b5e7982"
  version = "v1.9.1"
  revision = "2adff0894ba3bc2eeb9f9aea45fefd49802e1a13"
  version = "v1.1.4"

[[projects]]
  branch = "master"
  digest = "1:433ecd67c12cd2ebf2aaadfa9ff7ebadf400d3a31351078d902c12f1e0f6c2e5"
  digest = "1:34d3139bb084e2883e9823772afb88a3e9fd5812bade1e5ba455cbdffb15732a"
  name = "golang.org/x/net"
  packages = [
    "bpf",


@@ 63,14 48,28 @@
    "ipv6",
  ]
  pruneopts = "UT"
  revision = "74de082e2cca95839e88aa0aeee5aadf6ce7710f"
  revision = "d28f0bde5980168871434b95cfc858db9f2a7a99"

[[projects]]
  branch = "master"
  digest = "1:3398992b8f4b00ce4a6a5f9770008cf66dcff8f128b51eeab8c1f657096cac8f"
  name = "golang.org/x/sys"
  packages = [
    "unix",
    "windows",
  ]
  pruneopts = "UT"
  revision = "15dcb6c0061f497a3f66e3ea034b629c6dd4d99e"

[solve-meta]
  analyzer-name = "dep"
  analyzer-version = 1
  input-imports = [
    "github.com/go-ocf/kit/net",
    "github.com/stretchr/testify/assert",
    "github.com/stretchr/testify/require",
    "github.com/ugorji/go/codec",
    "golang.org/x/net/ipv4",
    "golang.org/x/net/ipv6",
  ]
  solver-name = "gps-cdcl"
  solver-version = 1

M Gopkg.toml => Gopkg.toml +7 -3
@@ 26,12 26,16 @@


[[constraint]]
  branch = "master"
  name = "github.com/go-ocf/kit"
  name = "github.com/stretchr/testify"
  version = "1.3.0"

[[constraint]]
  name = "github.com/ugorji/go"
  version = "1.1.2"
  version = "1.1.4"

[[constraint]]
  branch = "master"
  name = "golang.org/x/net"

[prune]
  go-tests = true

M client.go => client.go +11 -11
@@ 11,7 11,7 @@ import (
	"strings"
	"time"

	kitNet "github.com/go-ocf/kit/net"
	coapNet "github.com/go-ocf/go-coap/net"
)

// A ClientConn represents a connection to a COAP server.


@@ 67,7 67,7 @@ func listenUDP(network, address string) (*net.UDPAddr, *net.UDPConn, error) {
	if udpConn, err = net.ListenUDP(network, a); err != nil {
		return nil, nil, err
	}
	if err := kitNet.SetUDPSocketOptions(udpConn); err != nil {
	if err := coapNet.SetUDPSocketOptions(udpConn); err != nil {
		return nil, nil, err
	}
	return a, udpConn, nil


@@ 82,7 82,7 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon

	var conn net.Conn
	var network string
	var sessionUPDData *kitNet.ConnUDPContext
	var sessionUPDData *coapNet.ConnUDPContext

	dialer := &net.Dialer{Timeout: c.DialTimeout}
	BlockWiseTransfer := false


@@ 115,7 115,7 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		if conn, err = dialer.DialContext(ctx, network, address); err != nil {
			return nil, err
		}
		sessionUPDData = kitNet.NewConnUDPContext(conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr), nil)
		sessionUPDData = coapNet.NewConnUDPContext(conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr), nil)
		BlockWiseTransfer = true
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		var err error


@@ 132,10 132,10 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		if err != nil {
			return nil, fmt.Errorf("cannot listen address: %v", err)
		}
		if err = kitNet.SetUDPSocketOptions(udpConn); err != nil {
		if err = coapNet.SetUDPSocketOptions(udpConn); err != nil {
			return nil, fmt.Errorf("cannot set upd socket options: %v", err)
		}
		sessionUPDData = kitNet.NewConnUDPContext(multicastAddress, nil)
		sessionUPDData = coapNet.NewConnUDPContext(multicastAddress, nil)
		conn = udpConn
		BlockWiseTransfer = true
		multicast = true


@@ 168,10 168,10 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
					c.NotifySessionEndFunc(err)
				}
			},
			newSessionTCPFunc: func(connection *kitNet.Conn, srv *Server) (networkSession, error) {
			newSessionTCPFunc: func(connection *coapNet.Conn, srv *Server) (networkSession, error) {
				return clientConn.commander.networkSession, nil
			},
			newSessionUDPFunc: func(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
			newSessionUDPFunc: func(connection *coapNet.ConnUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
				if sessionUDPData.RemoteAddr().String() == clientConn.commander.networkSession.RemoteAddr().String() {
					if s, ok := clientConn.commander.networkSession.(*blockWiseSession); ok {
						s.networkSession.(*sessionUDP).sessionUDPData = sessionUDPData


@@ 198,7 198,7 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon

	switch clientConn.srv.Conn.(type) {
	case *net.TCPConn, *tls.Conn:
		session, err := newSessionTCP(kitNet.NewConn(clientConn.srv.Conn, clientConn.srv.heartBeat()), clientConn.srv)
		session, err := newSessionTCP(coapNet.NewConn(clientConn.srv.Conn, clientConn.srv.heartBeat()), clientConn.srv)
		if err != nil {
			return nil, err
		}


@@ 209,8 209,8 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		}
	case *net.UDPConn:
		// WriteContextMsgUDP returns error when addr is filled in SessionUDPData for connected socket
		kitNet.SetUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		session, err := newSessionUDP(kitNet.NewConnUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv.heartBeat(), c.MulticastHopLimit), clientConn.srv, sessionUPDData)
		coapNet.SetUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		session, err := newSessionUDP(coapNet.NewConnUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv.heartBeat(), c.MulticastHopLimit), clientConn.srv, sessionUPDData)
		if err != nil {
			return nil, err
		}

A net/conn.go => net/conn.go +113 -0
@@ 0,0 1,113 @@
package net

import (
	"bufio"
	"context"
	"fmt"
	"net"
	"sync"
	"time"
)

// Conn is a generic stream-oriented network connection that provides Read/Write with context.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn struct {
	heartBeat  time.Duration
	connection net.Conn
	readBuffer *bufio.Reader
	lock       sync.Mutex
}

// NewConn creates connection over net.Conn.
func NewConn(c net.Conn, heartBeat time.Duration) *Conn {
	connection := Conn{
		connection: c,
		heartBeat:  heartBeat,
		readBuffer: bufio.NewReaderSize(c, 2048),
	}
	return &connection
}

// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
func (c *Conn) LocalAddr() net.Addr {
	return c.connection.LocalAddr()
}

// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
func (c *Conn) RemoteAddr() net.Addr {
	return c.connection.RemoteAddr()
}

// Close closes the connection.
func (c *Conn) Close() error {
	return c.connection.Close()
}

// WriteContext writes data with context.
func (c *Conn) WriteWithContext(ctx context.Context, data []byte) error {
	written := 0
	c.lock.Lock()
	defer c.lock.Unlock()
	for written < len(data) {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		err := c.connection.SetWriteDeadline(time.Now().Add(c.heartBeat))
		if err != nil {
			return fmt.Errorf("cannot set write deadline for tcp connection: %v", err)
		}
		n, err := c.connection.Write(data[written:])

		if err != nil {
			if isTemporary(err) {
				continue
			}
			return fmt.Errorf("cannot write to tcp connection")
		}
		written += n
	}
	return nil
}

// ReadFullContext reads stream with context until whole buffer is satisfied.
func (c *Conn) ReadFullWithContext(ctx context.Context, buffer []byte) error {
	offset := 0
	for offset < len(buffer) {
		n, err := c.ReadWithContext(ctx, buffer[offset:])
		if err != nil {
			return fmt.Errorf("cannot read full from tcp connection: %v", err)
		}
		offset += n
	}
	return nil
}

// ReadContext reads stream with context.
func (c *Conn) ReadWithContext(ctx context.Context, buffer []byte) (int, error) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return -1, fmt.Errorf("cannot read from tcp connection: %v", ctx.Err())
			}
			return -1, fmt.Errorf("cannot read from tcp connection")
		default:
		}

		err := c.connection.SetReadDeadline(time.Now().Add(c.heartBeat))
		if err != nil {
			return -1, fmt.Errorf("cannot set read deadline for tcp connection: %v", err)
		}
		n, err := c.readBuffer.Read(buffer)
		if err != nil {
			if isTemporary(err) {
				continue
			}
			return -1, fmt.Errorf("cannot read from tcp connection: %v", err)
		}
		return n, err
	}
}

A net/connUDP.go => net/connUDP.go +260 -0
@@ 0,0 1,260 @@
package net

import (
	"context"
	"fmt"
	"net"
	"sync"
	"time"

	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)

// ConnUDP is a udp connection provides Read/Write with context.
//
// Multiple goroutines may invoke methods on a ConnUDP simultaneously.
type ConnUDP struct {
	heartBeat         time.Duration
	connection        *net.UDPConn
	packetConn        packetConn
	multicastHopLimit int

	lock sync.Mutex
}

type packetConn interface {
	SetWriteDeadline(t time.Time) error
	WriteTo(b []byte, dst net.Addr) (n int, err error)
	SetMulticastInterface(ifi *net.Interface) error
	SetMulticastHopLimit(hoplim int) error
	SetMulticastLoopback(on bool) error
	JoinGroup(ifi *net.Interface, group net.Addr) error
}

type packetConnIPv4 struct {
	packetConnIPv4 *ipv4.PacketConn
}

func newPacketConnIPv4(p *ipv4.PacketConn) *packetConnIPv4 {
	return &packetConnIPv4{p}
}

func (p *packetConnIPv4) SetMulticastInterface(ifi *net.Interface) error {
	return p.packetConnIPv4.SetMulticastInterface(ifi)
}

func (p *packetConnIPv4) SetWriteDeadline(t time.Time) error {
	return p.packetConnIPv4.SetWriteDeadline(t)
}

func (p *packetConnIPv4) WriteTo(b []byte, dst net.Addr) (n int, err error) {
	return p.packetConnIPv4.WriteTo(b, nil, dst)
}

func (p *packetConnIPv4) SetMulticastHopLimit(hoplim int) error {
	return p.packetConnIPv4.SetMulticastTTL(hoplim)
}

func (p *packetConnIPv4) SetMulticastLoopback(on bool) error {
	return p.packetConnIPv4.SetMulticastLoopback(on)
}

func (p *packetConnIPv4) JoinGroup(ifi *net.Interface, group net.Addr) error {
	return p.packetConnIPv4.JoinGroup(ifi, group)
}

type packetConnIPv6 struct {
	packetConnIPv6 *ipv6.PacketConn
}

func newPacketConnIPv6(p *ipv6.PacketConn) *packetConnIPv6 {
	return &packetConnIPv6{p}
}

func (p *packetConnIPv6) SetMulticastInterface(ifi *net.Interface) error {
	return p.packetConnIPv6.SetMulticastInterface(ifi)
}

func (p *packetConnIPv6) SetWriteDeadline(t time.Time) error {
	return p.packetConnIPv6.SetWriteDeadline(t)
}

func (p *packetConnIPv6) WriteTo(b []byte, dst net.Addr) (n int, err error) {
	return p.packetConnIPv6.WriteTo(b, nil, dst)
}

func (p *packetConnIPv6) SetMulticastHopLimit(hoplim int) error {
	return p.packetConnIPv6.SetMulticastHopLimit(hoplim)
}

func (p *packetConnIPv6) SetMulticastLoopback(on bool) error {
	return p.packetConnIPv6.SetMulticastLoopback(on)
}

func (p *packetConnIPv6) JoinGroup(ifi *net.Interface, group net.Addr) error {
	return p.packetConnIPv6.JoinGroup(ifi, group)
}

func isIPv6(addr net.IP) bool {
	if ip := addr.To16(); ip != nil && ip.To4() == nil {
		return true
	}
	return false
}

// NewConnUDP creates connection over net.UDPConn.
func NewConnUDP(c *net.UDPConn, heartBeat time.Duration, multicastHopLimit int) *ConnUDP {
	var packetConn packetConn

	if isIPv6(c.LocalAddr().(*net.UDPAddr).IP) {
		packetConn = newPacketConnIPv6(ipv6.NewPacketConn(c))
	} else {
		packetConn = newPacketConnIPv4(ipv4.NewPacketConn(c))
	}

	connection := ConnUDP{connection: c, heartBeat: heartBeat, packetConn: packetConn, multicastHopLimit: multicastHopLimit}
	return &connection
}

// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
func (c *ConnUDP) LocalAddr() net.Addr {
	return c.connection.LocalAddr()
}

// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
func (c *ConnUDP) RemoteAddr() net.Addr {
	return c.connection.RemoteAddr()
}

// Close closes the connection.
func (c *ConnUDP) Close() error {
	return c.connection.Close()
}

func (c *ConnUDP) writeMulticastWithContext(ctx context.Context, udpCtx *ConnUDPContext, buffer []byte) error {
	if udpCtx == nil {
		return fmt.Errorf("cannot write multicast with context: invalid udpCtx")
	}
	if _, ok := c.packetConn.(*packetConnIPv4); ok && isIPv6(udpCtx.raddr.IP) {
		return fmt.Errorf("cannot write multicast with context: invalid destination address")
	}

	ifaces, err := net.Interfaces()
	if err != nil {
		return fmt.Errorf("cannot write multicast with context: cannot get interfaces for multicast connection: %v", err)
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	for _, iface := range ifaces {
		written := 0
		for written < len(buffer) {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			if err := c.packetConn.SetMulticastInterface(&iface); err != nil {
				break
			}

			c.packetConn.SetMulticastHopLimit(c.multicastHopLimit)
			err := c.packetConn.SetWriteDeadline(time.Now().Add(c.heartBeat))
			if err != nil {
				return fmt.Errorf("cannot write multicast with context: cannot set write deadline for connection: %v", err)
			}
			n, err := c.packetConn.WriteTo(buffer, udpCtx.raddr)
			if err != nil {
				if isTemporary(err) {
					continue
				}
				break
			}
			written += n
		}
	}
	return nil
}

// WriteWithContext writes data with context.
func (c *ConnUDP) WriteWithContext(ctx context.Context, udpCtx *ConnUDPContext, buffer []byte) error {
	if udpCtx == nil {
		return fmt.Errorf("cannot write with context: invalid udpCtx")
	}
	if udpCtx.raddr.IP.IsMulticast() {
		return c.writeMulticastWithContext(ctx, udpCtx, buffer)
	}

	written := 0
	c.lock.Lock()
	defer c.lock.Unlock()
	for written < len(buffer) {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		err := c.connection.SetWriteDeadline(time.Now().Add(c.heartBeat))
		if err != nil {
			return fmt.Errorf("cannot set write deadline for udp connection: %v", err)
		}
		n, err := WriteToSessionUDP(c.connection, udpCtx, buffer[written:])
		if err != nil {
			if isTemporary(err) {
				continue
			}
			return fmt.Errorf("cannot write to udp connection")
		}
		written += n
	}

	return nil
}

// ReadWithContext reads packet with context.
func (c *ConnUDP) ReadWithContext(ctx context.Context, buffer []byte) (int, *ConnUDPContext, error) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return -1, nil, fmt.Errorf("cannot read from udp connection: %v", ctx.Err())
			}
			return -1, nil, fmt.Errorf("cannot read from udp connection")
		default:
		}

		err := c.connection.SetReadDeadline(time.Now().Add(c.heartBeat))
		if err != nil {
			return -1, nil, fmt.Errorf("cannot set read deadline for udp connection: %v", err)
		}
		n, s, err := ReadFromSessionUDP(c.connection, buffer)
		if err != nil {
			if isTemporary(err) {
				continue
			}
			return -1, nil, fmt.Errorf("cannot read from udp connection: %v", ctx.Err())
		}
		return n, s, err
	}
}

// SetMulticastLoopback sets whether transmitted multicast packets
// should be copied and send back to the originator.
func (c *ConnUDP) SetMulticastLoopback(on bool) error {
	return c.packetConn.SetMulticastLoopback(on)
}

// JoinGroup joins the group address group on the interface ifi.
// By default all sources that can cast data to group are accepted.
// It's possible to mute and unmute data transmission from a specific
// source by using ExcludeSourceSpecificGroup and
// IncludeSourceSpecificGroup.
// JoinGroup uses the system assigned multicast interface when ifi is
// nil, although this is not recommended because the assignment
// depends on platforms and sometimes it might require routing
// configuration.
func (c *ConnUDP) JoinGroup(ifi *net.Interface, group net.Addr) error {
	return c.packetConn.JoinGroup(ifi, group)
}

A net/connUDP_test.go => net/connUDP_test.go +184 -0
@@ 0,0 1,184 @@
package net

import (
	"context"
	"net"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestConnUDP_WriteWithContext(t *testing.T) {
	peerAddr := "127.0.0.1:2154"
	b, err := net.ResolveUDPAddr("udp", peerAddr)
	assert.NoError(t, err)

	ctxCanceled, ctxCancel := context.WithCancel(context.Background())
	ctxCancel()

	type args struct {
		ctx    context.Context
		udpCtx *ConnUDPContext
		buffer []byte
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			name: "valid",
			args: args{
				ctx:    context.Background(),
				udpCtx: NewConnUDPContext(b, nil),
				buffer: []byte("hello world"),
			},
		},
		{
			name: "cancelled",
			args: args{
				ctx:    ctxCanceled,
				buffer: []byte("hello world"),
			},
			wantErr: true,
		},
	}

	a, err := net.ResolveUDPAddr("udp", "127.0.0.1:")
	assert.NoError(t, err)
	l1, err := net.ListenUDP("udp", a)
	assert.NoError(t, err)
	err = SetUDPSocketOptions(l1)
	assert.NoError(t, err)
	c1 := NewConnUDP(l1, time.Millisecond*100, 0)
	defer c1.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	l2, err := net.ListenUDP("udp", b)
	err = SetUDPSocketOptions(l2)
	assert.NoError(t, err)
	c2 := NewConnUDP(l2, time.Millisecond*100, 0)
	defer c2.Close()

	go func() {
		b := make([]byte, 1024)
		_, udpCtx, err := c2.ReadWithContext(ctx, b)
		if err != nil {
			return
		}
		correctSource(udpCtx.context)
	}()

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err = c1.WriteWithContext(tt.args.ctx, tt.args.udpCtx, tt.args.buffer)

			c1.LocalAddr()
			c1.RemoteAddr()

			if tt.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
	}
}

func TestConnUDP_writeMulticastWithContext(t *testing.T) {
	peerAddr := "224.0.1.187:5683"
	b, err := net.ResolveUDPAddr("udp4", peerAddr)
	assert.NoError(t, err)

	ctxCanceled, ctxCancel := context.WithCancel(context.Background())
	ctxCancel()
	payload := []byte("hello world")

	type args struct {
		ctx    context.Context
		udpCtx *ConnUDPContext
		buffer []byte
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			name: "valid",
			args: args{
				ctx:    context.Background(),
				udpCtx: NewConnUDPContext(b, nil),
				buffer: payload,
			},
		},
		{
			name: "cancelled",
			args: args{
				ctx:    ctxCanceled,
				udpCtx: NewConnUDPContext(b, nil),
				buffer: payload,
			},
			wantErr: true,
		},
	}

	listenAddr := ":" + strconv.Itoa(b.Port)
	c, err := net.ResolveUDPAddr("udp4", listenAddr)
	assert.NoError(t, err)
	l2, err := net.ListenUDP("udp4", c)
	assert.NoError(t, err)
	err = SetUDPSocketOptions(l2)
	assert.NoError(t, err)
	c2 := NewConnUDP(l2, time.Millisecond*100, 2)
	defer c2.Close()
	err = c2.JoinGroup(nil, b)
	assert.NoError(t, err)
	err = c2.SetMulticastLoopback(true)
	assert.NoError(t, err)

	a, err := net.ResolveUDPAddr("udp4", "")
	assert.NoError(t, err)
	l1, err := net.ListenUDP("udp4", a)
	assert.NoError(t, err)
	err = SetUDPSocketOptions(l1)
	assert.NoError(t, err)
	c1 := NewConnUDP(l1, time.Millisecond*100, 2)
	defer c1.Close()
	assert.NoError(t, err)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		b := make([]byte, 1024)
		n, _, err := c2.ReadWithContext(ctx, b)
		assert.NoError(t, err)
		if n > 0 {
			b = b[:n]
			assert.Equal(t, payload, b)
		}
		wg.Done()
	}()
	defer wg.Wait()

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err = c1.WriteWithContext(tt.args.ctx, tt.args.udpCtx, tt.args.buffer)

			c1.LocalAddr()
			c1.RemoteAddr()

			if tt.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
	}
}

A net/conn_test.go => net/conn_test.go +80 -0
@@ 0,0 1,80 @@
package net

import (
	"context"
	"net"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestConn_WriteWithContext(t *testing.T) {
	ctxCanceled, ctxCancel := context.WithCancel(context.Background())
	ctxCancel()
	helloWorld := []byte("hello world")

	type args struct {
		ctx  context.Context
		data []byte
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			name: "valid",
			args: args{
				ctx:  context.Background(),
				data: helloWorld,
			},
		},
		{
			name: "cancelled",
			args: args{
				ctx:  ctxCanceled,
				data: helloWorld,
			},
			wantErr: true,
		},
	}

	listener, err := NewTCPListener("tcp", "127.0.0.1:", time.Millisecond*100)
	assert.NoError(t, err)
	defer listener.Close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for {
			conn, err := listener.AcceptWithContext(ctx)
			if err != nil {
				return
			}
			c := NewConn(conn, time.Millisecond*10)
			b := make([]byte, len(helloWorld))
			_ = c.ReadFullWithContext(ctx, b)
			c.Close()
		}
	}()

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tcpConn, err := net.Dial("tcp", listener.Addr().String())
			assert.NoError(t, err)
			c := NewConn(tcpConn, time.Millisecond*100)
			defer c.Close()

			c.LocalAddr()
			c.RemoteAddr()

			err = c.WriteWithContext(tt.args.ctx, tt.args.data)
			if tt.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
	}
}

A net/isTemporary.go => net/isTemporary.go +17 -0
@@ 0,0 1,17 @@
package net

import (
	"net"
	"strings"
)

func isTemporary(err error) bool {
	if netErr, ok := err.(net.Error); ok && (netErr.Temporary() || netErr.Timeout()) {
		return true
	}
	// https://github.com/golang/go/blob/958e212db799e609b2a8df51cdd85c9341e7a404/src/internal/poll/fd.go#L43
	if strings.Contains(err.Error(), "i/o timeout") {
		return true
	}
	return false
}

A net/tcplistener.go => net/tcplistener.go +83 -0
@@ 0,0 1,83 @@
package net

import (
	"context"
	"fmt"
	"net"
	"time"
)

// TCPListener is a TCP network listener that provides accept with context.
type TCPListener struct {
	listener  *net.TCPListener
	heartBeat time.Duration
}

func newNetTCPListen(network string, addr string) (*net.TCPListener, error) {
	a, err := net.ResolveTCPAddr(network, addr)
	if err != nil {
		return nil, fmt.Errorf("cannot create new net tcp listener: %v", err)
	}

	tcp, err := net.ListenTCP(network, a)
	if err != nil {
		return nil, fmt.Errorf("cannot create new net tcp listener: %v", err)
	}
	return tcp, nil
}

// NewTCPListener creates tcp listener.
// Known networks are "tcp", "tcp4" (IPv4-only), "tcp6" (IPv6-only).
func NewTCPListener(network string, addr string, heartBeat time.Duration) (*TCPListener, error) {
	tcp, err := newNetTCPListen(network, addr)
	if err != nil {
		return nil, fmt.Errorf("cannot create new tcp listener: %v", err)
	}
	return &TCPListener{listener: tcp, heartBeat: heartBeat}, nil
}

// AcceptContext waits with context for a generic Conn.
func (l *TCPListener) AcceptWithContext(ctx context.Context) (net.Conn, error) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return nil, fmt.Errorf("cannot accept connections: %v", ctx.Err())
			}
			return nil, nil
		default:
		}
		err := l.SetDeadline(time.Now().Add(l.heartBeat))
		if err != nil {
			return nil, fmt.Errorf("cannot accept connections: %v", err)
		}
		rw, err := l.listener.Accept()
		if err != nil {
			if isTemporary(err) {
				continue
			}
			return nil, fmt.Errorf("cannot accept connections: %v", err)
		}
		return rw, nil
	}
}

// SetDeadline sets deadline for accept operation.
func (l *TCPListener) SetDeadline(t time.Time) error {
	return l.listener.SetDeadline(t)
}

// Accept waits for a generic Conn.
func (l *TCPListener) Accept() (net.Conn, error) {
	return l.AcceptWithContext(context.Background())
}

// Close closes the connection.
func (l *TCPListener) Close() error {
	return l.listener.Close()
}

// Addr represents a network end point address.
func (l *TCPListener) Addr() net.Addr {
	return l.listener.Addr()
}

A net/tlslistener.go => net/tlslistener.go +77 -0
@@ 0,0 1,77 @@
package net

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"time"
)

// TLSListener is a TLS listener that provides accept with context.
type TLSListener struct {
	tcp       *net.TCPListener
	listener  net.Listener
	heartBeat time.Duration
}

// NewTLSListener creates tcp listener.
// Known networks are "tcp", "tcp4" (IPv4-only), "tcp6" (IPv6-only).
func NewTLSListener(network string, addr string, cfg *tls.Config, heartBeat time.Duration) (*TLSListener, error) {
	tcp, err := newNetTCPListen(network, addr)
	if err != nil {
		return nil, fmt.Errorf("cannot create new tls listener: %v", err)
	}
	tls := tls.NewListener(tcp, cfg)
	return &TLSListener{
		tcp:       tcp,
		listener:  tls,
		heartBeat: heartBeat,
	}, nil
}

// AcceptContext waits with context for a generic Conn.
func (l *TLSListener) AcceptWithContext(ctx context.Context) (net.Conn, error) {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return nil, fmt.Errorf("cannot accept connections: %v", ctx.Err())
			}
			return nil, nil
		default:
		}
		err := l.SetDeadline(time.Now().Add(l.heartBeat))
		if err != nil {
			return nil, fmt.Errorf("cannot accept connections: %v", err)
		}
		rw, err := l.listener.Accept()
		if err != nil {
			if isTemporary(err) {
				continue
			}
			return nil, fmt.Errorf("cannot accept connections: %v", err)
		}
		return rw, nil
	}
}

// SetDeadline sets deadline for accept operation.
func (l *TLSListener) SetDeadline(t time.Time) error {
	return l.tcp.SetDeadline(t)
}

// Accept waits for a generic Conn.
func (l *TLSListener) Accept() (net.Conn, error) {
	return l.AcceptWithContext(context.Background())
}

// Close closes the connection.
func (l *TLSListener) Close() error {
	return l.listener.Close()
}

// Addr represents a network end point address.
func (l *TLSListener) Addr() net.Addr {
	return l.listener.Addr()
}

A net/tlslistener_test.go => net/tlslistener_test.go +196 -0
@@ 0,0 1,196 @@
package net

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/stretchr/testify/assert"
)

func SetTLSConfig(t *testing.T) *tls.Config {
	cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock)
	require.NoError(t, err)

	caRootPool := x509.NewCertPool()
	ok := caRootPool.AppendCertsFromPEM(CARootPemBlock)
	require.True(t, ok)
	caIntermediatesPool := x509.NewCertPool()
	ok = caIntermediatesPool.AppendCertsFromPEM(CAIntermediatePemBlock)
	require.True(t, ok)

	tlsConfig := tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAnyClientCert,
		ClientCAs:    caRootPool,
		RootCAs:      caRootPool,

		GetConfigForClient: func(info *tls.ClientHelloInfo) (*tls.Config, error) {
			//https://github.com/golang/go/issues/29895
			m := tls.Config{
				Certificates: []tls.Certificate{cert},
				ClientAuth:   tls.RequireAnyClientCert,
			}
			m.VerifyPeerCertificate = func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
				return nil
			}
			return &m, nil
		},
	}

	return &tlsConfig
}

func TestTLSListener_AcceptWithContext(t *testing.T) {
	ctxCanceled, ctxCancel := context.WithCancel(context.Background())
	ctxCancel()

	type args struct {
		ctx context.Context
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			name: "valid",
			args: args{
				ctx: context.Background(),
			},
		},
		{
			name: "cancelled",
			args: args{
				ctx: ctxCanceled,
			},
			wantErr: true,
		},
	}

	dir, err := ioutil.TempDir("", "gotesttmp")
	assert.NoError(t, err)
	defer os.RemoveAll(dir)
	config := SetTLSConfig(t)

	listener, err := NewTLSListener("tcp", "127.0.0.1:", config, time.Millisecond*100)
	assert.NoError(t, err)
	defer listener.Close()

	go func() {
		for i := 0; i < len(tests); i++ {
			cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock)
			assert.NoError(t, err)

			c, err := tls.Dial("tcp", listener.Addr().String(), &tls.Config{
				InsecureSkipVerify: true,
				Certificates:       []tls.Certificate{cert},
			})
			assert.NoError(t, err)
			_, err = c.Write([]byte("hello"))
			assert.NoError(t, err)

			time.Sleep(time.Millisecond * 200)
			c.Close()
		}
	}()

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			con, err := listener.AcceptWithContext(tt.args.ctx)
			if tt.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
				b := make([]byte, 1024)
				_, err = con.Read(b)
				assert.NoError(t, err)
				err = con.Close()
				assert.NoError(t, err)
			}
		})
	}
}

var (
	CertPEMBlock = []byte(`-----BEGIN CERTIFICATE-----
MIIBkzCCATegAwIBAgIUF399tsbWkMnMF6NWt6j/MbUIZvUwDAYIKoZIzj0EAwIF
ADARMQ8wDQYDVQQDEwZSb290Q0EwHhcNMTgwNzAyMDUzODQwWhcNMjgwNzAyMDUz
ODQwWjA0MTIwMAYDVQQDEyl1dWlkOjYxNTVmMjFjLTA3MjItNDZjOC05ZDcxLTMw
NGE1NTMyNzllOTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABBTvmtgfe49ZY0L0
B7wC/XH5V1jJ3NFdLyPZZFmz9O731JB7dwGYVUtaRai5cPM349mIw9k5kX8Zww7E
wMf4jw2jSDBGMAkGA1UdEwQCMAAwDgYDVR0PAQH/BAQDAgGIMCkGA1UdJQQiMCAG
CCsGAQUFBwMBBggrBgEFBQcDAgYKKwYBBAGC3nwBBjAMBggqhkjOPQQDAgUAA0gA
MEUCIBPNUqmjeTFIMkT3Y1qqUnR/fQmqbhxR8gScBsz8m3w8AiEAlH3Nf57vFqqh
tuvff9aSBdNlDBlQ5dTLu24V7fScLLI=
-----END CERTIFICATE-----`)

	KeyPEMBlock = []byte(`-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIGqPsr+N0x/CBmykEGm04TXvsykwxwqAy32SpVO2ANB0oAoGCCqGSM49
AwEHoUQDQgAEFO+a2B97j1ljQvQHvAL9cflXWMnc0V0vI9lkWbP07vfUkHt3AZhV
S1pFqLlw8zfj2YjD2TmRfxnDDsTAx/iPDQ==
-----END EC PRIVATE KEY-----`)

	CARootPemBlock = []byte(`-----BEGIN CERTIFICATE-----
MIIBazCCAQ+gAwIBAgIUY9HA4Of2KwJm5HaP72+VkLpUCpYwDAYIKoZIzj0EAwIF
ADARMQ8wDQYDVQQDEwZSb290Q0EwHhcNMTgwNjIyMTEyMzM1WhcNMjgwNjIyMTEy
MzM1WjARMQ8wDQYDVQQDEwZSb290Q0EwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC
AAREWwFfs+rAjPZ80alM/dQEWFOILkpkkwadCGomdiEBwLdlJEKGHomcVNJ39xBV
nte6BA4fOP7a9kdrsbRe/qKao0MwQTAMBgNVHRMEBTADAQH/MA4GA1UdDwEB/wQE
AwIBBjAhBgNVHSUEGjAYBgorBgEEAYLefAEGBgorBgEEAYLefAEHMAwGCCqGSM49
BAMCBQADSAAwRQIgI95uRXx5y4iehqKq1CP99agqlPGc8JaMMIzvwn5lYBICIQC8
KokSEk+DVrYiWUubIxl/tSCtwC8jyA2jKO7CY63cQg==
-----END CERTIFICATE-----
`)

	CAIntermediatePemBlock = []byte(`-----BEGIN CERTIFICATE-----
MIIBdzCCARqgAwIBAgIUMFZsksJ1spFMlONPi+v0EkDcD+EwDAYIKoZIzj0EAwIF
ADARMQ8wDQYDVQQDEwZSb290Q0EwHhcNMTgwNjIyMTEyNDMwWhcNMjgwNjIyMTEy
NDMwWjAZMRcwFQYDVQQDEw5JbnRlcm1lZGlhdGVDQTBZMBMGByqGSM49AgEGCCqG
SM49AwEHA0IABBRR8WmmkmVWvFvdi1YyanKOV3FOiMwZ1blfAOnfUhWjBv2AVLJG
bRZ/fo+7BF8peD/BYQkbs1KAkH/nxnDeQLyjRjBEMA8GA1UdEwQIMAYBAf8CAQAw
DgYDVR0PAQH/BAQDAgEGMCEGA1UdJQQaMBgGCisGAQQBgt58AQYGCisGAQQBgt58
AQcwDAYIKoZIzj0EAwIFAANJADBGAiEA8VNPyaUzaIUOsqdvoaT3dCZDBbLjOx8R
XVqB37LdYPcCIQDiqvcbW0aOfVcvMDVs3r1HavgKuTIHgJ9uzSOAAF17vg==
-----END CERTIFICATE-----
`)

	// CertPEMBlock is a X509 data used to test TLS servers (used with tls.X509KeyPair)
	CertListenerPEMBlock = []byte(`-----BEGIN CERTIFICATE-----
MIICETCCAXqgAwIBAgIQGncx7Aoc6cmxB0O2AlDbIjANBgkqhkiG9w0BAQsFADAS
MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw
MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB
iQKBgQDNHitCs03rbqjQq77c6mlgNX68mew9Mn030JnHLhgWblGaMUsMqUPJn7Lx
i5BPnlc7rIEUHhhV38WmjSgQ7nvkZBM4A6lyyR3B3Vk+rQw6Xukj/ix+BXGoMZM9
sZFj4XZr+9n0ocXNSk3d+b43Ug42q5W17WYm10t2/ZYBkH9ISQIDAQABo2YwZDAO
BgNVHQ8BAf8EBAMCAqQwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUw
AwEB/zAsBgNVHREEJTAjgglsb2NhbGhvc3SHBH8AAAGHEAAAAAAAAAAAAAAAAAAA
AAEwDQYJKoZIhvcNAQELBQADgYEAdqjf/9CuyOjgdwMAb1k3lO9+lwWr6dq0zXwU
zq0Qj5spgLxeRK+SRwSswW2VbszkSr+Qd4OVDlX10KCzBZJ5qRZWcwM755UPxd+e
oO0RFbASO4yrMduKkXJo6tiMS/rjEC+9yUEEltlZduuQqIAdDjvgZfmhfMQpNuD/
X6zS+rU=
-----END CERTIFICATE-----`)

	// KeyPEMBlock is a X509 data used to test TLS servers (used with tls.X509KeyPair)
	KeyListenerPEMBlock = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDNHitCs03rbqjQq77c6mlgNX68mew9Mn030JnHLhgWblGaMUsM
qUPJn7Lxi5BPnlc7rIEUHhhV38WmjSgQ7nvkZBM4A6lyyR3B3Vk+rQw6Xukj/ix+
BXGoMZM9sZFj4XZr+9n0ocXNSk3d+b43Ug42q5W17WYm10t2/ZYBkH9ISQIDAQAB
AoGBAJXiEriFr013KjJ5HVnujJu522dTjnXVe/yaGJScUQurB0QF+xJAaYFeifLJ
CeW0DYhUcGnT5/JwNsySXxGoQqx8QCfStH8c6ZPkAF3qXYbPNsX4x2IpDJYyp7ve
Qj501VpeRPNd3mueBHvkZ0UPkBo6Tz7iA6ilp5qgF2soMUsBAkEA0Mwu4NSNRf7u
Gg42U9aFa0y9TZ5QuKLC42+SwzbtTyfMSj5G+m05aeuqinmWhNesaBss4BmmmSXg
J0N6kekUaQJBAPt9Bt1pJPKGv6IbC3SsccooRS9sQOUhOTRiVnwzZ1i4Dk23fRQN
Rox2AzYzsMPG6vGRwumQuBvj6RZy+BGWmOECQBF82HxKMR7osCaMhC5XbEtFXSGQ
HfCo6SvFX4RsKEoV6j1Zo/Y7ibB+ZYU9k8bCjZUWmZaXb2WqT3DabPyliekCQQDN
UUDGiO4KNurDLPNIWPU5h3Eci3Pb3Sj31IUpN0pbi0DaQECUm1YKnNp4aPEalQ8B
E/CegXFeC88jbc+LhHjhAkEAv6N2yaaKphaFOYLdcApVViIwKfdoZFKm+hEikhHg
zlI1KSI23j1bIvJXxH2sWMhbu534p3rE1MqC6v5dc/dGZA==
-----END RSA PRIVATE KEY-----`)
)

A net/udp.go => net/udp.go +87 -0
@@ 0,0 1,87 @@
package net

import (
	"encoding/base64"
	"net"

	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)

// This is the required size of the OOB buffer to pass to ReadMsgUDP.
var udpOOBSize = func() int {
	// We can't know whether we'll get an IPv4 control message or an
	// IPv6 control message ahead of time. To get around this, we size
	// the buffer equal to the largest of the two.

	oob4 := ipv4.NewControlMessage(ipv4.FlagDst | ipv4.FlagInterface)
	oob6 := ipv6.NewControlMessage(ipv6.FlagDst | ipv6.FlagInterface)

	if len(oob4) > len(oob6) {
		return len(oob4)
	}

	return len(oob6)
}()

// ConnUDPContext holds the remote address and the associated
// out-of-band data.
type ConnUDPContext struct {
	raddr   *net.UDPAddr
	context []byte
}

// NewConnUDPContext creates conn udp context.
func NewConnUDPContext(raddr *net.UDPAddr, oob []byte) *ConnUDPContext {
	return &ConnUDPContext{
		raddr:   raddr,
		context: oob,
	}
}

// RemoteAddr returns the remote network address.
func (s *ConnUDPContext) RemoteAddr() net.Addr { return s.raddr }

// Key returns the key session for the map using
func (s *ConnUDPContext) Key() string {
	key := s.RemoteAddr().String() + "-" + base64.StdEncoding.EncodeToString(s.context)
	return key
}

// ReadFromSessionUDP acts just like net.UDPConn.ReadFrom(), but returns a session object instead of a
// net.UDPAddr.
func ReadFromSessionUDP(conn *net.UDPConn, b []byte) (int, *ConnUDPContext, error) {
	oob := make([]byte, udpOOBSize)
	n, oobn, _, raddr, err := conn.ReadMsgUDP(b, oob)
	if err != nil {
		return n, nil, err
	}
	return n, &ConnUDPContext{raddr, oob[:oobn]}, err
}

// WriteToSessionUDP acts just like net.UDPConn.WriteTo(), but uses a *SessionUDP instead of a net.Addr.
func WriteToSessionUDP(conn *net.UDPConn, session *ConnUDPContext, b []byte) (int, error) {
	//check if socket is connected via Dial
	if conn.RemoteAddr() == nil {
		return conn.WriteToUDP(b, session.raddr)
	}

	n, _, err := conn.WriteMsgUDP(b, correctSource(session.context), nil)
	return n, err
}

// parseDstFromOOB takes oob data and returns the destination IP.
func parseDstFromOOB(oob []byte) net.IP {
	// Start with IPv6 and then fallback to IPv4
	// TODO(fastest963): Figure out a way to prefer one or the other. Looking at
	// the lvl of the header for a 0 or 41 isn't cross-platform.
	cm6 := new(ipv6.ControlMessage)
	if cm6.Parse(oob) == nil && cm6.Dst != nil {
		return cm6.Dst
	}
	cm4 := new(ipv4.ControlMessage)
	if cm4.Parse(oob) == nil && cm4.Dst != nil {
		return cm4.Dst
	}
	return nil
}

A net/udp_unix.go => net/udp_unix.go +39 -0
@@ 0,0 1,39 @@
// +build !windows

package net

import (
	"net"

	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)

// correctSource takes oob data and returns new oob data with the Src equal to the Dst
func correctSource(oob []byte) []byte {
	dst := parseDstFromOOB(oob)
	if dst == nil {
		return nil
	}
	// If the dst is definitely an IPv6, then use ipv6's ControlMessage to
	// respond otherwise use ipv4's because ipv6's marshal ignores ipv4
	// addresses.
	if dst.To4() == nil {
		cm := new(ipv6.ControlMessage)
		cm.Src = dst
		oob = cm.Marshal()
	} else {
		cm := new(ipv4.ControlMessage)
		cm.Src = dst
		oob = cm.Marshal()
	}
	return oob
}

// SetUDPSocketOptions set controls FlagDst,FlagInterface to UDPConn.
func SetUDPSocketOptions(conn *net.UDPConn) error {
	if ip4 := conn.LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {
		return ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
	}
	return ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
}

A net/udp_windows.go => net/udp_windows.go +17 -0
@@ 0,0 1,17 @@
package net

import (
	"net"
)

// windows specific functions for udp

// SetUDPSocketOptions set controls FlagDst,FlagInterface to UDPConn - not supported by windows.
func SetUDPSocketOptions(conn *net.UDPConn) error {
	return nil
}

// correctSource takes oob data and returns new oob data with the Src equal to the Dst
func correctSource(oob []byte) []byte {
	return oob
}

M networksession.go => networksession.go +7 -7
@@ 10,7 10,7 @@ import (
	"sync/atomic"
	"time"

	kitNet "github.com/go-ocf/kit/net"
	coapNet "github.com/go-ocf/go-coap/net"
)

// A networkSession interface is used by an COAP handler to


@@ 64,12 64,12 @@ type connUDP interface {
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
	ReadWithContext(ctx context.Context, buffer []byte) (int, *kitNet.ConnUDPContext, error)
	WriteWithContext(ctx context.Context, udpCtx *kitNet.ConnUDPContext, buffer []byte) error
	ReadWithContext(ctx context.Context, buffer []byte) (int, *coapNet.ConnUDPContext, error)
	WriteWithContext(ctx context.Context, udpCtx *coapNet.ConnUDPContext, buffer []byte) error
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection connUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
func newSessionUDP(connection connUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {


@@ 98,7 98,7 @@ func newSessionUDP(connection connUDP, srv *Server, sessionUDPData *kitNet.ConnU
}

// newSessionTCP create new session for TCP connection
func newSessionTCP(connection *kitNet.Conn, srv *Server) (networkSession, error) {
func newSessionTCP(connection *coapNet.Conn, srv *Server) (networkSession, error) {
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockWiseSzxBERT
	if srv.BlockWiseTransfer != nil {


@@ 144,14 144,14 @@ type sessionBase struct {
type sessionUDP struct {
	sessionBase
	connection     connUDP
	sessionUDPData *kitNet.ConnUDPContext                         // oob data to get egress interface right
	sessionUDPData *coapNet.ConnUDPContext                        // oob data to get egress interface right
	mapPairs       map[[MaxTokenSize]byte]map[uint16]*sessionResp //storage of channel Message
	mapPairsLock   sync.Mutex                                     //to sync add remove token
}

type sessionTCP struct {
	sessionBase
	connection   *kitNet.Conn
	connection   *coapNet.Conn
	mapPairs     map[[MaxTokenSize]byte]*sessionResp //storage of channel Message
	mapPairsLock sync.Mutex                          //to sync add remove token


M server.go => server.go +20 -20
@@ 13,7 13,7 @@ import (
	"sync/atomic"
	"time"

	kitNet "github.com/go-ocf/kit/net"
	coapNet "github.com/go-ocf/go-coap/net"
)

// Interval for stop worker if no load


@@ 142,9 142,9 @@ type Server struct {
	// Defines wake up interval from operations Read, Write over connection. defaults is 100ms.
	HeartBeat time.Duration
	// If newSessionUDPFunc is set it is called when session UDP want to be created
	newSessionUDPFunc func(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error)
	newSessionUDPFunc func(connection *coapNet.ConnUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error)
	// If newSessionUDPFunc is set it is called when session TCP want to be created
	newSessionTCPFunc func(connection *kitNet.Conn, srv *Server) (networkSession, error)
	newSessionTCPFunc func(connection *coapNet.Conn, srv *Server) (networkSession, error)
	// If NotifyNewSession is set it is called when new TCP/UDP session was created.
	NotifySessionNewFunc func(w *ClientConn)
	// If NotifyNewSession is set it is called when TCP/UDP session was ended.


@@ 222,7 222,7 @@ func (srv *Server) spawnWorker(w *Request) {
// ListenAndServe starts a coapserver on the configured address in *Server.
func (srv *Server) ListenAndServe() error {
	var listener Listener
	var connUDP *kitNet.ConnUDP
	var connUDP *coapNet.ConnUDP
	addr := srv.Addr
	var err error
	if addr == "" {


@@ 236,14 236,14 @@ func (srv *Server) ListenAndServe() error {

	switch srv.Net {
	case "tcp", "tcp4", "tcp6":
		listener, err = kitNet.NewTCPListener(srv.Net, addr, srv.heartBeat())
		listener, err = coapNet.NewTCPListener(srv.Net, addr, srv.heartBeat())
		if err != nil {
			return fmt.Errorf("cannot listen and serve: %v", err)
		}
		defer listener.Close()
	case "tcp-tls", "tcp4-tls", "tcp6-tls":
		network := strings.TrimSuffix(srv.Net, "-tls")
		listener, err = kitNet.NewTLSListener(network, addr, srv.TLSConfig, srv.heartBeat())
		listener, err = coapNet.NewTLSListener(network, addr, srv.TLSConfig, srv.heartBeat())
		if err != nil {
			return fmt.Errorf("cannot listen and serve: %v", err)
		}


@@ 257,10 257,10 @@ func (srv *Server) ListenAndServe() error {
		if err != nil {
			return err
		}
		if err := kitNet.SetUDPSocketOptions(l); err != nil {
		if err := coapNet.SetUDPSocketOptions(l); err != nil {
			return err
		}
		connUDP = kitNet.NewConnUDP(l, srv.heartBeat(), 2)
		connUDP = coapNet.NewConnUDP(l, srv.heartBeat(), 2)
		defer connUDP.Close()
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		network := strings.TrimSuffix(srv.Net, "-mcast")


@@ 273,10 273,10 @@ func (srv *Server) ListenAndServe() error {
		if err != nil {
			return err
		}
		if err := kitNet.SetUDPSocketOptions(l); err != nil {
		if err := coapNet.SetUDPSocketOptions(l); err != nil {
			return err
		}
		connUDP = kitNet.NewConnUDP(l, srv.heartBeat(), 2)
		connUDP = coapNet.NewConnUDP(l, srv.heartBeat(), 2)
		defer connUDP.Close()
		ifaces := srv.UDPMcastInterfaces
		if len(ifaces) == 0 {


@@ 300,11 300,11 @@ func (srv *Server) ListenAndServe() error {
	return srv.activateAndServe(listener, nil, connUDP)
}

func (srv *Server) initServeUDP(connUDP *kitNet.ConnUDP) error {
func (srv *Server) initServeUDP(connUDP *coapNet.ConnUDP) error {
	return srv.serveUDP(newShutdownWithContext(srv.doneChan), connUDP)
}

func (srv *Server) initServeTCP(conn *kitNet.Conn) error {
func (srv *Server) initServeTCP(conn *coapNet.Conn) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}


@@ 317,9 317,9 @@ func (srv *Server) ActivateAndServe() error {
	if srv.Conn != nil {
		switch c := srv.Conn.(type) {
		case *net.TCPConn, *tls.Conn:
			return srv.activateAndServe(nil, kitNet.NewConn(c, srv.heartBeat()), nil)
			return srv.activateAndServe(nil, coapNet.NewConn(c, srv.heartBeat()), nil)
		case *net.UDPConn:
			return srv.activateAndServe(nil, nil, kitNet.NewConnUDP(c, srv.heartBeat(), 2))
			return srv.activateAndServe(nil, nil, coapNet.NewConnUDP(c, srv.heartBeat(), 2))
		}
		return ErrInvalidServerConnParameter
	}


@@ 330,7 330,7 @@ func (srv *Server) ActivateAndServe() error {
	return ErrInvalidServerListenerParameter
}

func (srv *Server) activateAndServe(listener Listener, conn *kitNet.Conn, connUDP *kitNet.ConnUDP) error {
func (srv *Server) activateAndServe(listener Listener, conn *coapNet.Conn, connUDP *coapNet.ConnUDP) error {
	srv.doneLock.Lock()
	srv.done = false
	srv.doneChan = make(chan struct{})


@@ 346,7 346,7 @@ func (srv *Server) activateAndServe(listener Listener, conn *kitNet.Conn, connUD
	defer close(srv.queue)

	if srv.newSessionTCPFunc == nil {
		srv.newSessionTCPFunc = func(connection *kitNet.Conn, srv *Server) (networkSession, error) {
		srv.newSessionTCPFunc = func(connection *coapNet.Conn, srv *Server) (networkSession, error) {
			session, err := newSessionTCP(connection, srv)
			if err != nil {
				return nil, err


@@ 359,7 359,7 @@ func (srv *Server) activateAndServe(listener Listener, conn *kitNet.Conn, connUD
	}

	if srv.newSessionUDPFunc == nil {
		srv.newSessionUDPFunc = func(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
		srv.newSessionUDPFunc = func(connection *coapNet.ConnUDP, srv *Server, sessionUDPData *coapNet.ConnUDPContext) (networkSession, error) {
			session, err := newSessionUDP(connection, srv, sessionUDPData)
			if err != nil {
				return nil, err


@@ 428,7 428,7 @@ func (srv *Server) heartBeat() time.Duration {
	return time.Millisecond * 100
}

func (srv *Server) serveTCPconnection(ctx *shutdownContext, conn *kitNet.Conn) error {
func (srv *Server) serveTCPconnection(ctx *shutdownContext, conn *coapNet.Conn) error {
	session, err := srv.newSessionTCPFunc(conn, srv)
	if err != nil {
		return err


@@ 491,7 491,7 @@ func (srv *Server) serveTCP(l Listener) error {
			wg.Add(1)
			go func() {
				defer wg.Done()
				srv.serveTCPconnection(ctx, kitNet.NewConn(rw, srv.heartBeat()))
				srv.serveTCPconnection(ctx, coapNet.NewConn(rw, srv.heartBeat()))
			}()
		}
	}


@@ 509,7 509,7 @@ func (srv *Server) closeSessions(err error) {
}

// serveUDP starts a UDP listener for the server.
func (srv *Server) serveUDP(ctx *shutdownContext, connUDP *kitNet.ConnUDP) error {
func (srv *Server) serveUDP(ctx *shutdownContext, connUDP *coapNet.ConnUDP) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}

M server_test.go => server_test.go +4 -4
@@ 14,7 14,7 @@ import (
	"testing"
	"time"

	kitNet "github.com/go-ocf/kit/net"
	coapNet "github.com/go-ocf/go-coap/net"
)

func CreateRespMessageByReq(isTCP bool, code COAPCode, req Message) Message {


@@ 77,7 77,7 @@ func RunLocalServerUDPWithHandlerIfaces(lnet, laddr string, BlockWiseTransfer bo
		return nil, "", nil, err
	}

	connUDP := kitNet.NewConnUDP(pc, time.Millisecond*100, 2)
	connUDP := coapNet.NewConnUDP(pc, time.Millisecond*100, 2)
	if strings.Contains(lnet, "-mcast") {
		if ifaces == nil {
			ifaces, err = net.Interfaces()


@@ 132,7 132,7 @@ func RunLocalUDPServer(net, laddr string, BlockWiseTransfer bool, BlockWiseTrans

func RunLocalServerTCPWithHandler(laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, handler HandlerFunc) (*Server, string, chan error, error) {
	network := "tcp"
	l, err := kitNet.NewTCPListener(network, laddr, time.Millisecond*100)
	l, err := coapNet.NewTCPListener(network, laddr, time.Millisecond*100)
	if err != nil {
		return nil, "", nil, fmt.Errorf("cannot create new tls listener: %v", err)
	}


@@ 170,7 170,7 @@ func RunLocalTCPServer(laddr string, BlockWiseTransfer bool, BlockWiseTransferSz
}

func RunLocalTLSServer(laddr string, config *tls.Config) (*Server, string, chan error, error) {
	l, err := kitNet.NewTLSListener("tcp", laddr, config, time.Millisecond*100)
	l, err := coapNet.NewTLSListener("tcp", laddr, config, time.Millisecond*100)
	if err != nil {
		return nil, "", nil, err
	}