~coder_kalyan/msync

1a790a1af3a63adb647782e8f7d7d284d5b565c6 — Kalyan Sriram 1 year, 7 months ago 2b5d909 master
server: implement smtp queue
7 files changed, 244 insertions(+), 55 deletions(-)

M cmd/msync/main.go
A config/config.go
M go.mod
M go.sum
A relay.go
D server/server.go
M server/smtp.go
M cmd/msync/main.go => cmd/msync/main.go +33 -5
@@ 1,14 1,42 @@
package main

import (
	//"fmt"
	"fmt"
	"flag"
	"os"
	"path"

	"git.sr.ht/~coder_kalyan/msync/server"
	"git.sr.ht/~coder_kalyan/msync"
	"git.sr.ht/~coder_kalyan/msync/config"
)

func main() {
	srv := server.NewServer()
	srv.Start()
	var configPath string
	flag.StringVar(&configPath, "config", "", "path to the configuration file")
	flag.Parse()

	//fmt.Printf("%+v\n", srv)
	if configPath == "" {
		configDir, err := os.UserConfigDir()
		if err != nil {
			panic(err)
		}
		configPath = path.Join(configDir, "msync", "msync.scfg")
	}

	cfg, err := config.LoadConfigFile(configPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to load the required configuration file at %q: %s\n", configPath, err)
		os.Exit(1)
	}

	relay, err := msync.NewRelay(cfg.Relays[0])
	if err != nil {
		fmt.Fprintf(os.Stderr, "unable to start relay %s: %s\n", cfg.Relays[0].Name, err)
		os.Exit(1)
	}

	errChan := make(chan error)
	relay.Run(errChan)

	<- errChan
}

A config/config.go => config/config.go +103 -0
@@ 0,0 1,103 @@
package config

import (
	"fmt"

	"git.sr.ht/~emersion/go-scfg"
)

// SMTP relays queue and send emails
type Relay struct {
	Name     string
	Address  string
	Username string
	Password string
}

// mailboxes synchronize remote IMAP servers to local mailboxs
type Mailbox struct {
	Name     string
	Address  string
	Username string
	Password string

	Maildirs []*[2]string
}

type Config struct {
	Relays    []*Relay
	Mailboxes []*Mailbox
}

func LoadConfigFile(filename string) (*Config, error) {
	cfg := &Config{}

	directives, err := scfg.Load(filename)
	if err != nil {
		return nil, fmt.Errorf("error parsing configuration: %s", err)
	}

	for _, elem := range directives {
		switch elem.Name {
		case "relay":
			relay := &Relay{}

			if err := elem.ParseParams(&relay.Name); err != nil {
				return nil, err
			}

			for _, d := range elem.Children {
				switch d.Name {
				case "address":
					if err := d.ParseParams(&relay.Address); err != nil {
						return nil, err
					}
				case "username":
					if err := d.ParseParams(&relay.Username); err != nil {
						return nil, err
					}
				case "password":
					if err := d.ParseParams(&relay.Password); err != nil {
						return nil, err
					}
				}
			}

			cfg.Relays = append(cfg.Relays, relay)
		case "mailbox":
			mailbox := &Mailbox{}

			if err := elem.ParseParams(&mailbox.Name); err != nil {
				return nil, err
			}

			for _, d := range elem.Children {
				switch d.Name {
				case "address":
					if err := d.ParseParams(&mailbox.Address); err != nil {
						return nil, err
					}
				case "username":
					if err := d.ParseParams(&mailbox.Username); err != nil {
						return nil, err
					}
				case "password":
					if err := d.ParseParams(&mailbox.Password); err != nil {
						return nil, err
					}
				case "maildir":
					var maildir [2]string
					if err := d.ParseParams(&maildir[0], &maildir[1]); err != nil {
						return nil, err
					}

					mailbox.Maildirs = append(mailbox.Maildirs, &maildir)
				}
			}

			cfg.Mailboxes = append(cfg.Mailboxes, mailbox)
		}
	}

	return cfg, nil
}

M go.mod => go.mod +2 -0
@@ 3,6 3,8 @@ module git.sr.ht/~coder_kalyan/msync
go 1.17

require (
	git.sr.ht/~emersion/go-scfg v0.0.0-20201019143924-142a8aa629fc // indirect
	github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 // indirect
	github.com/emersion/go-smtp v0.15.1-0.20211006082444-62f6b38f85e4 // indirect
	github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
)

M go.sum => go.sum +5 -0
@@ 1,6 1,11 @@
git.sr.ht/~emersion/go-scfg v0.0.0-20201019143924-142a8aa629fc h1:51BD67xFX+bozd3ZRuOUfalrhx4/nQSh6A9lI08rYOk=
git.sr.ht/~emersion/go-scfg v0.0.0-20201019143924-142a8aa629fc/go.mod h1:t+Ww6SR24yYnXzEWiNlOY0AFo5E9B73X++10lrSpp4U=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 h1:OJyUGMJTzHTd1XQp98QTaHernxMYzRaOasRir9hUlFQ=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
github.com/emersion/go-smtp v0.15.0 h1:3+hMGMGrqP/lqd7qoxZc1hTU8LY8gHV9RFGWlqSDmP8=
github.com/emersion/go-smtp v0.15.0/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
github.com/emersion/go-smtp v0.15.1-0.20211006082444-62f6b38f85e4 h1:6unG0XYwWUlJjsbYDI06qcRH5Fe0o978bgL8zNydJ8k=
github.com/emersion/go-smtp v0.15.1-0.20211006082444-62f6b38f85e4/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=

A relay.go => relay.go +77 -0
@@ 0,0 1,77 @@
package msync

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/emersion/go-sasl"
	"github.com/emersion/go-smtp"

	"git.sr.ht/~coder_kalyan/msync/config"
	"git.sr.ht/~coder_kalyan/msync/server"
)

type Relay struct {
	cfg *config.Relay

	auth    sasl.Client
	backend *server.Backend
	server  *smtp.Server
}

func NewRelay(cfg *config.Relay) (*Relay, error) {
	relay := &Relay{
		cfg: cfg,
	}

	relay.auth = sasl.NewPlainClient("", relay.cfg.Username, relay.cfg.Password)

	return relay, nil
}

func (relay *Relay) Run(errChan chan error) {
	queue := make(chan server.Mail)
	go relay.RunServer(queue, errChan)
	go relay.RunClient(queue, errChan)
}

func (relay *Relay) RunServer(queue chan server.Mail, errChan chan error) {
	relay.backend = &server.Backend{
		Queue: queue,
	}
	relay.server = smtp.NewServer(relay.backend)

	// TODO configurable
	relay.server.Addr = "localhost:1025"
	relay.server.Domain = "localhost"
	relay.server.ReadTimeout = 10 * time.Second
	relay.server.WriteTimeout = 10 * time.Second
	relay.server.MaxMessageBytes = 1024 * 1024
	relay.server.MaxRecipients = 50
	relay.server.AllowInsecureAuth = true

	log.Printf("[relay %s] Starting server at %s\n", relay.cfg.Name, relay.server.Addr)

	if err := relay.server.ListenAndServe(); err != nil {
		fmt.Println("Error relay %s: %s", relay.cfg.Name, err)
		errChan <- err
		return
	}
}

func (relay *Relay) RunClient(queue chan server.Mail, errChan chan error) {
	for {
		mail := <-queue
		log.Printf("[relay %s] Relaying message from: %s\n", relay.cfg.Name, mail.From)

		// TODO: keep an active connection for multiple messages?
		msg := strings.NewReader(mail.Data)
		err := smtp.SendMail(relay.cfg.Address, relay.auth, mail.From, mail.To, msg)
		if err != nil {
			errChan <- err
			return
		}
	}
}

D server/server.go => server/server.go +0 -42
@@ 1,42 0,0 @@
package server

import (
	"log"
	"time"

	"github.com/emersion/go-smtp"
)

type Server struct {
	smtp struct {
		backend *Backend
		server *smtp.Server
	}
}

func NewServer() (srv *Server) {
	srv = &Server{}

	srv.smtp.backend = &Backend{}
	srv.smtp.server = smtp.NewServer(srv.smtp.backend)

	// TODO configurable
	srv.smtp.server.Addr = ":1025"
	srv.smtp.server.Domain = "localhost"
	srv.smtp.server.ReadTimeout = 10 * time.Second
	srv.smtp.server.WriteTimeout = 10 * time.Second
	srv.smtp.server.MaxMessageBytes = 1024 * 1024
	srv.smtp.server.MaxRecipients = 50
	srv.smtp.server.AllowInsecureAuth = true

	return
}

func (srv *Server) Start() error {
	log.Println("Starting server at ", srv.smtp.server.Addr)
	if err := srv.smtp.server.ListenAndServe(); err != nil {
		return err
	}

	return nil
}

M server/smtp.go => server/smtp.go +24 -8
@@ 9,13 9,24 @@ import (
	"github.com/emersion/go-smtp"
)

type Backend struct{}
type Mail struct {
	From string
	To   []string
	Data string
}

type Backend struct {
	Queue chan Mail
}

func (backend *Backend) NewSession(_ smtp.ConnectionState, _ string) (smtp.Session, error) {
	return &Session{}, nil
	return &Session{queue: backend.Queue}, nil
}

type Session struct{}
type Session struct {
	queue chan Mail
	mail  Mail
}

func (s *Session) AuthPlain(username string, password string) error {
	if username != "username" || password != "password" {


@@ 28,13 39,13 @@ func (s *Session) AuthPlain(username string, password string) error {
}

func (s *Session) Mail(from string, opts *smtp.MailOptions) error {
	log.Println("Mail from:", from)
	s.mail.From = from

	return nil
}

func (s *Session) Rcpt(to string) error {
	log.Println("Rcpt to:", to)
	s.mail.To = append(s.mail.To, to)

	return nil
}


@@ 43,17 54,22 @@ func (s *Session) Data(r io.Reader) error {
	if b, err := ioutil.ReadAll(r); err != nil {
		return err
	} else {
		log.Println("Data:", string(b))
		s.mail.Data = string(b)
	}

	if s.mail.From != "" && s.mail.To != nil {
		s.queue <- s.mail
	} // TODO handle else

	return nil
}

func (s *Session) Reset() {
	log.Println("Reset")
	s.mail.From = ""
	s.mail.To = nil
	s.mail.Data = ""
}

func (s *Session) Logout() error {
	return nil
}