aerc/worker/imap/worker.go

463 lines
11 KiB
Go
Raw Normal View History

package imap
import (
2018-01-31 21:54:52 -05:00
"crypto/tls"
2018-01-14 11:30:11 +01:00
"fmt"
"net"
2018-01-14 11:30:11 +01:00
"net/url"
"strconv"
2018-01-14 11:30:11 +01:00
"strings"
"time"
2018-01-10 17:19:45 +01:00
2018-01-14 11:30:11 +01:00
"github.com/emersion/go-imap"
2020-09-12 15:05:02 +02:00
sortthread "github.com/emersion/go-imap-sortthread"
2018-01-31 21:18:21 -05:00
"github.com/emersion/go-imap/client"
"golang.org/x/oauth2"
2018-01-31 21:18:21 -05:00
"git.sr.ht/~rjarry/aerc/lib"
"git.sr.ht/~rjarry/aerc/models"
"git.sr.ht/~rjarry/aerc/worker/handlers"
"git.sr.ht/~rjarry/aerc/worker/types"
)
// init registers NewIMAPWorker as the worker factory for both the "imap" and
// "imaps" source schemes.
func init() {
	for _, scheme := range []string{"imap", "imaps"} {
		handlers.RegisterWorkerFactory(scheme, NewIMAPWorker)
	}
}
2018-01-14 11:30:11 +01:00
// errUnsupported is returned by handleMessage for message types it has no
// case for; Run compares against it to answer with a types.Unsupported
// message instead of a generic error.
var errUnsupported = fmt.Errorf("unsupported command")
type imapClient struct {
*client.Client
thread *sortthread.ThreadClient
sort *sortthread.SortClient
2018-01-14 11:30:11 +01:00
}
type IMAPWorker struct {
2018-01-14 11:30:11 +01:00
config struct {
scheme string
insecure bool
addr string
user *url.Userinfo
folders []string
oauthBearer lib.OAuthBearer
// tcp connection parameters
connection_timeout time.Duration
keepalive_period time.Duration
keepalive_probes int
keepalive_interval int
2018-01-14 11:30:11 +01:00
}
client *imapClient
2019-05-13 20:16:55 -04:00
idleStop chan struct{}
idleDone chan error
selected *imap.MailboxStatus
updates chan client.Update
worker *types.Worker
2019-03-20 23:23:38 -04:00
// Map of sequence numbers to UIDs, index 0 is seq number 1
seqMap []uint32
done chan struct{}
autoReconnect bool
}
func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
return &IMAPWorker{
2019-05-13 20:16:55 -04:00
idleDone: make(chan error),
updates: make(chan client.Update, 50),
worker: worker,
selected: &imap.MailboxStatus{},
}, nil
}
2018-01-14 11:30:11 +01:00
func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
if w.client != nil && w.client.State() == imap.SelectedState {
close(w.idleStop)
2019-05-13 20:16:55 -04:00
if err := <-w.idleDone; err != nil {
w.worker.PostMessage(&types.Error{Error: err}, nil)
}
}
defer func() {
if w.client != nil && w.client.State() == imap.SelectedState {
w.idleStop = make(chan struct{})
go func() {
w.idleDone <- w.client.Idle(w.idleStop, &client.IdleOptions{0, 0})
}()
}
}()
2019-05-13 20:16:55 -04:00
var reterr error // will be returned at the end, needed to support idle
2018-01-10 17:19:45 +01:00
switch msg := msg.(type) {
2018-02-01 19:54:19 -05:00
case *types.Unsupported:
2018-01-14 11:30:11 +01:00
// No-op
2018-02-01 19:54:19 -05:00
case *types.Configure:
2018-01-14 11:30:11 +01:00
u, err := url.Parse(msg.Config.Source)
if err != nil {
reterr = err
break
}
2018-01-14 11:30:11 +01:00
w.config.scheme = u.Scheme
if strings.HasSuffix(w.config.scheme, "+insecure") {
w.config.scheme = strings.TrimSuffix(w.config.scheme, "+insecure")
w.config.insecure = true
}
if strings.HasSuffix(w.config.scheme, "+oauthbearer") {
w.config.scheme = strings.TrimSuffix(w.config.scheme, "+oauthbearer")
w.config.oauthBearer.Enabled = true
q := u.Query()
oauth2 := &oauth2.Config{}
if q.Get("token_endpoint") != "" {
oauth2.ClientID = q.Get("client_id")
oauth2.ClientSecret = q.Get("client_secret")
oauth2.Scopes = []string{q.Get("scope")}
oauth2.Endpoint.TokenURL = q.Get("token_endpoint")
}
w.config.oauthBearer.OAuth2 = oauth2
}
2018-01-14 11:30:11 +01:00
w.config.addr = u.Host
if !strings.ContainsRune(w.config.addr, ':') {
2019-05-20 19:20:20 -04:00
w.config.addr += ":" + w.config.scheme
}
2018-01-14 11:30:11 +01:00
w.config.user = u.User
2019-06-12 08:31:51 +02:00
w.config.folders = msg.Config.Folders
w.config.connection_timeout = 30 * time.Second
w.config.keepalive_period = 0 * time.Second
w.config.keepalive_probes = 3
w.config.keepalive_interval = 3
for key, value := range msg.Config.Params {
switch key {
case "connection-timeout":
val, err := time.ParseDuration(value)
if err != nil || val < 0 {
reterr = fmt.Errorf(
"invalid connection-timeout value %v: %v",
value, err)
break
}
w.config.connection_timeout = val
case "keepalive-period":
val, err := time.ParseDuration(value)
if err != nil || val < 0 {
reterr = fmt.Errorf(
"invalid keepalive-period value %v: %v",
value, err)
break
}
w.config.keepalive_period = val
case "keepalive-probes":
val, err := strconv.Atoi(value)
if err != nil || val < 0 {
reterr = fmt.Errorf(
"invalid keepalive-probes value %v: %v",
value, err)
break
}
w.config.keepalive_probes = val
case "keepalive-interval":
val, err := time.ParseDuration(value)
if err != nil || val < 0 {
reterr = fmt.Errorf(
"invalid keepalive-interval value %v: %v",
value, err)
break
}
w.config.keepalive_interval = int(val.Seconds())
}
}
2018-02-01 19:54:19 -05:00
case *types.Connect:
if w.client != nil && w.client.State() == imap.SelectedState {
reterr = fmt.Errorf("Already connected")
break
}
c, err := w.connect()
if err != nil {
reterr = err
break
2018-01-14 11:30:11 +01:00
}
w.stopConnectionObserver()
2018-01-14 11:30:11 +01:00
c.Updates = w.updates
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
w.startConnectionObserver()
w.autoReconnect = true
w.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
case *types.Reconnect:
if !w.autoReconnect {
reterr = fmt.Errorf("auto-reconnect is disabled; run connect to enable it")
break
}
c, err := w.connect()
if err != nil {
reterr = err
break
}
w.stopConnectionObserver()
c.Updates = w.updates
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
w.startConnectionObserver()
2018-02-01 19:54:19 -05:00
w.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
case *types.Disconnect:
if w.client == nil || w.client.State() != imap.SelectedState {
reterr = fmt.Errorf("Not connected")
break
}
w.stopConnectionObserver()
if err := w.client.Logout(); err != nil {
reterr = err
break
}
w.autoReconnect = false
w.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
2018-02-01 19:54:19 -05:00
case *types.ListDirectories:
2018-02-01 19:34:08 -05:00
w.handleListDirectories(msg)
2019-01-13 16:18:10 -05:00
case *types.OpenDirectory:
w.handleOpenDirectory(msg)
case *types.FetchDirectoryContents:
w.handleFetchDirectoryContents(msg)
case *types.FetchDirectoryThreaded:
w.handleDirectoryThreaded(msg)
case *types.CreateDirectory:
w.handleCreateDirectory(msg)
case *types.RemoveDirectory:
w.handleRemoveDirectory(msg)
case *types.FetchMessageHeaders:
w.handleFetchMessageHeaders(msg)
2019-03-31 12:14:37 -04:00
case *types.FetchMessageBodyPart:
w.handleFetchMessageBodyPart(msg)
case *types.FetchFullMessages:
w.handleFetchFullMessages(msg)
2019-03-20 23:23:38 -04:00
case *types.DeleteMessages:
w.handleDeleteMessages(msg)
case *types.FlagMessages:
w.handleFlagMessages(msg)
2020-05-25 16:59:48 +02:00
case *types.AnsweredMessages:
w.handleAnsweredMessages(msg)
2019-05-14 16:34:42 -04:00
case *types.CopyMessages:
w.handleCopyMessages(msg)
case *types.AppendMessage:
w.handleAppendMessage(msg)
case *types.SearchDirectory:
w.handleSearchDirectory(msg)
2018-01-14 11:30:11 +01:00
default:
reterr = errUnsupported
}
2019-05-13 20:16:55 -04:00
return reterr
}
2019-01-13 16:18:10 -05:00
func (w *IMAPWorker) handleImapUpdate(update client.Update) {
w.worker.Logger.Printf("(= %T", update)
switch update := update.(type) {
case *client.MailboxUpdate:
status := update.Mailbox
if w.selected.Name == status.Name {
w.selected = status
}
2019-01-13 16:18:10 -05:00
w.worker.PostMessage(&types.DirectoryInfo{
Info: &models.DirectoryInfo{
Flags: status.Flags,
Name: status.Name,
ReadOnly: status.ReadOnly,
Exists: int(status.Messages),
Recent: int(status.Recent),
Unseen: int(status.Unseen),
},
2019-01-13 16:18:10 -05:00
}, nil)
case *client.MessageUpdate:
msg := update.Message
if msg.Uid == 0 {
msg.Uid = w.seqMap[msg.SeqNum-1]
}
w.worker.PostMessage(&types.MessageInfo{
Info: &models.MessageInfo{
BodyStructure: translateBodyStructure(msg.BodyStructure),
Envelope: translateEnvelope(msg.Envelope),
Flags: translateImapFlags(msg.Flags),
InternalDate: msg.InternalDate,
Uid: msg.Uid,
},
}, nil)
2019-05-13 20:23:23 -04:00
case *client.ExpungeUpdate:
i := update.SeqNum - 1
uid := w.seqMap[i]
w.seqMap = append(w.seqMap[:i], w.seqMap[i+1:]...)
w.worker.PostMessage(&types.MessagesDeleted{
Uids: []uint32{uid},
}, nil)
2019-01-13 16:18:10 -05:00
}
}
// startConnectionObserver starts a goroutine that posts a types.ConnError to
// the worker when the current client is logged out. The goroutine exits
// without posting anything when the done channel is closed first (see
// stopConnectionObserver).
//
// The logged-out and done channels are captured here, before the goroutine is
// started, so the observer stays bound to the connection it was created for
// and does not read w.client or w.done while they are being replaced.
func (w *IMAPWorker) startConnectionObserver() {
	loggedOut := w.client.LoggedOut()
	done := w.done
	go func() {
		select {
		case <-loggedOut:
			w.worker.PostMessage(&types.ConnError{
				Error: fmt.Errorf("Logged Out"),
			}, nil)
		case <-done:
			return
		}
	}()
}
// stopConnectionObserver signals the observer goroutine, if one was started,
// by closing the current done channel, then installs a fresh channel for the
// next observer.
func (w *IMAPWorker) stopConnectionObserver() {
	if previous := w.done; previous != nil {
		close(previous)
	}
	w.done = make(chan struct{})
}
// connect dials the configured server, negotiates TLS according to the
// scheme ("imaps": implicit TLS; "imap": STARTTLS unless the insecure flag is
// set), authenticates when credentials are configured and selects the inbox.
//
// The configured connection timeout bounds this whole setup sequence through
// a deadline on the socket; the deadline is removed again before the client
// is returned so it does not abort later I/O on the established session.
//
// On any failure the socket is closed and a nil client is returned together
// with the error.
func (w *IMAPWorker) connect() (*client.Client, error) {
	addr, err := net.ResolveTCPAddr("tcp", w.config.addr)
	if err != nil {
		return nil, err
	}

	conn, err := net.DialTCP("tcp", nil, addr)
	if err != nil {
		return nil, err
	}
	// Until setup has completed, every early return must release the socket.
	established := false
	defer func() {
		if !established {
			conn.Close()
		}
	}()

	if w.config.connection_timeout > 0 {
		end := time.Now().Add(w.config.connection_timeout)
		if err = conn.SetDeadline(end); err != nil {
			return nil, err
		}
	}

	if w.config.keepalive_period > 0 {
		if err = w.setKeepaliveParameters(conn); err != nil {
			return nil, err
		}
	}

	// The address resolved successfully above, so it is known to be in
	// host:port form and the split cannot fail.
	serverName, _, _ := net.SplitHostPort(w.config.addr)
	tlsConfig := &tls.Config{ServerName: serverName}

	var c *client.Client
	switch w.config.scheme {
	case "imap":
		c, err = client.New(conn)
		if err != nil {
			return nil, err
		}
		if !w.config.insecure {
			if err = c.StartTLS(tlsConfig); err != nil {
				return nil, err
			}
		}
	case "imaps":
		c, err = client.New(tls.Client(conn, tlsConfig))
		if err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("Unknown IMAP scheme %s", w.config.scheme)
	}

	c.ErrorLog = w.worker.Logger

	if w.config.user != nil {
		username := w.config.user.Username()
		// TODO: ask password when the source URL does not carry one
		password, _ := w.config.user.Password()

		if w.config.oauthBearer.Enabled {
			if err := w.config.oauthBearer.Authenticate(
				username, password, c); err != nil {
				return nil, err
			}
		} else if err := c.Login(username, password); err != nil {
			return nil, err
		}
	}

	// NOTE(review): debug output is enabled only after authentication,
	// presumably to keep credentials out of the log - keep this ordering.
	c.SetDebug(w.worker.Logger.Writer())

	if _, err := c.Select(imap.InboxName, false); err != nil {
		return nil, err
	}

	// A deadline is absolute and applies to all future I/O on the socket, so
	// it has to be cleared once the connection is set up.
	if err := conn.SetDeadline(time.Time{}); err != nil {
		return nil, err
	}

	established = true
	return c, nil
}
// setKeepaliveParameters enables TCP keepalive on conn and applies the
// configured period, probe count and probe interval.
//
// The probe count and interval are set on the raw file descriptor, obtained
// through the interfaces introduced in Go1.11, which give access to the
// descriptor without blocking and therefore without uncontrolled spawning of
// threads (not goroutines, actual threads). Failures to set those two options
// are only logged; every other failure is returned.
func (w *IMAPWorker) setKeepaliveParameters(conn *net.TCPConn) error {
	if err := conn.SetKeepAlive(true); err != nil {
		return err
	}
	// Idle time before sending a keepalive probe
	if err := conn.SetKeepAlivePeriod(w.config.keepalive_period); err != nil {
		return err
	}

	raw, err := conn.SyscallConn()
	if err != nil {
		return err
	}
	return raw.Control(func(handle uintptr) {
		sock := int(handle)
		// Max number of probes before failure
		if probesErr := lib.SetTcpKeepaliveProbes(
			sock, w.config.keepalive_probes); probesErr != nil {
			w.worker.Logger.Printf(
				"cannot set tcp keepalive probes: %v\n", probesErr)
		}
		// Wait time after an unsuccessful probe
		if intervalErr := lib.SetTcpKeepaliveInterval(
			sock, w.config.keepalive_interval); intervalErr != nil {
			w.worker.Logger.Printf(
				"cannot set tcp keepalive interval: %v\n", intervalErr)
		}
	})
}
func (w *IMAPWorker) Run() {
for {
select {
2018-02-01 18:42:03 -05:00
case msg := <-w.worker.Actions:
msg = w.worker.ProcessAction(msg)
2018-01-14 11:30:11 +01:00
if err := w.handleMessage(msg); err == errUnsupported {
2018-02-01 19:54:19 -05:00
w.worker.PostMessage(&types.Unsupported{
2018-01-14 11:30:11 +01:00
Message: types.RespondTo(msg),
2018-02-01 18:42:03 -05:00
}, nil)
2018-01-14 11:30:11 +01:00
} else if err != nil {
2018-02-01 19:54:19 -05:00
w.worker.PostMessage(&types.Error{
2018-01-14 11:30:11 +01:00
Message: types.RespondTo(msg),
Error: err,
2018-02-01 18:42:03 -05:00
}, nil)
2018-01-14 11:30:11 +01:00
}
case update := <-w.updates:
2019-01-13 16:18:10 -05:00
w.handleImapUpdate(update)
}
}
}