~fnux/yggdrasil-go-coap

84523edf85d552c53ceb1495e002547090281873 — Jozef Kralik 2 years ago eef8a26
Fix windows multicast (#43)

* fix multicast write for windows

- multicast client now listen on free port to avoid share it among other
clients
- join group to all OS interfaces when provided interfaces are nil
- fix noresponse test
M Gopkg.lock => Gopkg.lock +4 -18
@@ 3,14 3,14 @@

[[projects]]
  branch = "master"
  digest = "1:25c92b5a7b89547608053ea7048469c173ede8d976cdfc861b8b39ca9852cad9"
  digest = "1:23c4482f7db54097030431eb88c87af95d0e072582cea3f6fdbc81ae22cd11f5"
  name = "github.com/go-ocf/kit"
  packages = [
    "log",
    "net",
  ]
  pruneopts = "UT"
  revision = "8560f72fc720c9710c19cb6814495ab67271da5c"
  revision = "6cb557ba9da5f1103d2807dfc6803d202952e8d7"

[[projects]]
  digest = "1:4887e9e89c80299aa520d718239809fdd2a47a9aa394909b169959bfbc424ddf"


@@ 29,17 29,6 @@
  version = "v1.3.2"

[[projects]]
  branch = "master"
  digest = "1:915aa54691f6897cc74afa0da29eaf6e93202831b4e980c56645d99c708bc5b1"
  name = "go.uber.org/goleak"
  packages = [
    ".",
    "internal/stack",
  ]
  pruneopts = "UT"
  revision = "c82e52b9ed06070186646c19bdceeae9dc18ec5d"

[[projects]]
  digest = "1:60bf2a5e347af463c42ed31a493d817f8a72f102543060ed992754e689805d1a"
  name = "go.uber.org/multierr"
  packages = ["."]


@@ 64,7 53,7 @@

[[projects]]
  branch = "master"
  digest = "1:425d81a8ef644b5cd324106e8d6e355581a47c2222b20b938822470a3c96a5e1"
  digest = "1:433ecd67c12cd2ebf2aaadfa9ff7ebadf400d3a31351078d902c12f1e0f6c2e5"
  name = "golang.org/x/net"
  packages = [
    "bpf",


@@ 74,7 63,7 @@
    "ipv6",
  ]
  pruneopts = "UT"
  revision = "16b79f2e4e95ea23b2bf9903c9809ff7b013ce85"
  revision = "74de082e2cca95839e88aa0aeee5aadf6ce7710f"

[solve-meta]
  analyzer-name = "dep"


@@ 82,9 71,6 @@
  input-imports = [
    "github.com/go-ocf/kit/net",
    "github.com/ugorji/go/codec",
    "go.uber.org/goleak",
    "golang.org/x/net/ipv4",
    "golang.org/x/net/ipv6",
  ]
  solver-name = "gps-cdcl"
  solver-version = 1

M Gopkg.toml => Gopkg.toml +0 -8
@@ 33,14 33,6 @@
  name = "github.com/ugorji/go"
  version = "1.1.2"

[[constraint]]
  branch = "master"
  name = "go.uber.org/goleak"

[[constraint]]
  branch = "master"
  name = "golang.org/x/net"

[prune]
  go-tests = true
  unused-packages = true

M blockwise_test.go => blockwise_test.go +0 -3
@@ 5,8 5,6 @@ import (
	"fmt"
	"log"
	"testing"

	"go.uber.org/goleak"
)

func testMarshal(t *testing.T, szx BlockWiseSzx, blockNumber uint, moreBlocksFollowing bool, expectedBlock uint32) {


@@ 85,7 83,6 @@ func TestBlockWiseBlockUnmarshal(t *testing.T) {
}

func TestServingUDPBlockWiseSzx16(t *testing.T) {
	defer goleak.VerifyNone(t)
	testServingTCPWithMsg(t, "udp", true, BlockWiseSzx16, make([]byte, 128), simpleMsg)
}


M client.go => client.go +22 -5
@@ 5,6 5,7 @@ package coap
import (
	"context"
	"crypto/tls"
	"fmt"
	"io"
	"net"
	"strings"


@@ 39,6 40,7 @@ type Client struct {
	BlockWiseTransferSzx *BlockWiseSzx // Set maximal block size of payload that will be send in fragment

	DisableTCPSignalMessages bool // Disable tcp signal messages
	MulticastHopLimit        int  //sets the hop limit field value for future outgoing multicast packets. default is 2.
}

func (c *Client) readTimeout() time.Duration {


@@ 86,6 88,9 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
	BlockWiseTransfer := false
	BlockWiseTransferSzx := BlockWiseSzx1024
	multicast := false
	if c.MulticastHopLimit == 0 {
		c.MulticastHopLimit = 2
	}

	switch c.Net {
	case "tcp-tls", "tcp4-tls", "tcp6-tls":


@@ 110,15 115,27 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
		if conn, err = dialer.DialContext(ctx, network, address); err != nil {
			return nil, err
		}
		sessionUPDData = kitNet.NewConnUDPWithContext(conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr), nil)
		sessionUPDData = kitNet.NewConnUDPContext(conn.(*net.UDPConn).RemoteAddr().(*net.UDPAddr), nil)
		BlockWiseTransfer = true
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		var err error
		network = strings.TrimSuffix(c.Net, "-mcast")
		a, udpConn, err := listenUDP(network, address)
		multicastAddress, err := net.ResolveUDPAddr(network, address)
		if err != nil {
			return nil, err
			return nil, fmt.Errorf("cannot resolve multicast address: %v", err)
		}
		listenAddress, err := net.ResolveUDPAddr(network, "")
		if err != nil {
			return nil, fmt.Errorf("cannot resolve multicast listen address: %v", err)
		}
		udpConn, err := net.ListenUDP(network, listenAddress)
		if err != nil {
			return nil, fmt.Errorf("cannot listen address: %v", err)
		}
		if err = kitNet.SetUDPSocketOptions(udpConn); err != nil {
			return nil, fmt.Errorf("cannot set upd socket options: %v", err)
		}
		sessionUPDData = kitNet.NewConnUDPWithContext(a, nil)
		sessionUPDData = kitNet.NewConnUDPContext(multicastAddress, nil)
		conn = udpConn
		BlockWiseTransfer = true
		multicast = true


@@ 193,7 210,7 @@ func (c *Client) DialWithContext(ctx context.Context, address string) (clientCon
	case *net.UDPConn:
		// WriteContextMsgUDP returns error when addr is filled in SessionUDPData for connected socket
		kitNet.SetUDPSocketOptions(clientConn.srv.Conn.(*net.UDPConn))
		session, err := newSessionUDP(kitNet.NewConnUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv.heartBeat()), clientConn.srv, sessionUPDData)
		session, err := newSessionUDP(kitNet.NewConnUDP(clientConn.srv.Conn.(*net.UDPConn), clientConn.srv.heartBeat(), c.MulticastHopLimit), clientConn.srv, sessionUPDData)
		if err != nil {
			return nil, err
		}

M client_test.go => client_test.go +0 -90
@@ 3,14 3,8 @@ package coap
import (
	"bytes"
	"log"
	"net"
	"strings"
	"testing"
	"time"

	kitNet "github.com/go-ocf/kit/net"
	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)

func periodicTransmitter(w ResponseWriter, r *Request) {


@@ 105,90 99,6 @@ func TestServingTCPObservation(t *testing.T) {
	testServingObservation(t, "tcp", addrstr, false, BlockWiseSzx16)
}

func testServingMCastByClient(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, ifis []net.Interface) {
	payload := []byte("mcast payload")
	addrMcast := laddr
	ansArrived := make(chan bool)

	c := Client{
		Net: lnet,
		Handler: func(w ResponseWriter, r *Request) {
			if bytes.Equal(r.Msg.Payload(), payload) {
				log.Printf("mcast %v -> %v", r.Client.RemoteAddr(), r.Client.LocalAddr())
				ansArrived <- true
			} else {
				t.Fatalf("unknown payload %v arrived from %v", r.Msg.Payload(), r.Client.RemoteAddr())
			}
		},
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
	}
	var a *net.UDPAddr
	var err error
	if a, err = net.ResolveUDPAddr(strings.TrimSuffix(lnet, "-mcast"), addrMcast); err != nil {
		t.Fatalf("cannot resolve addr: %v", err)
	}
	co, err := c.Dial(addrMcast)
	if err != nil {
		t.Fatalf("cannot dial addr: %v", err)
	}

	if err := kitNet.JoinGroup(co.srv.Conn.(*net.UDPConn), nil, a); err != nil {
		t.Fatalf("cannot join self to multicast group: %v", err)
	}
	if ip4 := co.srv.Conn.(*net.UDPConn).LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {
		if err := ipv4.NewPacketConn(co.srv.Conn.(*net.UDPConn)).SetMulticastLoopback(true); err != nil {
			t.Fatalf("cannot allow multicast loopback: %v", err)
		}
	} else {
		if err := ipv6.NewPacketConn(co.srv.Conn.(*net.UDPConn)).SetMulticastLoopback(true); err != nil {
			t.Fatalf("cannot allow multicast loopback: %v", err)
		}
	}
	if err != nil {
		t.Fatalf("unable to dialing: %v", err)
	}
	defer co.Close()

	req := &DgramMessage{
		MessageBase{
			typ:       NonConfirmable,
			code:      GET,
			messageID: 1234,
			payload:   payload,
		}}
	req.SetOption(ContentFormat, TextPlain)
	req.SetPathString("/test")

	co.WriteMsg(req)

	<-ansArrived
}

func TestServingIPv4MCastByClient(t *testing.T) {
	testServingMCastByClient(t, "udp4-mcast", "225.0.1.187:11111", false, BlockWiseSzx16, []net.Interface{})
}

func TestServingIPv6MCastByClient(t *testing.T) {
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", false, BlockWiseSzx16, []net.Interface{})
}

func TestServingIPv4AllInterfacesMCastByClient(t *testing.T) {
	ifis, err := net.Interfaces()
	if err != nil {
		t.Fatalf("unable to get interfaces: %v", err)
	}
	testServingMCastByClient(t, "udp4-mcast", "225.0.1.187:11111", false, BlockWiseSzx16, ifis)
}

func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {
	ifis, err := net.Interfaces()
	if err != nil {
		t.Fatalf("unable to get interfaces: %v", err)
	}
	testServingMCastByClient(t, "udp6-mcast", "[ff03::158]:11111", false, BlockWiseSzx16, ifis)
}

func setupServer(t *testing.T) (*Server, string, chan error, error) {
	return RunLocalServerUDPWithHandler("udp", ":0", true, BlockWiseSzx1024, func(w ResponseWriter, r *Request) {
		msg := r.Client.NewMessage(MessageParams{

M multicastClient.go => multicastClient.go +2 -0
@@ 29,6 29,8 @@ type MulticastClient struct {
	BlockWiseTransfer    *bool         // Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)
	BlockWiseTransferSzx *BlockWiseSzx // Set maximal block size of payload that will be send in fragment

	MulticastHopLimit int //sets the hop limit field value for future outgoing multicast packets. default is 2.

	multicastHandler *TokenHandler
}


M multicastClient_test.go => multicastClient_test.go +21 -0
@@ 1,6 1,7 @@
package coap

import (
	"net"
	"testing"
)



@@ 31,3 32,23 @@ func TestServingIPv4MCastBlockWiseSzx512(t *testing.T) {
func TestServingIPv4MCastBlockWiseSzx1024(t *testing.T) {
	testServingMCast(t, "udp4-mcast", "225.0.1.187:11111", true, BlockWiseSzx1024, 1033)
}

func TestServingIPv6MCastByClient(t *testing.T) {
	testServingMCast(t, "udp6-mcast", "[ff03::158]:11111", false, BlockWiseSzx16, 1033)
}

func TestServingIPv4AllInterfacesMCastByClient(t *testing.T) {
	ifis, err := net.Interfaces()
	if err != nil {
		t.Fatalf("unable to get interfaces: %v", err)
	}
	testServingMCastWithIfaces(t, "udp4-mcast", "225.0.1.187:11111", false, BlockWiseSzx16, 1033, ifis)
}

func TestServingIPv6AllInterfacesMCastByClient(t *testing.T) {
	ifis, err := net.Interfaces()
	if err != nil {
		t.Fatalf("unable to get interfaces: %v", err)
	}
	testServingMCastWithIfaces(t, "udp6-mcast", "[ff03::158]:11111", false, BlockWiseSzx16, 1033, ifis)
}

M networksession.go => networksession.go +10 -2
@@ 60,8 60,16 @@ type networkSession interface {
	blockWiseIsValid(szx BlockWiseSzx) bool
}

type connUDP interface {
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
	ReadWithContext(ctx context.Context, buffer []byte) (int, *kitNet.ConnUDPContext, error)
	WriteWithContext(ctx context.Context, udpCtx *kitNet.ConnUDPContext, buffer []byte) error
}

// NewSessionUDP create new session for UDP connection
func newSessionUDP(connection *kitNet.ConnUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
func newSessionUDP(connection connUDP, srv *Server, sessionUDPData *kitNet.ConnUDPContext) (networkSession, error) {
	BlockWiseTransfer := true
	BlockWiseTransferSzx := BlockWiseSzx1024
	if srv.BlockWiseTransfer != nil {


@@ 135,7 143,7 @@ type sessionBase struct {

type sessionUDP struct {
	sessionBase
	connection     *kitNet.ConnUDP
	connection     connUDP
	sessionUDPData *kitNet.ConnUDPContext                         // oob data to get egress interface right
	mapPairs       map[[MaxTokenSize]byte]map[uint16]*sessionResp //storage of channel Message
	mapPairsLock   sync.Mutex                                     //to sync add remove token

M noresponsewriter_test.go => noresponsewriter_test.go +12 -5
@@ 2,6 2,7 @@ package coap

import (
	"reflect"
	"sync"
	"testing"
)



@@ 69,17 70,22 @@ func testNoResponseHandler(t *testing.T, w ResponseWriter, r *Request) {

func TestNoResponseBehaviour(t *testing.T) {
	// server creation
	s, addr, fin, err := RunLocalServerUDPWithHandler("udp", ":", false, BlockWiseSzx16, func(w ResponseWriter, r *Request) { testNoResponseHandler(t, w, r) })
	var wg sync.WaitGroup
	wg.Add(1)
	s, addr, fin, err := RunLocalServerUDPWithHandler("udp", ":", false, BlockWiseSzx16, func(w ResponseWriter, r *Request) {
		testNoResponseHandler(t, w, r)
		wg.Done()
	})
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}
	defer func() {
		s.Shutdown()
		<-fin
	}()
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)
	}

	// connect client
	c := Client{Net: "udp"}
	c := Client{Net: "udp", Handler: func(w ResponseWriter, r *Request) {}}
	con, err := c.Dial(addr)
	if err != nil {
		t.Fatalf("Unexpected error '%v'", err)


@@ 99,4 105,5 @@ func TestNoResponseBehaviour(t *testing.T) {
	if err != nil {
		t.Fatalf("client unable to write message: %v", err)
	}
	wg.Wait()
}

M server.go => server.go +52 -48
@@ 162,9 162,6 @@ type Server struct {
	queue chan *Request
	// Workers count
	workersCount int32
	// Shutdown handling
	//lock    sync.RWMutex
	//started bool

	sessionUDPMapLock sync.Mutex
	sessionUDPMap     map[string]networkSession


@@ 224,6 221,8 @@ func (srv *Server) spawnWorker(w *Request) {

// ListenAndServe starts a coapserver on the configured address in *Server.
func (srv *Server) ListenAndServe() error {
	var listener Listener
	var connUDP *kitNet.ConnUDP
	addr := srv.Addr
	var err error
	if addr == "" {


@@ 237,16 236,18 @@ func (srv *Server) ListenAndServe() error {

	switch srv.Net {
	case "tcp", "tcp4", "tcp6":
		srv.Listener, err = kitNet.NewTCPListener(srv.Net, addr, srv.heartBeat())
		listener, err = kitNet.NewTCPListener(srv.Net, addr, srv.heartBeat())
		if err != nil {
			return fmt.Errorf("cannot listen and serve: %v", err)
		}
		defer listener.Close()
	case "tcp-tls", "tcp4-tls", "tcp6-tls":
		network := strings.TrimSuffix(srv.Net, "-tls")
		srv.Listener, err = kitNet.NewTLSListener(network, addr, srv.TLSConfig, srv.heartBeat())
		listener, err = kitNet.NewTLSListener(network, addr, srv.TLSConfig, srv.heartBeat())
		if err != nil {
			return fmt.Errorf("cannot listen and serve: %v", err)
		}
		defer listener.Close()
	case "udp", "udp4", "udp6":
		a, err := net.ResolveUDPAddr(srv.Net, addr)
		if err != nil {


@@ 259,7 260,8 @@ func (srv *Server) ListenAndServe() error {
		if err := kitNet.SetUDPSocketOptions(l); err != nil {
			return err
		}
		srv.Conn = l
		connUDP = kitNet.NewConnUDP(l, srv.heartBeat(), 2)
		defer connUDP.Close()
	case "udp-mcast", "udp4-mcast", "udp6-mcast":
		network := strings.TrimSuffix(srv.Net, "-mcast")



@@ 274,35 276,35 @@ func (srv *Server) ListenAndServe() error {
		if err := kitNet.SetUDPSocketOptions(l); err != nil {
			return err
		}
		if len(srv.UDPMcastInterfaces) > 0 {
			for _, ifi := range srv.UDPMcastInterfaces {
				if err := kitNet.JoinGroup(l, &ifi, &net.UDPAddr{IP: a.IP, Zone: a.Zone}); err != nil {
					return err
				}
		connUDP = kitNet.NewConnUDP(l, srv.heartBeat(), 2)
		defer connUDP.Close()
		ifaces := srv.UDPMcastInterfaces
		if len(ifaces) == 0 {
			ifaces, err = net.Interfaces()
			if err != nil {
				return err
			}
		} else {
			if err := kitNet.JoinGroup(l, nil, &net.UDPAddr{IP: a.IP, Zone: a.Zone}); err != nil {
		}
		for _, iface := range ifaces {
			if err := connUDP.JoinGroup(&iface, a); err != nil {
				return err
			}
		}
		srv.Conn = l
		if err := connUDP.SetMulticastLoopback(true); err != nil {
			return err
		}
	default:
		return ErrInvalidNetParameter
	}
	if srv.Conn != nil {
		defer srv.Conn.Close()
	} else if srv.Listener != nil {
		defer srv.Listener.Close()
	}

	return srv.ActivateAndServe()
	return srv.activateAndServe(listener, nil, connUDP)
}

func (srv *Server) initServeUDP(conn *net.UDPConn) error {
	return srv.serveUDP(newShutdownWithContext(srv.doneChan), conn)
func (srv *Server) initServeUDP(connUDP *kitNet.ConnUDP) error {
	return srv.serveUDP(newShutdownWithContext(srv.doneChan), connUDP)
}

func (srv *Server) initServeTCP(conn net.Conn) error {
func (srv *Server) initServeTCP(conn *kitNet.Conn) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}


@@ 312,14 314,28 @@ func (srv *Server) initServeTCP(conn net.Conn) error {
// ActivateAndServe starts a coapserver with the PacketConn or Listener
// configured in *Server. Its main use is to start a server from systemd.
func (srv *Server) ActivateAndServe() error {
	if srv.Conn != nil {
		switch c := srv.Conn.(type) {
		case *net.TCPConn, *tls.Conn:
			return srv.activateAndServe(nil, kitNet.NewConn(c, srv.heartBeat()), nil)
		case *net.UDPConn:
			return srv.activateAndServe(nil, nil, kitNet.NewConnUDP(c, srv.heartBeat(), 2))
		}
		return ErrInvalidServerConnParameter
	}
	if srv.Listener != nil {
		return srv.activateAndServe(srv.Listener, nil, nil)
	}

	return ErrInvalidServerListenerParameter
}

func (srv *Server) activateAndServe(listener Listener, conn *kitNet.Conn, connUDP *kitNet.ConnUDP) error {
	srv.doneLock.Lock()
	srv.done = false
	srv.doneChan = make(chan struct{})
	srv.doneLock.Unlock()

	pConn := srv.Conn
	l := srv.Listener

	if srv.MaxMessageSize > 0 && srv.MaxMessageSize < uint32(szxToBytes[BlockWiseSzx16]) {
		return ErrInvalidMaxMesssageSizeParameter
	}


@@ 363,17 379,13 @@ func (srv *Server) ActivateAndServe() error {
		srv.NotifySessionEndFunc = func(w *ClientConn, err error) {}
	}

	if pConn != nil {
		switch pConn.(type) {
		case *net.TCPConn, *tls.Conn:
			return srv.initServeTCP(pConn)
		case *net.UDPConn:
			return srv.initServeUDP(pConn.(*net.UDPConn))
		}
		return ErrInvalidServerConnParameter
	}
	if l != nil {
		return srv.serveTCP(l)
	switch {
	case listener != nil:
		return srv.serveTCP(listener)
	case conn != nil:
		return srv.initServeTCP(conn)
	case connUDP != nil:
		return srv.initServeUDP(connUDP)
	}

	return ErrInvalidServerListenerParameter


@@ 416,9 428,7 @@ func (srv *Server) heartBeat() time.Duration {
	return time.Millisecond * 100
}

func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) error {
	conn := kitNet.NewConn(netConn, srv.heartBeat())

func (srv *Server) serveTCPconnection(ctx *shutdownContext, conn *kitNet.Conn) error {
	session, err := srv.newSessionTCPFunc(conn, srv)
	if err != nil {
		return err


@@ 441,12 451,10 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) er
		}

		body := make([]byte, mti.BodyLen())
		//ctx, cancel := context.WithTimeout(srv.ctx, srv.readTimeout())
		err = conn.ReadFullWithContext(ctx, body)
		if err != nil {
			return session.closeWithError(fmt.Errorf("cannot serve tcp connection: %v", err))
		}
		//cancel()

		o, p, err := parseTcpOptionsPayload(mti, body)
		if err != nil {


@@ 454,7 462,6 @@ func (srv *Server) serveTCPconnection(ctx *shutdownContext, netConn net.Conn) er
		}

		msg := new(TcpMessage)
		//msg := TcpMessage{MessageBase{}}

		msg.fill(mti, o, p)



@@ 484,7 491,7 @@ func (srv *Server) serveTCP(l Listener) error {
			wg.Add(1)
			go func() {
				defer wg.Done()
				srv.serveTCPconnection(ctx, rw)
				srv.serveTCPconnection(ctx, kitNet.NewConn(rw, srv.heartBeat()))
			}()
		}
	}


@@ 502,14 509,11 @@ func (srv *Server) closeSessions(err error) {
}

// serveUDP starts a UDP listener for the server.
func (srv *Server) serveUDP(ctx *shutdownContext, conn *net.UDPConn) error {
	defer conn.Close()

func (srv *Server) serveUDP(ctx *shutdownContext, connUDP *kitNet.ConnUDP) error {
	if srv.NotifyStartedFunc != nil {
		srv.NotifyStartedFunc()
	}

	connUDP := kitNet.NewConnUDP(conn, srv.heartBeat())
	sessCtx, cancel := context.WithCancel(context.Background())
	defer cancel()


M server_test.go => server_test.go +45 -14
@@ 62,22 62,41 @@ func EchoServerBadID(w ResponseWriter, r *Request) {
}

func RunLocalServerUDPWithHandler(lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, handler HandlerFunc) (*Server, string, chan error, error) {
	return RunLocalServerUDPWithHandlerIfaces(lnet, laddr, BlockWiseTransfer, BlockWiseTransferSzx, handler, nil)
}

func RunLocalServerUDPWithHandlerIfaces(lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, handler HandlerFunc, ifaces []net.Interface) (*Server, string, chan error, error) {
	network := strings.TrimSuffix(lnet, "-mcast")

	a, err := net.ResolveUDPAddr(network, laddr)
	if err != nil {
		return nil, "", nil, err
	}
	var pc *net.UDPConn
	if strings.Contains(lnet, "-mcast") {
		pc, err = net.ListenMulticastUDP(network, nil, a)
	} else {
		pc, err = net.ListenUDP(network, a)
	}
	pc, err := net.ListenUDP(network, a)
	if err != nil {
		return nil, "", nil, err
	}
	server := &Server{Conn: pc, ReadTimeout: time.Hour, WriteTimeout: time.Hour,

	connUDP := kitNet.NewConnUDP(pc, time.Millisecond*100, 2)
	if strings.Contains(lnet, "-mcast") {
		if ifaces == nil {
			ifaces, err = net.Interfaces()
			if err != nil {
				return nil, "", nil, err
			}
		}
		for _, iface := range ifaces {
			if err := connUDP.JoinGroup(&iface, a); err != nil {
				return nil, "", nil, err
			}
		}

		if err := connUDP.SetMulticastLoopback(true); err != nil {
			return nil, "", nil, err
		}
	}

	server := &Server{ReadTimeout: time.Hour, WriteTimeout: time.Hour,
		NotifySessionNewFunc: func(s *ClientConn) {
			fmt.Printf("networkSession start %v\n", s.RemoteAddr())
		},


@@ 99,11 118,11 @@ func RunLocalServerUDPWithHandler(lnet, laddr string, BlockWiseTransfer bool, Bl
	fin := make(chan error, 1)

	go func() {
		fin <- server.ActivateAndServe()
		pc.Close()
		err = server.activateAndServe(nil, nil, connUDP)
		connUDP.Close()
		fin <- err
	}()

	waitLock.Lock()
	return server, pc.LocalAddr().String(), fin, nil
}



@@ 401,16 420,23 @@ func TestServingChallengingTimeoutClientTLS(t *testing.T) {
}

func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, payloadLen int) {
	testServingMCastWithIfaces(t, lnet, laddr, BlockWiseTransfer, BlockWiseTransferSzx, payloadLen, nil)
}

func testServingMCastWithIfaces(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, BlockWiseTransferSzx BlockWiseSzx, payloadLen int, ifaces []net.Interface) {
	addrMcast := laddr
	ansArrived := make(chan bool)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	responseServerConn := make([]*ClientConn, 0)
	var lockResponseServerConn sync.Mutex
	responseServer := Client{
		Net:                  strings.Trim(lnet, "-mcast"),
		Net:                  strings.TrimSuffix(lnet, "-mcast"),
		BlockWiseTransfer:    &BlockWiseTransfer,
		BlockWiseTransferSzx: &BlockWiseTransferSzx,
		Handler: func(w ResponseWriter, r *Request) {
			t.Log("responseServer.Handler")
			resp := w.NewResponse(Content)
			resp.SetPayload(make([]byte, payloadLen))
			resp.SetOption(ContentFormat, TextPlain)


@@ 421,7 447,8 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
		},
	}

	s, _, fin, err := RunLocalServerUDPWithHandler(lnet, addrMcast, BlockWiseTransfer, BlockWiseTransferSzx, func(w ResponseWriter, r *Request) {
	s, _, fin, err := RunLocalServerUDPWithHandlerIfaces(lnet, addrMcast, BlockWiseTransfer, BlockWiseTransferSzx, func(w ResponseWriter, r *Request) {
		t.Log("RunLocalServerUDPWithHandler.Handler")
		resp := w.NewResponse(Content)
		resp.SetPayload(make([]byte, payloadLen))
		resp.SetOption(ContentFormat, TextPlain)


@@ 436,7 463,7 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
		lockResponseServerConn.Lock()
		responseServerConn = append(responseServerConn, conn)
		lockResponseServerConn.Unlock()
	})
	}, ifaces)
	if err != nil {
		t.Fatalf("unable to run test server: %v", err)
	}


@@ 470,7 497,11 @@ func testServingMCast(t *testing.T, lnet, laddr string, BlockWiseTransfer bool, 
	}
	defer rp.Cancel()

	<-ansArrived
	select {
	case <-ansArrived:
	case <-ctx.Done():
		t.Fatalf("timeout: %v", ctx.Err())
	}
}

func TestServingIPv4MCast(t *testing.T) {