~samwhited/xmpp

f9f79195e91c2ab286c41dc1d994db5d664e83dc — Sam Whited 3 months ago 31cc8d6
xmpp: add Message and Presence send methods

Signed-off-by: Sam Whited <sam@samwhited.com>
M doc.go => doc.go +9 -3
@@ 91,11 91,17 @@
// The stanza package contains functions and structs that aid in the
// construction of message, presence and info/query (IQ) elements which have
// special semantics in XMPP and are known as "stanzas".
// There are 8 methods on Session used for transmitting stanzas and other events
// over the output stream.
// There are 16 methods on Session used for transmitting stanzas and other
// events over the output stream.
// Their names are matched by the regular expression:
//
//     (Send|Encode)(IQ)?(Element)?
//     (Send|Encode)(Message|Presence|IQ)?(Element)?
//
// There are also four methods specifically for sending IQs and handling their
// responses.
// Their names are matched by:
//
//     (Unmarshal|Iter)IQ(Element)?
//
// If "Send" is present it means that the method copies one XML token stream
// into the output stream, while "Encode" indicates that it takes a value and

M session.go => session.go +31 -226
@@ 536,23 536,15 @@ func handleInputStream(s *Session, handler Handler) (err error) {
		}
	}

	var id string
	var needsResp bool
	if isIQ(start.Name) {
		_, id = attr.Get(start.Attr, "id")

		// If this is a response IQ (ie. an "error" or "result") check if we're
		// handling it as part of a SendIQ call.
		// If not, record this so that we can check if the user sends a response
		// later.
		if !iqNeedsResp(start.Attr) {
			s.sentIQMutex.Lock()
			c := s.sentIQs[id]
			s.sentIQMutex.Unlock()
			if c == nil {
				goto noreply
			}
	iqOk := isIQ(start.Name)
	_, _, id, typ := getIDTyp(start.Attr)
	iqNeedsResp := typ == string(stanza.GetIQ) || typ == string(stanza.SetIQ)

	if !iqNeedsResp {
		s.sentIQMutex.Lock()
		c := s.sentIQs[id]
		s.sentIQMutex.Unlock()
		if c != nil {
			inner := xmlstream.Inner(r)
			c <- iqResponder{
				r: xmlstream.MultiReader(xmlstream.Token(start), inner, xmlstream.Token(start.End())),


@@ 566,11 558,8 @@ func handleInputStream(s *Session, handler Handler) (err error) {
			}
			return nil
		}
		needsResp = true
	}

noreply:

	w := s.TokenWriter()
	defer w.Close()
	rw := &responseChecker{


@@ 583,7 572,7 @@ noreply:
	}

	// If the user did not write a response to an IQ, send a default one.
	if needsResp && !rw.wroteResp {
	if iqOk && iqNeedsResp && !rw.wroteResp {
		_, toAttr := attr.Get(start.Attr, "to")
		var to jid.JID
		if toAttr != "" {


@@ 615,6 604,26 @@ noreply:
	return err
}

func getIDTyp(attrs []xml.Attr) (int, int, string, string) {
	var id, typ string
	idIdx := -1
	typIdx := -1
	for idx, attr := range attrs {
		switch attr.Name.Local {
		case "id":
			id = attr.Value
			idIdx = idx
		case "type":
			typ = attr.Value
			typIdx = idx
		}
		if idIdx > -1 && typIdx > -1 {
			break
		}
	}
	return idIdx, typIdx, id, typ
}

type responseChecker struct {
	xml.TokenReader
	xmlstream.TokenWriter


@@ 626,8 635,8 @@ type responseChecker struct {
func (rw *responseChecker) EncodeToken(t xml.Token) error {
	switch tok := t.(type) {
	case xml.StartElement:
		_, id := attr.Get(tok.Attr, "id")
		if rw.level < 1 && isIQEmptySpace(tok.Name) && id == rw.id && !iqNeedsResp(tok.Attr) {
		_, _, id, typ := getIDTyp(tok.Attr)
		if rw.level < 1 && isIQEmptySpace(tok.Name) && id == rw.id && (typ != string(stanza.GetIQ) && typ != string(stanza.SetIQ)) {
			rw.wroteResp = true
		}
		rw.level++


@@ 846,32 855,6 @@ func (s *Session) SetCloseDeadline(t time.Time) error {
	return s.Conn().SetReadDeadline(t)
}

// EncodeIQ is like Encode except that it returns an error if v does not marshal
// to an IQ stanza and like SendIQ it blocks until a response is received.
// For more information see SendIQ.
//
// EncodeIQ is safe for concurrent use by multiple goroutines.
func (s *Session) EncodeIQ(ctx context.Context, v interface{}) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(v)
	if err != nil {
		return nil, err
	}
	return s.SendIQ(ctx, r)
}

// EncodeIQElement is like EncodeIQ except that it wraps the payload in an
// Info/Query (IQ) element.
// For more information see SendIQ.
//
// EncodeIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) EncodeIQElement(ctx context.Context, payload interface{}, iq stanza.IQ) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(payload)
	if err != nil {
		return nil, err
	}
	return s.SendIQElement(ctx, r, iq)
}

// Encode writes the XML encoding of v to the stream.
//
// For more information see "encoding/xml".Encode.


@@ 945,18 928,6 @@ func send(ctx context.Context, s *Session, r xml.TokenReader, start *xml.StartEl
	return s.out.e.Flush()
}

func iqNeedsResp(attrs []xml.Attr) bool {
	var typ string
	for _, attr := range attrs {
		if attr.Name.Local == "type" {
			typ = attr.Value
			break
		}
	}

	return typ == string(stanza.GetIQ) || typ == string(stanza.SetIQ)
}

func isIQ(name xml.Name) bool {
	return name.Local == "iq" && (name.Space == ns.Client || name.Space == ns.Server)
}


@@ 970,172 941,6 @@ func isStanzaEmptySpace(name xml.Name) bool {
		(name.Space == ns.Client || name.Space == ns.Server || name.Space == "")
}

// SendIQ is like Send except that it returns an error if the first token read
// from the stream is not an Info/Query (IQ) start element and blocks until a
// response is received.
//
// If the input stream is not being processed (a call to Serve is not running),
// SendIQ will never receive a response and will block until the provided
// context is canceled.
// If the response is non-nil, it does not need to be consumed in its entirety,
// but it must be closed before stream processing will resume.
// If the IQ type does not require a response—ie. it is a result or error IQ,
// meaning that it is a response itself—SendIQElemnt does not block and the
// response is nil.
//
// If the context is closed before the response is received, SendIQ immediately
// returns the context error.
// Any response received at a later time will not be associated with the
// original request but can still be handled by the Serve handler.
//
// If an error is returned, the response will be nil; the converse is not
// necessarily true.
// SendIQ is safe for concurrent use by multiple goroutines.
func (s *Session) SendIQ(ctx context.Context, r xml.TokenReader) (xmlstream.TokenReadCloser, error) {
	tok, err := r.Token()
	if err != nil {
		return nil, err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return nil, fmt.Errorf("expected IQ start element, got %T", tok)
	}
	if !isIQEmptySpace(start.Name) {
		return nil, fmt.Errorf("expected start element to be an IQ")
	}

	// If there's no ID, add one.
	idx, id := attr.Get(start.Attr, "id")
	if idx == -1 {
		idx = len(start.Attr)
		start.Attr = append(start.Attr, xml.Attr{Name: xml.Name{Local: "id"}, Value: ""})
	}
	if id == "" {
		id = attr.RandomID()
		start.Attr[idx].Value = id
	}

	// If this an IQ of type "set" or "get" we expect a response.
	if iqNeedsResp(start.Attr) {
		return s.sendResp(ctx, id, xmlstream.Inner(r), start)
	}

	// If this is an IQ of type result or error, we don't expect a response so
	// just send it normally.
	return nil, s.SendElement(ctx, xmlstream.Inner(r), start)
}

// SendIQElement is like SendIQ except that it wraps the payload in an
// Info/Query (IQ) element.
// For more information see SendIQ.
//
// SendIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) SendIQElement(ctx context.Context, payload xml.TokenReader, iq stanza.IQ) (xmlstream.TokenReadCloser, error) {
	return s.SendIQ(ctx, iq.Wrap(payload))
}

// UnmarshalIQ is like SendIQ except that error replies are unmarshaled into a
// stanza.Error and returned and otherwise the response payload is unmarshaled
// into v.
// For more information see SendIQ.
//
// UnmarshalIQ is safe for concurrent use by multiple goroutines.
func (s *Session) UnmarshalIQ(ctx context.Context, iq xml.TokenReader, v interface{}) error {
	return unmarshalIQ(ctx, iq, v, s)
}

// UnmarshalIQElement is like UnmarshalIQ but it wraps a payload in the provided IQ.
// For more information see SendIQ.
//
// UnmarshalIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) UnmarshalIQElement(ctx context.Context, payload xml.TokenReader, iq stanza.IQ, v interface{}) error {
	return unmarshalIQ(ctx, iq.Wrap(payload), v, s)
}

// IterIQ is like SendIQ except that error replies are unmarshaled into a
// stanza.Error and returned and otherwise an iterator over the children of the
// response payload is returned.
// For more information see SendIQ.
//
// IterIQ is safe for concurrent use by multiple goroutines.
func (s *Session) IterIQ(ctx context.Context, iq xml.TokenReader) (*xmlstream.Iter, error) {
	return iterIQ(ctx, iq, s)
}

// IterIQElement is like IterIQ but it wraps a payload in the provided IQ.
// For more information see SendIQ.
//
// IterIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) IterIQElement(ctx context.Context, payload xml.TokenReader, iq stanza.IQ) (*xmlstream.Iter, error) {
	return iterIQ(ctx, iq.Wrap(payload), s)
}

func iterIQ(ctx context.Context, iq xml.TokenReader, s *Session) (_ *xmlstream.Iter, e error) {
	resp, err := s.SendIQ(ctx, iq)
	if err != nil {
		return nil, err
	}
	defer func() {
		if e != nil {
			/* #nosec */
			resp.Close()
		}
	}()

	tok, err := resp.Token()
	if err != nil {
		return nil, err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return nil, fmt.Errorf("stanza: expected IQ start token, got %T %[1]v", tok)
	}
	_, err = stanza.UnmarshalIQError(resp, start)
	if err != nil {
		return nil, err
	}

	// Pop the payload start token, we want to iterate over its children.
	_, err = resp.Token()
	// Discard early EOF so that the iterator doesn't end up returning it.
	if err != nil && err != io.EOF {
		return nil, err
	}
	return xmlstream.NewIter(resp), nil
}

func unmarshalIQ(ctx context.Context, iq xml.TokenReader, v interface{}, s *Session) (e error) {
	resp, err := s.SendIQ(ctx, iq)
	if err != nil {
		return err
	}
	defer func() {
		ee := resp.Close()
		if e == nil {
			e = ee
		}
	}()

	tok, err := resp.Token()
	if err != nil {
		return err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return fmt.Errorf("stanza: expected IQ start token, got %T %[1]v", tok)
	}

	_, err = stanza.UnmarshalIQError(resp, start)
	if err != nil {
		return err
	}
	d := xml.NewTokenDecoder(resp)
	if v == nil {
		return nil
	}
	return d.Decode(v)
}

func (s *Session) sendResp(ctx context.Context, id string, payload xml.TokenReader, start xml.StartElement) (xmlstream.TokenReadCloser, error) {
	c := make(chan xmlstream.TokenReadCloser)


A session_iq.go => session_iq.go +209 -0
@@ 0,0 1,209 @@
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package xmpp

import (
	"context"
	"encoding/xml"
	"fmt"
	"io"

	"mellium.im/xmlstream"
	"mellium.im/xmpp/internal/attr"
	"mellium.im/xmpp/internal/marshal"
	"mellium.im/xmpp/stanza"
)

// SendIQ is like Send except that it returns an error if the first token read
// from the stream is not an Info/Query (IQ) start element and blocks until a
// response is received.
//
// If the input stream is not being processed (a call to Serve is not running),
// SendIQ will never receive a response and will block until the provided
// context is canceled.
// If the response is non-nil, it does not need to be consumed in its entirety,
// but it must be closed before stream processing will resume.
// If the IQ type does not require a response—ie. it is a result or error IQ,
// meaning that it is a response itself—SendIQElement does not block and the
// response is nil.
//
// If the context is closed before the response is received, SendIQ immediately
// returns the context error.
// Any response received at a later time will not be associated with the
// original request but can still be handled by the Serve handler.
//
// If an error is returned, the response will be nil; the converse is not
// necessarily true.
// SendIQ is safe for concurrent use by multiple goroutines.
func (s *Session) SendIQ(ctx context.Context, r xml.TokenReader) (xmlstream.TokenReadCloser, error) {
	tok, err := r.Token()
	if err != nil {
		return nil, err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return nil, fmt.Errorf("expected IQ start element, got %T", tok)
	}
	if !isIQEmptySpace(start.Name) {
		return nil, fmt.Errorf("expected start element to be an IQ")
	}

	// If there's no ID, add one.
	idx, _, id, typ := getIDTyp(start.Attr)
	if idx == -1 {
		idx = len(start.Attr)
		start.Attr = append(start.Attr, xml.Attr{Name: xml.Name{Local: "id"}, Value: ""})
	}
	if id == "" {
		id = attr.RandomID()
		start.Attr[idx].Value = id
	}

	// If this an IQ of type "set" or "get" we expect a response.
	if typ == string(stanza.GetIQ) || typ == string(stanza.SetIQ) {
		return s.sendResp(ctx, id, xmlstream.Inner(r), start)
	}

	// If this is an IQ of type result or error, we don't expect a response so
	// just send it normally.
	return nil, s.SendElement(ctx, xmlstream.Inner(r), start)
}

// SendIQElement is like SendIQ except that it wraps the payload in an
// Info/Query (IQ) element.
// For more information see SendIQ.
//
// SendIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) SendIQElement(ctx context.Context, payload xml.TokenReader, iq stanza.IQ) (xmlstream.TokenReadCloser, error) {
	return s.SendIQ(ctx, iq.Wrap(payload))
}

// EncodeIQ is like Encode except that it returns an error if v does not marshal
// to an IQ stanza and like SendIQ it blocks until a response is received.
// For more information see SendIQ.
//
// EncodeIQ is safe for concurrent use by multiple goroutines.
func (s *Session) EncodeIQ(ctx context.Context, v interface{}) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(v)
	if err != nil {
		return nil, err
	}
	return s.SendIQ(ctx, r)
}

// EncodeIQElement is like EncodeIQ except that it wraps the payload in an
// Info/Query (IQ) element.
// For more information see SendIQ.
//
// EncodeIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) EncodeIQElement(ctx context.Context, payload interface{}, iq stanza.IQ) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(payload)
	if err != nil {
		return nil, err
	}
	return s.SendIQElement(ctx, r, iq)
}

// UnmarshalIQ is like SendIQ except that error replies are unmarshaled into a
// stanza.Error and returned and otherwise the response payload is unmarshaled
// into v.
// For more information see SendIQ.
//
// UnmarshalIQ is safe for concurrent use by multiple goroutines.
func (s *Session) UnmarshalIQ(ctx context.Context, iq xml.TokenReader, v interface{}) error {
	return unmarshalIQ(ctx, iq, v, s)
}

// UnmarshalIQElement is like UnmarshalIQ but it wraps a payload in the provided IQ.
// For more information see SendIQ.
//
// UnmarshalIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) UnmarshalIQElement(ctx context.Context, payload xml.TokenReader, iq stanza.IQ, v interface{}) error {
	return unmarshalIQ(ctx, iq.Wrap(payload), v, s)
}

// IterIQ is like SendIQ except that error replies are unmarshaled into a
// stanza.Error and returned and otherwise an iterator over the children of the
// response payload is returned.
// For more information see SendIQ.
//
// IterIQ is safe for concurrent use by multiple goroutines.
func (s *Session) IterIQ(ctx context.Context, iq xml.TokenReader) (*xmlstream.Iter, error) {
	return iterIQ(ctx, iq, s)
}

// IterIQElement is like IterIQ but it wraps a payload in the provided IQ.
// For more information see SendIQ.
//
// IterIQElement is safe for concurrent use by multiple goroutines.
func (s *Session) IterIQElement(ctx context.Context, payload xml.TokenReader, iq stanza.IQ) (*xmlstream.Iter, error) {
	return iterIQ(ctx, iq.Wrap(payload), s)
}

func iterIQ(ctx context.Context, iq xml.TokenReader, s *Session) (_ *xmlstream.Iter, e error) {
	resp, err := s.SendIQ(ctx, iq)
	if err != nil {
		return nil, err
	}
	defer func() {
		if e != nil {
			/* #nosec */
			resp.Close()
		}
	}()

	tok, err := resp.Token()
	if err != nil {
		return nil, err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return nil, fmt.Errorf("stanza: expected IQ start token, got %T %[1]v", tok)
	}
	_, err = stanza.UnmarshalIQError(resp, start)
	if err != nil {
		return nil, err
	}

	// Pop the payload start token, we want to iterate over its children.
	_, err = resp.Token()
	// Discard early EOF so that the iterator doesn't end up returning it.
	if err != nil && err != io.EOF {
		return nil, err
	}
	return xmlstream.NewIter(resp), nil
}

func unmarshalIQ(ctx context.Context, iq xml.TokenReader, v interface{}, s *Session) (e error) {
	resp, err := s.SendIQ(ctx, iq)
	if err != nil {
		return err
	}
	defer func() {
		ee := resp.Close()
		if e == nil {
			e = ee
		}
	}()

	tok, err := resp.Token()
	if err != nil {
		return err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return fmt.Errorf("stanza: expected IQ start token, got %T %[1]v", tok)
	}

	_, err = stanza.UnmarshalIQError(resp, start)
	if err != nil {
		return err
	}
	d := xml.NewTokenDecoder(resp)
	if v == nil {
		return nil
	}
	return d.Decode(v)
}

A session_message.go => session_message.go +102 -0
@@ 0,0 1,102 @@
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package xmpp

import (
	"context"
	"encoding/xml"
	"fmt"

	"mellium.im/xmlstream"
	"mellium.im/xmpp/internal/attr"
	"mellium.im/xmpp/internal/marshal"
	"mellium.im/xmpp/internal/ns"
	"mellium.im/xmpp/stanza"
)

func isMessageEmptySpace(name xml.Name) bool {
	return name.Local == "message" && (name.Space == "" || name.Space == ns.Client || name.Space == ns.Server)
}

// SendMessage is like Send except that it returns an error if the first token
// read from the input is not a message start token and blocks until an error
// response is received or the context times out.
// Messages are generally fire-and-forget meaning that the success behavior of
// SendMessage is to time out and that methods such as Send should normally be
// used instead.
// It is thus mainly for use by extensions that define an extension-namespaced
// success response to a message being sent and need a mechanism to track
// general error responses without handling every single message sent through
// the session to see if it has a matching ID.
//
// SendMessage is safe for concurrent use by multiple goroutines.
func (s *Session) SendMessage(ctx context.Context, r xml.TokenReader) (xmlstream.TokenReadCloser, error) {
	tok, err := r.Token()
	if err != nil {
		return nil, err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return nil, fmt.Errorf("expected IQ start element, got %T", tok)
	}
	if !isMessageEmptySpace(start.Name) {
		return nil, fmt.Errorf("expected start element to be a message")
	}

	// If there's no ID, add one.
	idx, _, id, typ := getIDTyp(start.Attr)
	if idx == -1 {
		idx = len(start.Attr)
		start.Attr = append(start.Attr, xml.Attr{Name: xml.Name{Local: "id"}, Value: ""})
	}
	if id == "" {
		id = attr.RandomID()
		start.Attr[idx].Value = id
	}

	// If this is an error message we don't ever expect a response, so just send
	// it normally.
	if typ == string(stanza.ErrorMessage) {
		return nil, s.SendElement(ctx, xmlstream.Inner(r), start)
	}

	return s.sendResp(ctx, id, xmlstream.Inner(r), start)
}

// SendMessageElement is like SendMessage except that it wraps the payload in a
// Message element.
// For more information see SendMessage.
//
// SendMessageElement is safe for concurrent use by multiple goroutines.
func (s *Session) SendMessageElement(ctx context.Context, payload xml.TokenReader, msg stanza.Message) (xmlstream.TokenReadCloser, error) {
	return s.SendMessage(ctx, msg.Wrap(payload))
}

// EncodeMessage is like Encode except that it returns an error if v does not
// marshal to an Message stanza and like SendMessage it blocks until an error
// response is received or the context times out.
// For more information see SendMessage.
//
// EncodeMessage is safe for concurrent use by multiple goroutines.
func (s *Session) EncodeMessage(ctx context.Context, v interface{}) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(v)
	if err != nil {
		return nil, err
	}
	return s.SendMessage(ctx, r)
}

// EncodeMessageElement is like EncodeMessage except that it wraps the payload
// in a Message element.
// For more information see SendMessage.
//
// EncodeMessageElement is safe for concurrent use by multiple goroutines.
func (s *Session) EncodeMessageElement(ctx context.Context, payload interface{}, msg stanza.Message) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(payload)
	if err != nil {
		return nil, err
	}
	return s.SendMessageElement(ctx, r, msg)
}

A session_message_test.go => session_message_test.go +133 -0
@@ 0,0 1,133 @@
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package xmpp_test

import (
	"context"
	"encoding/xml"
	"strconv"
	"testing"
	"time"

	"mellium.im/xmlstream"
	"mellium.im/xmpp/internal/xmpptest"
	"mellium.im/xmpp/stanza"
)

var sendMessageTests = [...]struct {
	msg        stanza.Message
	canceled   bool
	payload    xml.TokenReader
	err        error
	writesBody bool
	resp       *stanza.Message
}{
	0: {
		msg:        stanza.Message{ID: testIQID, Type: stanza.NormalMessage},
		writesBody: true,
		resp:       &stanza.Message{ID: testIQID, Type: stanza.ErrorMessage},
	},
	1: {
		msg:        stanza.Message{ID: testIQID, Type: stanza.GroupChatMessage},
		writesBody: true,
		resp:       &stanza.Message{ID: testIQID, Type: stanza.ErrorMessage},
	},
	2: {
		msg:        stanza.Message{Type: stanza.ErrorMessage, ID: testIQID},
		writesBody: true,
	},
	3: {
		msg:        stanza.Message{ID: testIQID, Type: stanza.GroupChatMessage},
		canceled:   true,
		writesBody: true,
		err:        context.Canceled,
	},
}

func TestSendMessage(t *testing.T) {
	for i, tc := range sendMessageTests {
		tc := tc
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			t.Run("SendMessageElement", func(t *testing.T) {
				s := xmpptest.NewClientServer(xmpptest.ServerHandlerFunc(func(t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
					if tc.resp != nil {
						_, err := xmlstream.Copy(t, tc.resp.Wrap(nil))
						return err
					}
					return nil
				}))

				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
				if tc.canceled {
					cancel()
				} else {
					defer cancel()
				}

				resp, err := s.Client.SendMessageElement(ctx, tc.payload, tc.msg)
				if err != tc.err {
					t.Errorf("unexpected error, want=%q, got=%q", tc.err, err)
				}
				respMsg := stanza.Message{}
				if resp != nil {
					defer func() {
						if err := resp.Close(); err != nil {
							t.Errorf("Error closing response: %q", err)
						}
					}()
					err = xml.NewTokenDecoder(resp).Decode(&respMsg)
					if err != nil {
						t.Errorf("error decoding response: %v", err)
					}
				}
				switch {
				case resp == nil && tc.resp != nil:
					t.Errorf("expected response, but got none")
				case resp != nil && tc.resp == nil:
					t.Errorf("did not expect response, but got: %+v", respMsg)
				}
			})
			t.Run("SendMessage", func(t *testing.T) {
				s := xmpptest.NewClientServer(xmpptest.ServerHandlerFunc(func(t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
					if tc.resp != nil {
						_, err := xmlstream.Copy(t, tc.resp.Wrap(nil))
						return err
					}
					return nil
				}))

				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
				if tc.canceled {
					cancel()
				} else {
					defer cancel()
				}

				resp, err := s.Client.SendMessage(ctx, tc.msg.Wrap(tc.payload))
				if err != tc.err {
					t.Errorf("unexpected error, want=%q, got=%q", tc.err, err)
				}
				respMsg := stanza.Message{}
				if resp != nil {
					defer func() {
						if err := resp.Close(); err != nil {
							t.Errorf("error closing response: %q", err)
						}
					}()
					err = xml.NewTokenDecoder(resp).Decode(&respMsg)
					if err != nil {
						t.Errorf("error decoding response: %v", err)
					}
				}
				switch {
				case resp == nil && tc.resp != nil:
					t.Errorf("expected response, but got none")
				case resp != nil && tc.resp == nil:
					t.Errorf("did not expect response, but got: %+v", respMsg)
				}
			})
		})
	}
}

A session_presence.go => session_presence.go +102 -0
@@ 0,0 1,102 @@
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package xmpp

import (
	"context"
	"encoding/xml"
	"fmt"

	"mellium.im/xmlstream"
	"mellium.im/xmpp/internal/attr"
	"mellium.im/xmpp/internal/marshal"
	"mellium.im/xmpp/internal/ns"
	"mellium.im/xmpp/stanza"
)

func isPresenceEmptySpace(name xml.Name) bool {
	return name.Local == "presence" && (name.Space == "" || name.Space == ns.Client || name.Space == ns.Server)
}

// SendPresence is like Send except that it returns an error if the first token
// read from the input is not a presence start token and blocks until an error
// response is received or the context times out.
// Presences are generally fire-and-forget meaning that the success behavior of
// SendPresence is to time out and that methods such as Send should normally be
// used instead.
// It is thus mainly for use by extensions that define an extension-namespaced
// success response to a presence being sent and need a mechanism to track
// general error responses without handling every single presence sent through
// the session to see if it has a matching ID.
//
// SendPresence is safe for concurrent use by multiple goroutines.
func (s *Session) SendPresence(ctx context.Context, r xml.TokenReader) (xmlstream.TokenReadCloser, error) {
	tok, err := r.Token()
	if err != nil {
		return nil, err
	}
	start, ok := tok.(xml.StartElement)
	if !ok {
		return nil, fmt.Errorf("expected IQ start element, got %T", tok)
	}
	if !isPresenceEmptySpace(start.Name) {
		return nil, fmt.Errorf("expected start element to be a presence")
	}

	// If there's no ID, add one.
	idx, _, id, typ := getIDTyp(start.Attr)
	if idx == -1 {
		idx = len(start.Attr)
		start.Attr = append(start.Attr, xml.Attr{Name: xml.Name{Local: "id"}, Value: ""})
	}
	if id == "" {
		id = attr.RandomID()
		start.Attr[idx].Value = id
	}

	// If this is an error presence we don't ever expect a response, so just send
	// it normally.
	if typ == string(stanza.ErrorPresence) {
		return nil, s.SendElement(ctx, xmlstream.Inner(r), start)
	}

	return s.sendResp(ctx, id, xmlstream.Inner(r), start)
}

// SendPresenceElement is like SendPresence except that it wraps the payload in
// a Presence element.
// For more information see SendPresence.
//
// SendPresenceElement is safe for concurrent use by multiple goroutines.
func (s *Session) SendPresenceElement(ctx context.Context, payload xml.TokenReader, msg stanza.Presence) (xmlstream.TokenReadCloser, error) {
	return s.SendPresence(ctx, msg.Wrap(payload))
}

// EncodePresence is like Encode except that it returns an error if v does not
// marshal to an Presence stanza and like SendPresence it blocks until an error
// response is received or the context times out.
// For more information see SendPresence.
//
// EncodePresence is safe for concurrent use by multiple goroutines.
func (s *Session) EncodePresence(ctx context.Context, v interface{}) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(v)
	if err != nil {
		return nil, err
	}
	return s.SendPresence(ctx, r)
}

// EncodePresenceElement is like EncodePresence except that it wraps the payload
// in a Presence element.
// For more information see SendPresence.
//
// EncodePresenceElement is safe for concurrent use by multiple goroutines.
func (s *Session) EncodePresenceElement(ctx context.Context, payload interface{}, msg stanza.Presence) (xmlstream.TokenReadCloser, error) {
	r, err := marshal.TokenReader(payload)
	if err != nil {
		return nil, err
	}
	return s.SendPresenceElement(ctx, r, msg)
}

A session_presence_test.go => session_presence_test.go +133 -0
@@ 0,0 1,133 @@
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package xmpp_test

import (
	"context"
	"encoding/xml"
	"strconv"
	"testing"
	"time"

	"mellium.im/xmlstream"
	"mellium.im/xmpp/internal/xmpptest"
	"mellium.im/xmpp/stanza"
)

var sendPresenceTests = [...]struct {
	msg        stanza.Presence
	canceled   bool
	payload    xml.TokenReader
	err        error
	writesBody bool
	resp       *stanza.Presence
}{
	0: {
		msg:        stanza.Presence{ID: testIQID, Type: stanza.AvailablePresence},
		writesBody: true,
		resp:       &stanza.Presence{ID: testIQID, Type: stanza.ErrorPresence},
	},
	1: {
		msg:        stanza.Presence{ID: testIQID, Type: stanza.ProbePresence},
		writesBody: true,
		resp:       &stanza.Presence{ID: testIQID, Type: stanza.ErrorPresence},
	},
	2: {
		msg:        stanza.Presence{ID: testIQID, Type: stanza.ErrorPresence},
		writesBody: true,
	},
	3: {
		msg:        stanza.Presence{ID: testIQID, Type: stanza.SubscribePresence},
		canceled:   true,
		writesBody: true,
		err:        context.Canceled,
	},
}

func TestSendPresence(t *testing.T) {
	for i, tc := range sendPresenceTests {
		tc := tc
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			t.Run("SendPresenceElement", func(t *testing.T) {
				s := xmpptest.NewClientServer(xmpptest.ServerHandlerFunc(func(t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
					if tc.resp != nil {
						_, err := xmlstream.Copy(t, tc.resp.Wrap(nil))
						return err
					}
					return nil
				}))

				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
				if tc.canceled {
					cancel()
				} else {
					defer cancel()
				}

				resp, err := s.Client.SendPresenceElement(ctx, tc.payload, tc.msg)
				if err != tc.err {
					t.Errorf("unexpected error, want=%q, got=%q", tc.err, err)
				}
				respMsg := stanza.Presence{}
				if resp != nil {
					defer func() {
						if err := resp.Close(); err != nil {
							t.Errorf("Error closing response: %q", err)
						}
					}()
					err = xml.NewTokenDecoder(resp).Decode(&respMsg)
					if err != nil {
						t.Errorf("error decoding response: %v", err)
					}
				}
				switch {
				case resp == nil && tc.resp != nil:
					t.Errorf("expected response, but got none")
				case resp != nil && tc.resp == nil:
					t.Errorf("did not expect response, but got: %+v", respMsg)
				}
			})
			t.Run("SendPresence", func(t *testing.T) {
				s := xmpptest.NewClientServer(xmpptest.ServerHandlerFunc(func(t xmlstream.TokenReadEncoder, start *xml.StartElement) error {
					if tc.resp != nil {
						_, err := xmlstream.Copy(t, tc.resp.Wrap(nil))
						return err
					}
					return nil
				}))

				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
				if tc.canceled {
					cancel()
				} else {
					defer cancel()
				}

				resp, err := s.Client.SendPresence(ctx, tc.msg.Wrap(tc.payload))
				if err != tc.err {
					t.Errorf("unexpected error, want=%q, got=%q", tc.err, err)
				}
				respMsg := stanza.Presence{}
				if resp != nil {
					defer func() {
						if err := resp.Close(); err != nil {
							t.Errorf("error closing response: %q", err)
						}
					}()
					err = xml.NewTokenDecoder(resp).Decode(&respMsg)
					if err != nil {
						t.Errorf("error decoding response: %v", err)
					}
				}
				switch {
				case resp == nil && tc.resp != nil:
					t.Errorf("expected response, but got none")
				case resp != nil && tc.resp == nil:
					t.Errorf("did not expect response, but got: %+v", respMsg)
				}
			})
		})
	}
}