~samwhited/xmpp

b45fd77991b178388e156fc0505c5b3bbecfe162 — Sam Whited 3 months ago 70cfc50
internal/pubsub: add retraction and fix publishing

Continue the efforts do actually do pubsub right by fixing how we
iterate when fetching and adding retraction.

Signed-off-by: Sam Whited <sam@samwhited.com>
3 files changed, 124 insertions(+), 22 deletions(-)

M internal/pubsub/fetch.go
M internal/pubsub/integration_test.go
A internal/pubsub/retract.go
M internal/pubsub/fetch.go => internal/pubsub/fetch.go +35 -3
@@ 59,15 59,47 @@ func FetchIQ(ctx context.Context, iq stanza.IQ, s *xmpp.Session, q Query) *Iter 
			Value: q.Item,
		})
	}
	iter, _, err := s.IterIQElement(ctx, xmlstream.Wrap(
	// We can't use IterIQElement because the IQ payload does not contain the
	// items directly, instead there is another wrapper element.
	resp, err := s.SendIQElement(ctx, xmlstream.Wrap(
		xmlstream.Wrap(
			nil,
			xml.StartElement{Name: xml.Name{Local: "items"}, Attr: queryAttrs},
		),
		xml.StartElement{Name: xml.Name{Space: NS, Local: "pubsub"}},
	), iq)
	if err != nil {
		return &Iter{err: err}
	}

	tok, err := resp.Token()
	if err != nil {
		/* #nosec */
		resp.Close()
		return &Iter{err: err}
	}
	start, ok := tok.(xml.StartElement)
	if ok {
		_, err := stanza.UnmarshalIQError(resp, start)
		if err != nil {
			/* #nosec */
			resp.Close()
			return &Iter{err: err}
		}
	}

	// Pop pubsub, and items tokens.
	for i := 0; i < 2; i++ {
		_, err = resp.Token()
		if err != nil {
			/* #nosec */
			resp.Close()
			return &Iter{err: err}
		}
	}

	return &Iter{
		iter: paging.WrapIter(iter, 0),
		iter: paging.WrapIter(xmlstream.NewIter(resp), 0),
		err:  err,
	}
}


@@ 93,7 125,7 @@ func (i *Iter) Next() bool {
		return i.Next()
	}
	i.currID = ""
	i.current = r
	i.current = xmlstream.Inner(r)
	for _, attr := range start.Attr {
		if attr.Name.Local == "id" {
			i.currID = attr.Value

M internal/pubsub/integration_test.go => internal/pubsub/integration_test.go +45 -19
@@ 21,7 21,6 @@ import (
	"mellium.im/xmpp/internal/integration/ejabberd"
	"mellium.im/xmpp/internal/integration/prosody"
	"mellium.im/xmpp/internal/pubsub"
	"mellium.im/xmpp/stanza"
)

func TestIntegrationPubFetch(t *testing.T) {


@@ 73,26 72,53 @@ func integrationPubFetch(ctx context.Context, t *testing.T, cmd *integration.Cmd
			t.Errorf("wrong ID for published value: want=%q, got=%q", strID, newID)
		}
	}
	iter := pubsub.FetchIQ(ctx, stanza.IQ{ID: "0000"}, session, pubsub.Query{})
	type Foo struct {
	iter := pubsub.Fetch(ctx, session, pubsub.Query{
		Node: t.Name(),
	})

	const strID = "9"
	hasNext := iter.Next()
	if !hasNext {
		t.Fatalf("no item found")
	}
	id, r := iter.Item()
	if id != strID {
		t.Errorf("wrong ID for fetched value: want=%q, got=%q", strID, id)
	}
	foo := struct {
		XMLName xml.Name `xml:"foo"`
		ID      int      `xml:"id,attr"`
	}{}
	err = xml.NewTokenDecoder(r).Decode(&foo)
	if err != nil {
		t.Fatalf("error decoding %s foo: %v", id, err)
	}
	i := 0
	for iter.Next() {
		strID := strconv.Itoa(i)
		id, r := iter.Item()
		if id != strID {
			t.Errorf("wrong ID for fetched value: want=%q, got=%q", strID, id)
		}
		foo := Foo{}
		err := xml.NewTokenDecoder(r).Decode(&foo)
		if err != nil {
			t.Fatalf("error decoding %s foo: %v", id, err)
		}
		if foo.ID != i {
			t.Errorf("wrong value for ID in element: want=%s, got=%d", strID, foo.ID)
		}
		i++
	if foo.ID != 9 {
		t.Errorf("wrong value for ID in element: want=%s, got=%d", strID, foo.ID)
	}
	hasNext = iter.Next()
	if hasNext {
		t.Fatalf("expected default pep to only store one item")
	}

	err = iter.Close()
	if err != nil {
		t.Fatalf("error closing iter: %v", err)
	}

	err = pubsub.Delete(ctx, session, t.Name(), "9", false)
	if err != nil {
		t.Fatalf("error retracting pubsub item: %v", err)
	}

	iter = pubsub.Fetch(ctx, session, pubsub.Query{
		Node: t.Name(),
	})
	if iter.Next() {
		t.Fatalf("expected node to be empty after deletion")
	}
	err = iter.Close()
	if err != nil {
		t.Fatalf("error closing second iter: %v", err)
	}
}

A internal/pubsub/retract.go => internal/pubsub/retract.go +44 -0
@@ 0,0 1,44 @@
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package pubsub

import (
	"context"
	"encoding/xml"

	"mellium.im/xmlstream"
	"mellium.im/xmpp"
	"mellium.im/xmpp/stanza"
)

// Delete removes an item from the pubsub node.
func Delete(ctx context.Context, s *xmpp.Session, node, id string, notify bool) error {
	return DeleteIQ(ctx, s, stanza.IQ{}, node, id, notify)
}

// DeleteIQ is like Publish except that it allows modifying the IQ.
// Changes to the IQ type will have no effect.
func DeleteIQ(ctx context.Context, s *xmpp.Session, iq stanza.IQ, node, id string, notify bool) error {
	iq.Type = stanza.SetIQ
	retractAttrs := []xml.Attr{{Name: xml.Name{Local: "node"}, Value: node}}
	if notify {
		retractAttrs = append(retractAttrs, xml.Attr{
			Name:  xml.Name{Local: "notify"},
			Value: "true",
		})
	}
	return s.UnmarshalIQElement(ctx, xmlstream.Wrap(
		xmlstream.Wrap(
			xmlstream.Wrap(
				nil,
				xml.StartElement{Name: xml.Name{Local: "item"}, Attr: []xml.Attr{{Name: xml.Name{Local: "id"}, Value: id}}},
			),
			xml.StartElement{Name: xml.Name{Local: "retract"}, Attr: retractAttrs},
		),
		xml.StartElement{Name: xml.Name{Space: NS, Local: "pubsub"}},
	), stanza.IQ{
		Type: stanza.SetIQ,
	}, nil)
}