~samwhited/xmpp

346d8604572608e13aaa96f0653560ad92f8354c — Sam Whited 1 year, 7 months ago bdb9a80 ibb
ibb: add new package

This patch adds an implementation of XEP-0047: In-Band Bytestreams, which
can be used to create reliable, bidirectional data streams over
existing XMPP networks.

Fixes #19

Signed-off-by: Sam Whited <sam@samwhited.com>
5 files changed, 710 insertions(+), 0 deletions(-)

A ibb/conn.go
A ibb/ibb.go
A ibb/ibb_test.go
A ibb/listener.go
A ibb/payloads.go
A ibb/conn.go => ibb/conn.go +248 -0
@@ 0,0 1,248 @@
// Copyright 2020 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package ibb

import (
	"bufio"
	"context"
	"encoding/base64"
	"io"
	"net"
	"sync"
	"time"

	"mellium.im/xmpp"
	"mellium.im/xmpp/jid"
	"mellium.im/xmpp/stanza"
)

// stanzaReadWriter sends every chunk handed to its Write method to the remote
// entity as the payload of one IBB data stanza.
type stanzaReadWriter struct {
	// s is the session used to encode outgoing stanzas.
	s      *xmpp.Session
	// sid is the IBB session ID copied into every data payload.
	sid    string
	// stanza is the carrier stanza type; Write compares it against messageType
	// and iqType.
	stanza string
	// seqOut is the sequence number given to the next outgoing payload.
	seqOut uint16
	// to is the address outgoing stanzas are sent to.
	to     jid.JID
}

// Write sends p to the remote entity as the payload of a single IBB data
// stanza and advances the outgoing sequence number.
//
// p is placed in the stanza as-is, so it must already be base64 encoded;
// newConn arranges this by wrapping the writer in a base64 encoder.
// On success it reports len(p) bytes written; if encoding the stanza fails it
// returns 0 and the error.
func (w *stanzaReadWriter) Write(p []byte) (int, error) {
	data := dataPayload{
		Seq:  w.seqOut,
		SID:  w.sid,
		Data: p,
	}
	w.seqOut++

	var err error
	switch w.stanza {
	case messageType:
		err = w.s.Encode(context.TODO(), dataMessage{
			Message: stanza.Message{
				To: w.to,
			},
			Data: data,
		})
	case iqType, "":
		// An absent stanza attribute on the open request means "iq" (XEP-0047
		// §2.1), so an empty carrier type is sent the same way instead of being
		// silently dropped.
		//
		// TODO: wait for the reply and handle any errors appropriately (eg. retry
		// on wait).
		//
		// XEP-0047 §2.2:
		//     The sender of a data chunk need not wait for these acknowledgements
		//     before sending further stanzas. However, it is RECOMMENDED that the
		//     sender does wait in order to minimize the potential for rate-limiting
		//     penalties or throttling.
		err = w.s.Encode(context.TODO(), dataIQ{
			IQ: stanza.IQ{
				To: w.to,
				// Data chunks carried by IQs are requests and must be of type set.
				Type: stanza.SetIQ,
			},
			Data: data,
		})
	}
	// NOTE(review): any other carrier type still falls through without sending
	// anything; such values should be rejected when the session is opened.
	if err != nil {
		return 0, err
	}
	return len(p), nil
}

// Conn is an IBB stream.
// Writes to the stream may be buffered up to the block size and calling Close
// forces any remaining data to be flushed.
type Conn struct {
	// closedM guards closed.
	closedM       sync.RWMutex
	// closed is set once by closeWithLock; Read and Write return io.EOF after
	// it is set.
	closed        bool
	// readBuf buffers the base64 decoded data read from the pipe that
	// handlePayload writes into.
	readBuf       *bufio.Reader
	// readDeadline is the value last stored by SetDeadline or SetReadDeadline.
	readDeadline  time.Time
	// remoteAddr is returned by RemoteAddr and is the addressee of the close IQ.
	remoteAddr    jid.JID
	// seqIn is the sequence number expected on the next incoming data payload.
	seqIn         uint16
	// sid is the IBB session ID.
	sid           string
	// stanza is the carrier stanza type negotiated for the session.
	stanza        string
	// s is the XMPP session the stream runs over.
	s             *xmpp.Session
	// writeBuf buffers outgoing data in front of the base64 encoder.
	writeBuf      *bufio.Writer
	// writeDeadline is the value last stored by SetDeadline or SetWriteDeadline.
	writeDeadline time.Time
	// closeFunc closes the base64 encoder; closeWithLock calls it after flushing
	// writeBuf.
	closeFunc     func() error
	// handler is the Handler whose stream table this connection is removed from
	// when it is closed.
	handler       *Handler
	// pw is the write end of the pipe feeding readBuf.
	pw            *io.PipeWriter
}

// newConn builds the Conn for the IBB session described by iq.
//
// Incoming data is expected to be written to the connection's pipe, from which
// it is base64 decoded and buffered for Read. Outgoing data is buffered, base64
// encoded and handed to a stanzaReadWriter that sends it over s. Both buffers
// are sized from the block size in iq, and the remote address is taken from the
// "to" address of iq.
func newConn(h *Handler, s *xmpp.Session, iq openIQ) *Conn {
	open := iq.Open
	peer := iq.IQ.To
	bufSize := int(open.BlockSize)

	pipeR, pipeW := io.Pipe()
	decoder := base64.NewDecoder(base64.StdEncoding, pipeR)
	encoder := base64.NewEncoder(base64.StdEncoding, &stanzaReadWriter{
		s:      s,
		sid:    open.SID,
		stanza: open.Stanza,
		to:     peer,
	})

	return &Conn{
		closeFunc:  encoder.Close,
		handler:    h,
		pw:         pipeW,
		readBuf:    bufio.NewReaderSize(decoder, bufSize),
		remoteAddr: peer,
		s:          s,
		sid:        open.SID,
		stanza:     open.Stanza,
		writeBuf:   bufio.NewWriterSize(encoder, bufSize),
	}
}

// SID returns the unique session ID for the connection.
// This is the value that identifies the stream in IBB open, data and close
// payloads.
func (c *Conn) SID() string {
	return c.sid
}

// Stanza returns the carrier stanza type ("message" or "iq") for payloads
// received by the IBB session.
// The value is the one carried by the open request the connection was created
// from.
func (c *Conn) Stanza() string {
	return c.stanza
}

// Read reads data from the IBB stream into b.
// It returns io.EOF if the connection has already been closed.
//
// The closed flag is checked under the lock, but the lock is released before
// the read itself: the read blocks until the peer sends data, and holding the
// lock for that long would stop Close from ever acquiring it.
//
// NOTE(review): the deadline stored by SetDeadline and SetReadDeadline is not
// consulted here, so Read does not currently time out.
func (c *Conn) Read(b []byte) (n int, err error) {
	c.closedM.RLock()
	closed := c.closed
	c.closedM.RUnlock()
	if closed {
		return 0, io.EOF
	}
	return c.readBuf.Read(b)
}

// Write writes b to the IBB stream's write buffer.
// It returns io.EOF without writing anything once the connection has been
// closed. The read lock is held for the whole call so that a write cannot
// overlap with the flush performed when the connection is closed.
//
// NOTE(review): the deadline stored by SetDeadline and SetWriteDeadline is not
// consulted here.
func (c *Conn) Write(b []byte) (n int, err error) {
	c.closedM.RLock()
	defer c.closedM.RUnlock()

	if !c.closed {
		return c.writeBuf.Write(b)
	}
	return 0, io.EOF
}

// LocalAddr returns the local network address of the underlying XMPP session.
// It is part of the net.Conn method set.
func (c *Conn) LocalAddr() net.Addr {
	return c.s.LocalAddr()
}

// RemoteAddr returns the remote network address of the IBB stream.
// This is the address recorded when the connection was created and is also the
// addressee of the close request sent when the connection is closed.
func (c *Conn) RemoteAddr() net.Addr {
	return c.remoteAddr
}

// Size returns the blocksize for the underlying buffer when writing to the IBB
// stream.
// The value is the size in bytes of the connection's write buffer.
func (c *Conn) Size() int {
	return c.writeBuf.Size()
}

// Close closes the connection.
// Once the connection is closed, Read and Write return io.EOF.
// If the write buffer contains data it will be written regardless of the
// blocksize.
// Close acquires the handler's mutex while it runs; closing a connection that
// is already closed returns nil.
func (c *Conn) Close() error {
	return c.closeWithLock(true)
}

// closeWithLock marks the connection closed, removes it from the handler's
// stream table, flushes buffered writes and sends the IBB close request to the
// remote entity.
//
// If lock is true the handler's mutex is held for the duration of the call;
// callers that already hold it (closeSID) must pass false.
// Closing a connection that is already closed is a no-op that returns nil.
// The first error from flushing, closing the encoder, sending the close request
// or closing its response is returned.
func (c *Conn) closeWithLock(lock bool) error {
	// Take the handler lock before the connection lock. closeSID holds the
	// handler lock when it calls in here, so acquiring the two in the opposite
	// order could deadlock a local Close against a close sent by the peer.
	if lock {
		c.handler.mu.Lock()
		defer c.handler.mu.Unlock()
	}
	c.closedM.Lock()
	defer c.closedM.Unlock()
	if c.closed {
		return nil
	}
	c.closed = true
	delete(c.handler.streams, c.sid)

	// Close the write end of the pipe so that anything reading from the read
	// end sees the end of the stream instead of blocking forever.
	// PipeWriter.Close always returns nil.
	_ = c.pw.Close()

	// Flush any remaining data to be written.
	if err := c.writeBuf.Flush(); err != nil {
		return err
	}
	if err := c.closeFunc(); err != nil {
		return err
	}

	// TODO: should we always at least try to send the close IQ even if something
	// else returns an error?
	resp, err := c.s.SendIQElement(context.TODO(), closePayload(c.sid), stanza.IQ{
		To:   c.remoteAddr,
		Type: stanza.SetIQ,
	})
	// There may be no response to close (for example when sending failed), so
	// guard against calling Close on nil.
	if resp == nil {
		return err
	}
	// TODO: how do we handle this error? Do we care if it errors?
	if closeErr := resp.Close(); err == nil {
		return closeErr
	}
	return err
}

// SetDeadline sets the read and write deadlines associated with the connection.
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations are meant to fail
// with a timeout instead of blocking, and a zero value for t means I/O
// operations will not time out.
//
// NOTE(review): the deadlines are only recorded on the connection; nothing in
// this file reads them back, so they do not appear to be enforced yet.
func (c *Conn) SetDeadline(t time.Time) error {
	if err := c.SetReadDeadline(t); err != nil {
		return err
	}
	return c.SetWriteDeadline(t)
}

// SetReadDeadline sets the deadline for future Read calls and any
// currently-blocked Read call.
// A zero value for t means Read will not time out.
// It always returns nil.
//
// NOTE(review): the value is only stored on the connection; Read in this file
// does not consult it, so the deadline does not appear to be enforced yet.
func (c *Conn) SetReadDeadline(t time.Time) error {
	c.readDeadline = t
	return nil
}

// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that some of the
// data was successfully written.
// A zero value for t means Write will not time out.
// It always returns nil.
//
// NOTE(review): the value is only stored on the connection; Write in this file
// does not consult it, so the deadline does not appear to be enforced yet.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	c.writeDeadline = t
	return nil
}

A ibb/ibb.go => ibb/ibb.go +260 -0
@@ 0,0 1,260 @@
// Copyright 2020 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

// Package ibb implements data transfer with XEP-0047: In-Band Bytestreams.
//
// In-band bytestreams (IBB) are a bidirectional data transfer mechanism that
// can be used to send small files or transfer other low-bandwidth data.
// Because IBB uses base64 encoding to send the binary data, it is extremely
// inefficient and should only be used as a fallback or last resort.
// When sending large amounts of data, a more efficient mechanism such as Jingle
// File Transfer (XEP-0234) or SOCKS5 Bytestreams (XEP-0065) should be used if
// possible.
package ibb // import "mellium.im/xmpp/ibb"

import (
	"context"
	"encoding/xml"
	"errors"
	"net"
	"sync"

	"mellium.im/xmlstream"
	"mellium.im/xmpp"
	"mellium.im/xmpp/internal/attr"
	"mellium.im/xmpp/jid"
	"mellium.im/xmpp/mux"
	"mellium.im/xmpp/stanza"
)

// NS is the XML namespace used by IBB. It is provided as a convenience.
const NS = `http://jabber.org/protocol/ibb`

// BlockSize is the default block size in bytes used if an IBB stream is opened
// with no block size set.
// Because IBB base64 encodes the underlying data, the encoded payload is larger
// than the raw data it carries (base64 emits four characters for every three
// bytes of input).
const BlockSize = 4096

// Carrier stanza types that can be negotiated for an IBB session.
// They are the values written to the "stanza" attribute of the open payload.
const (
	messageType = "message"
	iqType      = "iq"
)

// Handle returns an option that registers a Handler for IBB payloads.
// The handler is registered for IQs of type set carrying an open, close or
// data payload, and for messages of any type carrying a data payload, all in
// the IBB namespace.
func Handle(h *Handler) mux.Option {
	iqPayloads := [...]string{"open", "close", "data"}

	return func(m *mux.ServeMux) {
		for _, local := range iqPayloads {
			register := mux.IQ(stanza.SetIQ, xml.Name{Local: local, Space: NS}, h)
			register(m)
		}
		register := mux.Message("", xml.Name{Local: "data", Space: NS}, h)
		register(m)
	}
}

// Handler is an xmpp.Handler that handles multiplexing of bidirectional IBB
// streams.
type Handler struct {
	// mu guards streams.
	mu       sync.Mutex
	// streams maps IBB session IDs to their open connections; it is created
	// lazily by addStream.
	streams  map[string]*Conn
	// listener maps the string form of a bare JID to the listener accepting
	// streams for that address; it is created lazily by Listen.
	listener map[string]*listener
}

// Listen returns a listener that accepts IBB streams addressed to the bare
// form of addr.
// Calling Listen again for the same bare address returns the listener that was
// created the first time.
//
// The listener table is updated under the handler's mutex so that concurrent
// calls to Listen cannot corrupt the map.
func (h *Handler) Listen(addr jid.JID) net.Listener {
	bare := addr.Bare()
	key := bare.String()

	h.mu.Lock()
	defer h.mu.Unlock()

	if h.listener == nil {
		h.listener = make(map[string]*listener)
	}
	// TODO: in "open" check if listener is nil (or listener conn is closed) and
	// reject the connection with an error if so.
	if l, ok := h.listener[key]; ok {
		return l
	}
	l := &listener{
		conn: make(chan net.Conn),
		addr: bare,
	}
	h.listener[key] = l
	return l
}

// HandleMessage implements mux.MessageHandler.
//
// IBB data carried by message stanzas is not implemented yet. After reading
// the first token of the payload HandleMessage reports that as an error rather
// than panicking, because a panic here could be triggered by any peer that
// sends an IBB data message.
func (h *Handler) HandleMessage(msg stanza.Message, t xmlstream.TokenReadEncoder) error {
	if _, err := t.Token(); err != nil {
		return err
	}
	// TODO: do we need to check the token? Iterate through until we find the
	// right payload, then decode it and hand it to handlePayload.
	return errors.New("ibb: message data not yet implemented")
}

// HandleIQ implements mux.IQHandler.
//
// It dispatches on the local name of the payload's start element:
//
//   - "open" looks up the listener for the bare "to" address of the IQ and is
//     meant to hand a new connection to it,
//   - "close" closes the stream named by the payload's sid attribute,
//   - "data" decodes the payload and passes it to handlePayload.
//
// Any other local name reaches the panic at the end of the function.
func (h *Handler) HandleIQ(iq stanza.IQ, re xmlstream.TokenReadEncoder, start *xml.StartElement) error {
	switch start.Name.Local {
	case "open":
		// NOTE(review): h.listener is read here without holding h.mu.
		listener, ok := h.listener[iq.To.Bare().String()]
		if !ok {
			// If we're not listening for connections at this address, return an
			// error.
			// XEP-0047 §2.1:
			//     If the responder supports IBB but does not wish to proceed with the
			//     session, it returns a <not-acceptable/> error.
			return sendError(iq, re, stanza.Error{
				Type:      stanza.Cancel,
				Condition: stanza.NotAcceptable,
			})
		}

		_, sid := attr.Get(start.Attr, "sid")
		// TODO: somehow we need to get the session on the handler, but I don't see
		// how that's possible in a sane way.
		//
		// NOTE(review): the rest of this branch is unfinished and does not build
		// as written: s is not declared in this function, newConn takes an
		// openIQ and returns only a *Conn, and "return conn, nil" returns two
		// values from a function that returns one. The send on listener.conn
		// follows a return statement and is therefore never reached.
		conn, err := newConn(h, s, iq)
		if err != nil {
			return err
		}
		h.addStream(sid, conn)
		return conn, nil
		listener.conn <- conn
	case "close":
		// TODO: if we receive a close element, should we flush any outgoing writes
		// first and make sure the conn is closed?
		// TODO: also check if the stream existed or not and return an error if they
		// tried to close a stream we weren't handling.
		_, sid := attr.Get(start.Attr, "sid")
		return h.closeSID(iq, re, sid)
	case "data":
		// Decode the whole <data/> element, starting from the start token the
		// mux has already consumed.
		d := xml.NewTokenDecoder(re)
		p := dataPayload{}
		err := d.DecodeElement(&p, start)
		if err != nil {
			return err
		}
		return h.handlePayload(iq, re, p)
	}

	// TODO: error handling:
	//   Stanza errors of type wait that might mean we can resume later
	//   Because the session ID is unknown, the recipient returns an <item-not-found/> error with a type of 'cancel'.
	//   Because the sequence number has already been used, the recipient returns an <unexpected-request/> error with a type of 'cancel'.
	//   Because the data is not formatted in accordance with Section 4 of RFC 4648, the recipient returns a <bad-request/> error with a type of 'cancel'.
	// TODO: count seq numbers and close if out of order

	panic("not yet implemented")
}

// sendError writes errPayload to e as an error reply to stanzaStart.
//
// For a stanza.Message the "to" and "from" addresses are swapped, the type is
// set to error and the error payload is wrapped in the message. For a
// stanza.IQ the reply is built by the IQ's Error method. Any other value
// results in an error and nothing is written.
func sendError(stanzaStart interface{}, e xmlstream.TokenReadEncoder, errPayload stanza.Error) error {
	if msg, isMessage := stanzaStart.(stanza.Message); isMessage {
		msg.To, msg.From = msg.From, msg.To
		msg.Type = stanza.ErrorMessage
		reply := msg.Wrap(errPayload.TokenReader())
		_, err := xmlstream.Copy(e, reply)
		return err
	}

	if iq, isIQ := stanzaStart.(stanza.IQ); isIQ {
		reply := iq.Error(errPayload)
		_, err := xmlstream.Copy(e, reply)
		return err
	}

	return errors.New("ibb: unexpected stanza type")
}

// closeSID closes the stream registered under sid in response to a close
// request from the peer.
// If no such stream is registered, an item-not-found error of type cancel is
// sent in reply to stanzaStart instead.
// The handler's mutex is held for the whole call, which is why the connection
// is closed with closeWithLock(false).
func (h *Handler) closeSID(stanzaStart interface{}, e xmlstream.TokenReadEncoder, sid string) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	if stream, found := h.streams[sid]; found {
		return stream.closeWithLock(false)
	}

	// XEP-0047 Example 10. Recipient does not know about the IBB session
	// https://xmpp.org/extensions/xep-0047.html#example-10
	unknownSession := stanza.Error{
		Type:      stanza.Cancel,
		Condition: stanza.ItemNotFound,
	}
	return sendError(stanzaStart, e, unknownSession)
}

// handlePayload delivers a received data payload to the stream it belongs to.
//
// If no stream is registered under the payload's session ID an item-not-found
// error is sent in reply to stanzaStart; if the payload's sequence number is
// not the one the stream expects next, an unexpected-request error is sent.
// Otherwise the expected sequence number is advanced and the payload's data is
// written to the stream's pipe. The handler's mutex is held for the whole call.
func (h *Handler) handlePayload(stanzaStart interface{}, e xmlstream.TokenReadEncoder, p dataPayload) error {
	h.mu.Lock()
	defer h.mu.Unlock()

	stream, found := h.streams[p.SID]
	switch {
	case !found:
		return sendError(stanzaStart, e, stanza.Error{
			Type:      stanza.Cancel,
			Condition: stanza.ItemNotFound,
		})
	case p.Seq != stream.seqIn:
		// TODO: the XEP suggests that we only do this if the sequence number has
		// already been used, and just close it if we get an unexpected sequence
		// number, but surely this should be an error too?
		return sendError(stanzaStart, e, stanza.Error{
			Type:      stanza.Cancel,
			Condition: stanza.UnexpectedRequest,
		})
	}

	stream.seqIn++
	_, err := stream.pw.Write(p.Data)
	return err
}

// Open attempts to create a new IBB stream on the provided session using IQs as
// the carrier stanza.
// The open request is addressed to to and advertises blockSize as the block
// size for the stream.
func (h *Handler) Open(ctx context.Context, s *xmpp.Session, to jid.JID, blockSize uint16) (*Conn, error) {
	return h.open(ctx, iqType, s, to, blockSize)
}

// OpenMessage attempts to create a new IBB stream on the provided session using
// messages as the carrier stanza.
// The open request is addressed to to and advertises blockSize as the block
// size for the stream.
// Most users should call Open instead.
func (h *Handler) OpenMessage(ctx context.Context, s *xmpp.Session, to jid.JID, blockSize uint16) (*Conn, error) {
	return h.open(ctx, messageType, s, to, blockSize)
}

// open negotiates a new IBB stream with to over s and registers the resulting
// connection with the handler.
//
// stanzaType selects the carrier stanza for data ("iq" or "message"). A
// blockSize of zero selects the package default, BlockSize. The session ID is
// generated randomly. An error is returned if the open request cannot be sent.
func (h *Handler) open(ctx context.Context, stanzaType string, s *xmpp.Session, to jid.JID, blockSize uint16) (*Conn, error) {
	// Apply the documented default rather than advertising a block size of 0.
	if blockSize == 0 {
		blockSize = BlockSize
	}
	sid := attr.RandomID()

	iq := openIQ{
		IQ: stanza.IQ{
			To: to,
			// Opening a stream is a request, so the IQ must be of type set.
			Type: stanza.SetIQ,
		},
	}
	iq.Open.SID = sid
	iq.Open.Stanza = stanzaType
	iq.Open.BlockSize = blockSize

	resp, err := s.SendIQ(ctx, iq.TokenReader())
	if err != nil {
		return nil, err
	}
	// TODO: resp should never be nil, is this something about the test
	// ClientServer?
	if resp != nil {
		defer resp.Close()
	}
	// NOTE(review): the response is not inspected, so an error reply from the
	// peer is not yet turned into an error here.

	conn := newConn(h, s, iq)
	h.addStream(sid, conn)
	return conn, nil
}

// addStream registers conn under sid in the handler's stream table, creating
// the table on first use. An existing entry for sid is replaced.
func (h *Handler) addStream(sid string, conn *Conn) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.streams != nil {
		h.streams[sid] = conn
		return
	}
	h.streams = map[string]*Conn{sid: conn}
}

A ibb/ibb_test.go => ibb/ibb_test.go +81 -0
@@ 0,0 1,81 @@
// Copyright 2020 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package ibb_test

import (
	"context"
	"io"
	"strconv"
	"strings"
	"testing"

	"mellium.im/xmpp/ibb"
	"mellium.im/xmpp/internal/xmpptest"
	"mellium.im/xmpp/mux"
)

// Compile-time checks that *ibb.Handler satisfies the mux handler interfaces
// it is registered as.
var (
	_ mux.IQHandler      = (*ibb.Handler)(nil)
	_ mux.MessageHandler = (*ibb.Handler)(nil)
)

// sendData is the text written to the stream by TestSendData.
const sendData = "To sit in solemn silence on a dull dark dock"

// sendDataTestCases lists the block sizes TestSendData opens streams with.
// Case 0 leaves the block size at its zero value.
var sendDataTestCases = [...]struct {
	BlockSize uint16
}{
	0: {},
}

// TestSendData opens an IBB stream from the client to the server for every
// test case, writes sendData to it and checks that the server reads the same
// text back.
func TestSendData(t *testing.T) {
	for i, tc := range sendDataTestCases {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			serverHandler := &ibb.Handler{}
			serverMux := mux.New(ibb.Handle(serverHandler))
			clientHandler := &ibb.Handler{}
			clientMux := mux.New(ibb.Handle(clientHandler))
			cs := xmpptest.NewClientServer(
				xmpptest.ServerHandler(serverMux),
				xmpptest.ClientHandler(clientMux),
			)

			// Register the listener before the client can send its open request
			// so that the request cannot arrive ahead of the registration.
			listener := serverHandler.Listen(cs.Server.LocalAddr())

			// Each goroutine sends exactly one value (nil on success) so the test
			// knows when both have finished without needing the channel closed.
			errChan := make(chan error, 2)
			go func() {
				conn, err := clientHandler.Open(context.Background(), cs.Client, cs.Server.LocalAddr(), tc.BlockSize)
				if err != nil {
					errChan <- err
					return
				}
				_, err = io.WriteString(conn, sendData)
				// Close even after a failed write: closing flushes buffered data
				// and is what lets the reading side finish.
				if closeErr := conn.Close(); err == nil {
					err = closeErr
				}
				errChan <- err
			}()
			var buf strings.Builder
			go func() {
				conn, err := listener.Accept()
				if err != nil {
					errChan <- err
					return
				}
				_, err = io.Copy(&buf, conn)
				errChan <- err
			}()

			// Receiving both results also orders the reader's writes to buf
			// before the comparison below.
			for pending := cap(errChan); pending > 0; pending-- {
				if err := <-errChan; err != nil {
					t.Errorf("unexpected error on channel: %v", err)
				}
			}

			if s := buf.String(); s != sendData {
				t.Errorf("transmitted data was not correct: want=%q, got=%q", sendData, s)
			}
		})
	}
}

A ibb/listener.go => ibb/listener.go +38 -0
@@ 0,0 1,38 @@
// Copyright 2020 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package ibb

import (
	"errors"
	"net"

	"mellium.im/xmpp/jid"
)

var (
	// ErrListenerClosed is returned by a listener's Accept method once the
	// listener has been closed.
	ErrListenerClosed = errors.New("ibb: listener was closed")
)

// listener is the net.Listener returned by Handler.Listen.
type listener struct {
	// conn delivers connections to Accept; Close closes the channel.
	conn chan net.Conn
	// addr is the address reported by Addr.
	addr jid.JID
}

// Accept blocks until a connection is delivered to the listener and returns
// it. Once the listener has been closed it returns ErrListenerClosed.
func (l *listener) Accept() (net.Conn, error) {
	if accepted, open := <-l.conn; open {
		return accepted, nil
	}
	return nil, ErrListenerClosed
}

// Close closes the listener's connection channel, which makes pending and
// future calls to Accept return ErrListenerClosed. It always returns nil.
//
// NOTE(review): closing a channel twice panics, so Close must only be called
// once per listener.
func (l *listener) Close() error {
	close(l.conn)
	return nil
}

// Addr returns the address the listener was created for.
func (l *listener) Addr() net.Addr {
	return l.addr
}

A ibb/payloads.go => ibb/payloads.go +83 -0
@@ 0,0 1,83 @@
// Copyright 2020 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package ibb

import (
	"encoding/xml"
	"strconv"

	"mellium.im/xmlstream"
	"mellium.im/xmpp/stanza"
)

// closePayload returns a token reader for the IBB <close/> element that ends
// the session identified by sid.
//
// The start element is wrapped rather than emitted as a single token so that
// the matching end element is produced as well; a lone start token would leave
// the element unterminated inside the enclosing IQ.
func closePayload(sid string) xml.TokenReader {
	return xmlstream.Wrap(nil, xml.StartElement{
		Name: xml.Name{Space: NS, Local: "close"},
		Attr: []xml.Attr{{
			Name:  xml.Name{Local: "sid"},
			Value: sid,
		}},
	})
}

// openIQ is the IQ stanza used to negotiate a new IBB session.
type openIQ struct {
	stanza.IQ

	// Open is the <open/> payload. Its fields are carried as XML attributes,
	// matching what TokenReader writes, so the struct tags must say so for
	// decoding to populate them.
	Open struct {
		// BlockSize is the block size advertised for the session.
		BlockSize uint16 `xml:"block-size,attr"`
		// SID is the session ID of the stream being opened.
		SID string `xml:"sid,attr"`
		// Stanza is the carrier stanza type for data ("iq" or "message").
		Stanza string `xml:"stanza,attr,omitempty"`
	} `xml:"http://jabber.org/protocol/ibb open"`
}

// WriteXML satisfies the xmlstream.WriterTo interface.
// It is like MarshalXML except it writes tokens to w.
// The tokens written are those produced by TokenReader; the values returned
// are the ones reported by xmlstream.Copy.
func (iq openIQ) WriteXML(w xmlstream.TokenWriter) (n int, err error) {
	return xmlstream.Copy(w, iq.TokenReader())
}

// TokenReader satisfies the xmlstream.Marshaler interface.
// It yields the IQ wrapping an empty <open/> element in the IBB namespace whose
// attributes are, in order, block-size, sid and, only when it is non-empty,
// stanza.
func (iq openIQ) TokenReader() xml.TokenReader {
	attrs := []xml.Attr{
		{
			Name:  xml.Name{Local: "block-size"},
			Value: strconv.FormatUint(uint64(iq.Open.BlockSize), 10),
		},
		{
			Name:  xml.Name{Local: "sid"},
			Value: iq.Open.SID,
		},
	}
	if carrier := iq.Open.Stanza; carrier != "" {
		attrs = append(attrs, xml.Attr{
			Name:  xml.Name{Local: "stanza"},
			Value: carrier,
		})
	}

	openStart := xml.StartElement{
		Name: xml.Name{Local: "open", Space: NS},
		Attr: attrs,
	}
	payload := xmlstream.Wrap(nil, openStart)
	return iq.IQ.Wrap(payload)
}

// dataPayload is the IBB <data/> element that carries one chunk of a stream.
type dataPayload struct {
	XMLName xml.Name `xml:"http://jabber.org/protocol/ibb data"`
	// Seq is the sequence number of the chunk within its stream.
	Seq     uint16   `xml:"seq,attr"`
	// SID is the session ID of the stream the chunk belongs to.
	SID     string   `xml:"sid,attr"`
	// Data is the character data of the element. It is copied to and from the
	// XML as-is; base64 encoding and decoding happen in the connection.
	Data    []byte   `xml:",chardata"`
}

// dataIQ is an IQ stanza carrying a single IBB data payload.
type dataIQ struct {
	stanza.IQ

	// Data is the chunk being transferred.
	Data dataPayload `xml:"http://jabber.org/protocol/ibb data"`
}

// dataMessage is a message stanza carrying a single IBB data payload.
type dataMessage struct {
	stanza.Message

	// Data is the chunk being transferred.
	Data dataPayload `xml:"http://jabber.org/protocol/ibb data"`
}