~rbn/neinp

57a398f8647890af75cde108845ee80ed6619e8c — Sören Tempel 4 months ago f6dcb66 master v0.0.5
Allow multiple outstanding responses

Previously, the send() goroutine would block until an executed response
handler wrote a response to the response channel. If a response handler
does not write a response to the channel immediately, e.g. because it
blocks on some sort of event, neinp would not process further messages
until this response handler unblocks. This is due to the fact that the
send goroutine would be blocked on the response channel.

This commit allows for multiple outstanding responses by executing a
separate send goroutine for each response. Additionally a mutex is
utilized to prevent concurrent writes to the socket. Response handler
error handling has also been slightly adjusted accordingly.
1 files changed, 26 insertions(+), 16 deletions(-)

M server.go
M server.go => server.go +26 -16
@@ 222,25 222,30 @@ func (s *Server) handle(done <-chan struct{}, in <-chan message.Message) (<-chan
// send sends the replys to the client.
func (s *Server) send(done <-chan struct{}, out <-chan (<-chan message.Message), w io.Writer) <-chan error {
	senderr := make(chan error)
	sendmtx := new(sync.Mutex)

	go func() {
		defer close(senderr)
		for {
			select {
			case <-done:
				return
			case x := <-out:
				for msg := range x {
					if s.Debug {
						log.Printf("-> %#v", msg.Content)
				go func(ch <-chan message.Message) {
					for msg := range ch {
						sendmtx.Lock()
						defer sendmtx.Unlock()

						if s.Debug {
							log.Printf("-> %#v", msg.Content)
						}

						_, err := msg.Encode(w)
						if err != nil {
							senderr <- err
							return
						}
					}

					_, err := msg.Encode(w)
					if err != nil {
						senderr <- err
						return
					}
				}
				}(x)
			}
		}
	}()


@@ 258,6 263,7 @@ func (s *Server) Serve(rw io.ReadWriter) error {

	defer s.cleanup()

	procErr := make(chan error)
	for {
		select {
		case err := <-rcvErr:


@@ 265,11 271,15 @@ func (s *Server) Serve(rw io.ReadWriter) error {
			return err
		case processErr := <-handleErr:
			// the channel may be closed or error nil
			err := <-processErr
			if err != nil {
				close(done)
				return err
			}
			go func() {
				err := <-processErr
				if err != nil {
					procErr <- err
				}
			}()
		case err := <-procErr:
			close(done)
			return err
		case err := <-sendErr:
			close(done)
			return err