~samwhited/xmpp

ref: 1a8e804e7af154af29f9b94ecac767a2fd7327e3 xmpp/pubsub/fetch.go -rw-r--r-- 3.9 KiB
1a8e804eSam Whited pubsub: make internal package public 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright 2021 The Mellium Contributors.
// Use of this source code is governed by the BSD 2-clause
// license that can be found in the LICENSE file.

package pubsub

import (
	"context"
	"encoding/xml"
	"strconv"

	"mellium.im/xmlstream"
	"mellium.im/xmpp"
	"mellium.im/xmpp/paging"
	"mellium.im/xmpp/stanza"
)

// Query represents the options for fetching and iterating over pubsub items.
// It is passed to Fetch and FetchIQ to describe which items to request.
type Query struct {
	// Node is the ID of a node to query.
	// It is always included in the request, even when empty.
	Node string

	// Item is a specific item to fetch by its ID.
	// Most users should use one of the methods specifically for fetching
	// individual items instead of filtering the results and using an iterator
	// over 0 or 1 items.
	// An empty string means that no item filter is added to the request.
	Item string

	// MaxItems can be used to restrict results to the most recent items.
	// A value of 0 means that no limit is added to the request.
	MaxItems uint64
}

// Fetch queries a node for its items and returns an iterator over the results.
// It is equivalent to calling FetchIQ with a zero value IQ.
//
// Processing of the session is blocked until the returned iterator is closed.
// Errors that occur while creating the iterator are not returned directly,
// instead they are deferred until the iterator is used.
func Fetch(ctx context.Context, s *xmpp.Session, q Query) *Iter {
	var iq stanza.IQ
	return FetchIQ(ctx, iq, s, q)
}

// FetchIQ is like Fetch but it allows you to customize the IQ.
// Changing the type of the provided IQ has no effect, it is always sent as a
// "get" request.
//
// The request contains an <items/> element carrying q.Node and, if set,
// q.MaxItems as attributes. If q.Item is set it is requested with an
// <item id="…"/> child element.
//
// Any error encountered while sending the request or reading the start of the
// response is stored in the returned iterator and reported when it is used;
// in that case the response (if any) has already been closed.
func FetchIQ(ctx context.Context, iq stanza.IQ, s *xmpp.Session, q Query) *Iter {
	iq.Type = stanza.GetIQ
	queryAttrs := []xml.Attr{{
		Name:  xml.Name{Local: "node"},
		Value: q.Node,
	}}
	if q.MaxItems > 0 {
		queryAttrs = append(queryAttrs, xml.Attr{
			Name:  xml.Name{Local: "max_items"},
			Value: strconv.FormatUint(q.MaxItems, 10),
		})
	}

	// XEP-0060 selects specific items with <item id="…"/> children of the
	// <items/> element; there is no "item" attribute on <items/>.
	var itemFilter xml.TokenReader
	if q.Item != "" {
		itemFilter = xmlstream.Wrap(nil, xml.StartElement{
			Name: xml.Name{Local: "item"},
			Attr: []xml.Attr{{
				Name:  xml.Name{Local: "id"},
				Value: q.Item,
			}},
		})
	}

	// We can't use IterIQElement because the IQ payload does not contain the
	// items directly, instead there is another wrapper element.
	resp, err := s.SendIQElement(ctx, xmlstream.Wrap(
		xmlstream.Wrap(
			itemFilter,
			xml.StartElement{Name: xml.Name{Local: "items"}, Attr: queryAttrs},
		),
		xml.StartElement{Name: xml.Name{Space: NS, Local: "pubsub"}},
	), iq)
	if err != nil {
		return &Iter{err: err}
	}

	// fail closes the response and defers err until the iterator is used.
	// The error from Close is deliberately ignored: the original error is the
	// one worth reporting.
	fail := func(err error) *Iter {
		/* #nosec */
		resp.Close()
		return &Iter{err: err}
	}

	// The first token is expected to be the IQ start element; if the response
	// is an error, surface it instead of iterating.
	tok, err := resp.Token()
	if err != nil {
		return fail(err)
	}
	if start, ok := tok.(xml.StartElement); ok {
		if _, err := stanza.UnmarshalIQError(resp, start); err != nil {
			return fail(err)
		}
	}

	// Pop pubsub, and items tokens.
	for n := 0; n < 2; n++ {
		if _, err := resp.Token(); err != nil {
			return fail(err)
		}
	}

	return &Iter{
		iter: paging.WrapIter(xmlstream.NewIter(resp), 0),
	}
}

// Iter is an iterator over payload items.
//
// Call Next to advance to an item, Item to retrieve it, Err to check for
// failures, and Close when finished with the iterator.
type Iter struct {
	iter    *paging.Iter    // underlying iterator; nil if creating the Iter failed
	current xml.TokenReader // reader for the item most recently selected by Next
	currID  string          // "id" attribute of that item, or "" if it has none
	err     error           // error deferred from creating the Iter, if any
}

// Next advances the iterator to the next item and returns true if there is
// one to decode.
// It returns false if creating the iterator failed, if there is no underlying
// iterator, or if the underlying iterator has no more items.
// After Next returns true the item can be retrieved with Item.
func (i *Iter) Next() bool {
	// An Iter without an underlying iterator (eg. the zero value) has nothing
	// to advance; check for nil like Close does instead of calling through a
	// nil pointer.
	if i.err != nil || i.iter == nil {
		return false
	}
	for i.iter.Next() {
		start, r := i.iter.Current()
		// If we encounter a lone token that doesn't begin with a start element
		// (eg. a comment) skip it. This should never happen with XMPP, but we
		// don't want to panic in case this somehow happens so just skip it.
		if start == nil {
			continue
		}
		// Reset the ID first so that an item without an "id" attribute does not
		// inherit the ID of the previous item.
		i.currID = ""
		i.current = xmlstream.Inner(r)
		for _, attr := range start.Attr {
			if attr.Name.Local == "id" {
				i.currID = attr.Value
				break
			}
		}
		return true
	}
	return false
}

// Err returns the last error encountered by the iterator (if any).
// An error deferred from creating the iterator takes precedence over errors
// from the underlying iterator.
// If there is neither a deferred error nor an underlying iterator, Err returns
// nil.
func (i *Iter) Err() error {
	if i.err != nil {
		return i.err
	}
	// Check for a missing underlying iterator like Close does so that a zero
	// value Iter does not call through a nil pointer.
	if i.iter == nil {
		return nil
	}
	return i.iter.Err()
}

// Item returns the ID and payload reader of the item most recently selected by
// a call to Next.
// If no payloads were requested in the original query the reader may be nil.
func (i *Iter) Item() (id string, r xml.TokenReader) {
	id = i.currID
	r = i.current
	return id, r
}

// Close signals that the caller is done with the iterator so that processing
// of the stream may continue.
// If there is no underlying iterator (eg. because creating it failed) Close
// does nothing and returns nil.
// Calling it multiple times has no effect.
func (i *Iter) Close() error {
	if it := i.iter; it != nil {
		return it.Close()
	}
	return nil
}