~samwhited/xmpp

432cf0111e2b746d1c705a072f2d09b8c95c5017 — Sam Whited 3 months ago be01406 118_default_serve
xmpp: always start a server goroutine

Previously it was up to the user to start the server goroutine and if
they did not methods that rely on it such as SendIQ would block forever.
This patch changes this by always setting a server with a default
handler and allowing the user to update the handler (and block until the
server goroutine finishes) with a call to `Serve'.

The behavior of multiple `Serve' calls is still undefined.

Signed-off-by: Sam Whited <sam@samwhited.com>
1 files changed, 26 insertions(+), 6 deletions(-)

M session.go
M session.go => session.go +26 -6
@@ 98,6 98,9 @@ type Session struct {
	sentIQMutex sync.Mutex
	sentIQs     map[string]chan xmlstream.TokenReadCloser

	server  chan error
	handler chan Handler

	in struct {
		stream.Info
		d      xml.TokenReader


@@ 190,6 193,8 @@ func negotiateSession(ctx context.Context, location, origin jid.JID, rw io.ReadW
		negotiated: make(map[string]struct{}),
		sentIQs:    make(map[string]chan xmlstream.TokenReadCloser),
		state:      state,
		server:     make(chan error),
		handler:    make(chan Handler),
	}

	if s.state&Received == Received {


@@ 268,6 273,10 @@ func negotiateSession(ctx context.Context, location, origin jid.JID, rw io.ReadW
	}
	s.out.e = se

	go func() {
		s.server <- s.serve(nil)
	}()

	return s, nil
}



@@ 400,12 409,21 @@ func ReceiveServerSession(ctx context.Context, location, origin jid.JID, rw io.R
// so the handler should not close over the session or use any of its send
// methods or a deadlock will occur.
// After Serve finishes running the handler, it flushes the output stream.
func (s *Session) Serve(h Handler) (err error) {
func (s *Session) Serve(h Handler) error {
	select {
	case s.handler <- h:
	default:
	}
	return <-s.server
}

func (s *Session) serve(h Handler) (err error) {
	if h == nil {
		h = nopHandler{}
	}

	defer func() {
		close(s.handler)
		s.closeInputStream()
		e := s.Close()
		if err == nil {


@@ 415,6 433,11 @@ func (s *Session) Serve(h Handler) (err error) {

	for {
		select {
		case newHandler := <-s.handler:
			h = newHandler
			if h == nil {
				h = nopHandler{}
			}
		case <-s.in.ctx.Done():
			return s.in.ctx.Err()
		default:


@@ 974,14 997,11 @@ func isStanzaEmptySpace(name xml.Name) bool {
// from the stream is not an Info/Query (IQ) start element and blocks until a
// response is received.
//
// If the input stream is not being processed (a call to Serve is not running),
// SendIQ will never receive a response and will block until the provided
// context is canceled.
// If the response is non-nil, it does not need to be consumed in its entirety,
// but it must be closed before stream processing will resume.
// If the IQ type does not require a response—ie. it is a result or error IQ,
// meaning that it is a response itself—SendIQElemnt does not block and the
// response is nil.
// meaning that it is a response itself—SendIQ does not block and the response
// is nil.
//
// If the context is closed before the response is received, SendIQ immediately
// returns the context error.