~samwhited/xmpp

5390b30fcf37cdc218bf532b1b031936e14c4e1a — Sam Whited 4 years ago 00ab9c7
Implement session send/receive API

See #19
3 files changed, 77 insertions(+), 4 deletions(-)

M config.go
A handler.go
M session.go
M config.go => config.go +3 -0
@@ 28,6 28,9 @@ type Config struct {
	// The supported stream features.
	Features map[xml.Name]StreamFeature

	// Handler to invoke on stream reads.
	Handler Handler

	// The default language for any streams constructed using this config.
	Lang language.Tag


A handler.go => handler.go +24 -0
@@ 0,0 1,24 @@
// Copyright 2016 Sam Whited.
// Use of this source code is governed by the BSD 2-clause license that can be
// found in the LICENSE file.

package xmpp

import (
	"encoding/xml"
)

// A Handler triggers events or responds to incoming elements in an XML stream.
type Handler interface {
	HandleXMPP(s *Session, start *xml.StartElement)
}

// The HandlerFunc type is an adapter to allow the use of ordinary functions as
// XMPP handlers. If f is a function with the appropriate signature,
// HandlerFunc(f) is a Handler that calls f.
type HandlerFunc func(s *Session, start *xml.StartElement)

// HandleXMPP calls f(s, start).
func (f HandlerFunc) HandleXMPP(s *Session, start *xml.StartElement) {
	f(s, start)
}

M session.go => session.go +50 -4
@@ 13,6 13,7 @@ import (
	"sync"

	"mellium.im/xmpp/jid"
	"mellium.im/xmpp/streamerror"
)

// SessionState is a bitmask that represents the current state of an XMPP


@@ 47,8 48,8 @@ const (
	InputStreamClosed
)

// A Session represents an XMPP connection that can perform SRV lookups for a
// given server and connect to the correct ports.
// A Session represents an XMPP session comprising an input and an output XML
// stream.
type Session struct {
	config *Config



@@ 74,7 75,9 @@ type Session struct {
	in struct {
		sync.Mutex
		stream
		d *xml.Decoder
		d      *xml.Decoder
		ctx    context.Context
		cancel context.CancelFunc
	}
	out struct {
		sync.Mutex


@@ 104,12 107,55 @@ func NewSession(ctx context.Context, config *Config, rw io.ReadWriter) (*Session
		config: config,
		rw:     rw,
	}
	s.in.ctx, s.in.cancel = context.WithCancel(context.Background())

	if conn, ok := rw.(net.Conn); ok {
		s.conn = conn
	}

	return s, s.negotiateStreams(ctx, rw)
	err := s.negotiateStreams(ctx, rw)
	if err != nil {
		return nil, err
	}
	if config.Handler != nil {
		go s.handleInputStream()
	}
	return s, err
}

func (s *Session) handleInputStream() {
	for {
		select {
		case <-s.in.ctx.Done():
			return
		default:
		}
		// TODO: This needs to be cancelable somehow.
		tok, err := s.Decoder().Token()
		if err != nil {
			select {
			case <-s.in.ctx.Done():
				return
			default:
				// TODO: We need a way to figure out if this was an XML error or an error
				// with the underlying connection.
				s.Encoder().Encode(streamerror.BadFormat)
			}
		}
		switch t := tok.(type) {
		case xml.StartElement:
			s.config.Handler.HandleXMPP(s, &t)
		default:
			select {
			case <-s.in.ctx.Done():
				return
			default:
				// TODO: We need a way to figure out if this was an XML error or an error
				// with the underlying connection.
				s.Encoder().Encode(streamerror.BadFormat)
			}
		}
	}
}

// Conn returns the Session's backing net.Conn or other ReadWriter.