imap: manage idle mode with an idler
Untangle the idle functionality from the message handling routine. Wait for the idle mode to properly exit every time to ensure a consistent imap state. Timeout when hanging in idle mode and inform the ui. Signed-off-by: Koni Marti <koni.marti@gmail.com> Acked-by: Robin Jarry <robin@jarry.cc>
This commit is contained in:
parent
4d75137b20
commit
397a6f267f
3 changed files with 182 additions and 25 deletions
|
@ -95,5 +95,7 @@ func (w *IMAPWorker) handleConfigure(msg *types.Configure) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.idler = newIdler(w.config, w.worker)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
149
worker/imap/idler.go
Normal file
149
worker/imap/idler.go
Normal file
|
@ -0,0 +1,149 @@
|
||||||
|
package imap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.sr.ht/~rjarry/aerc/logging"
|
||||||
|
"git.sr.ht/~rjarry/aerc/worker/types"
|
||||||
|
"github.com/emersion/go-imap"
|
||||||
|
"github.com/emersion/go-imap/client"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errIdleTimeout = fmt.Errorf("idle timeout")
|
||||||
|
errIdleModeHangs = fmt.Errorf("idle mode hangs; waiting to reconnect")
|
||||||
|
)
|
||||||
|
|
||||||
|
// idler manages the idle mode of the imap server. Enter idle mode if there's
|
||||||
|
// no other task and leave idle mode when a new task arrives. Idle mode is only
|
||||||
|
// used when the client is ready and connected. After a connection loss, make
|
||||||
|
// sure that idling returns gracefully and the worker remains responsive.
|
||||||
|
type idler struct {
|
||||||
|
sync.Mutex
|
||||||
|
config imapConfig
|
||||||
|
client *imapClient
|
||||||
|
worker *types.Worker
|
||||||
|
stop chan struct{}
|
||||||
|
done chan error
|
||||||
|
waiting bool
|
||||||
|
idleing bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIdler(cfg imapConfig, w *types.Worker) *idler {
|
||||||
|
return &idler{config: cfg, worker: w, done: make(chan error)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) SetClient(c *imapClient) {
|
||||||
|
i.Lock()
|
||||||
|
i.client = c
|
||||||
|
i.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) setWaiting(wait bool) {
|
||||||
|
i.Lock()
|
||||||
|
i.waiting = wait
|
||||||
|
i.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) isWaiting() bool {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
return i.waiting
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) isReady() bool {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
return (!i.waiting && i.client != nil &&
|
||||||
|
i.client.State() == imap.SelectedState)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) Start() {
|
||||||
|
if i.isReady() {
|
||||||
|
i.stop = make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer logging.PanicHandler()
|
||||||
|
i.idleing = true
|
||||||
|
i.log("=>(idle)")
|
||||||
|
now := time.Now()
|
||||||
|
err := i.client.Idle(i.stop,
|
||||||
|
&client.IdleOptions{
|
||||||
|
LogoutTimeout: 0,
|
||||||
|
PollInterval: 0,
|
||||||
|
})
|
||||||
|
i.idleing = false
|
||||||
|
i.done <- err
|
||||||
|
i.log("elapsed ideling time:", time.Since(now))
|
||||||
|
}()
|
||||||
|
} else if i.isWaiting() {
|
||||||
|
i.log("not started: wait for idle to exit")
|
||||||
|
} else {
|
||||||
|
i.log("not started: client not ready")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) Stop() error {
|
||||||
|
var reterr error
|
||||||
|
if i.isReady() {
|
||||||
|
close(i.stop)
|
||||||
|
select {
|
||||||
|
case err := <-i.done:
|
||||||
|
if err == nil {
|
||||||
|
i.log("<=(idle)")
|
||||||
|
} else {
|
||||||
|
i.log("<=(idle) with err:", err)
|
||||||
|
}
|
||||||
|
reterr = nil
|
||||||
|
case <-time.After(i.config.idle_timeout):
|
||||||
|
i.log("idle err (timeout); waiting in background")
|
||||||
|
|
||||||
|
i.log("disconnect done->")
|
||||||
|
i.worker.PostMessage(&types.Done{
|
||||||
|
Message: types.RespondTo(&types.Disconnect{}),
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
i.waitOnIdle()
|
||||||
|
|
||||||
|
reterr = errIdleTimeout
|
||||||
|
}
|
||||||
|
} else if i.isWaiting() {
|
||||||
|
i.log("not stopped: still idleing/hanging")
|
||||||
|
reterr = errIdleModeHangs
|
||||||
|
} else {
|
||||||
|
i.log("not stopped: client not ready")
|
||||||
|
reterr = nil
|
||||||
|
}
|
||||||
|
return reterr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) waitOnIdle() {
|
||||||
|
i.setWaiting(true)
|
||||||
|
i.log("wait for idle in background")
|
||||||
|
go func() {
|
||||||
|
defer logging.PanicHandler()
|
||||||
|
select {
|
||||||
|
case err := <-i.done:
|
||||||
|
if err == nil {
|
||||||
|
i.log("<=(idle) waited")
|
||||||
|
i.log("connect done->")
|
||||||
|
i.worker.PostMessage(&types.Done{
|
||||||
|
Message: types.RespondTo(&types.Connect{}),
|
||||||
|
}, nil)
|
||||||
|
} else {
|
||||||
|
i.log("<=(idle) waited; with err:", err)
|
||||||
|
}
|
||||||
|
i.setWaiting(false)
|
||||||
|
i.stop = make(chan struct{})
|
||||||
|
i.log("restart")
|
||||||
|
i.Start()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *idler) log(args ...interface{}) {
|
||||||
|
header := fmt.Sprintf("idler (%p) [idle:%t,wait:%t]", i, i.idleing, i.waiting)
|
||||||
|
i.worker.Logger.Println(append([]interface{}{header}, args...)...)
|
||||||
|
}
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"git.sr.ht/~rjarry/aerc/lib"
|
"git.sr.ht/~rjarry/aerc/lib"
|
||||||
"git.sr.ht/~rjarry/aerc/logging"
|
|
||||||
"git.sr.ht/~rjarry/aerc/models"
|
"git.sr.ht/~rjarry/aerc/models"
|
||||||
"git.sr.ht/~rjarry/aerc/worker/handlers"
|
"git.sr.ht/~rjarry/aerc/worker/handlers"
|
||||||
"git.sr.ht/~rjarry/aerc/worker/types"
|
"git.sr.ht/~rjarry/aerc/worker/types"
|
||||||
|
@ -56,44 +55,43 @@ type IMAPWorker struct {
|
||||||
config imapConfig
|
config imapConfig
|
||||||
|
|
||||||
client *imapClient
|
client *imapClient
|
||||||
idleStop chan struct{}
|
|
||||||
idleDone chan error
|
|
||||||
selected *imap.MailboxStatus
|
selected *imap.MailboxStatus
|
||||||
updates chan client.Update
|
updates chan client.Update
|
||||||
worker *types.Worker
|
worker *types.Worker
|
||||||
// Map of sequence numbers to UIDs, index 0 is seq number 1
|
// Map of sequence numbers to UIDs, index 0 is seq number 1
|
||||||
seqMap []uint32
|
seqMap []uint32
|
||||||
|
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
autoReconnect bool
|
autoReconnect bool
|
||||||
retries int
|
retries int
|
||||||
|
|
||||||
|
idler *idler
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
|
func NewIMAPWorker(worker *types.Worker) (types.Backend, error) {
|
||||||
return &IMAPWorker{
|
return &IMAPWorker{
|
||||||
idleDone: make(chan error),
|
|
||||||
updates: make(chan client.Update, 50),
|
updates: make(chan client.Update, 50),
|
||||||
worker: worker,
|
worker: worker,
|
||||||
selected: &imap.MailboxStatus{},
|
selected: &imap.MailboxStatus{},
|
||||||
|
idler: newIdler(imapConfig{}, worker),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
|
func (w *IMAPWorker) newClient(c *client.Client) {
|
||||||
if w.client != nil && w.client.State() == imap.SelectedState {
|
c.Updates = w.updates
|
||||||
close(w.idleStop)
|
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
|
||||||
if err := <-w.idleDone; err != nil {
|
w.idler.SetClient(w.client)
|
||||||
w.worker.PostMessage(&types.Error{Error: err}, nil)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if w.client != nil && w.client.State() == imap.SelectedState {
|
|
||||||
w.idleStop = make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer logging.PanicHandler()
|
|
||||||
|
|
||||||
w.idleDone <- w.client.Idle(w.idleStop, &client.IdleOptions{LogoutTimeout: 0, PollInterval: 0})
|
func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
|
||||||
|
defer func() {
|
||||||
|
w.idler.Start()
|
||||||
}()
|
}()
|
||||||
|
if err := w.idler.Stop(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
var reterr error // will be returned at the end, needed to support idle
|
||||||
|
|
||||||
checkConn := func(wait time.Duration) {
|
checkConn := func(wait time.Duration) {
|
||||||
time.Sleep(wait)
|
time.Sleep(wait)
|
||||||
|
@ -101,7 +99,10 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
|
||||||
w.startConnectionObserver()
|
w.startConnectionObserver()
|
||||||
}
|
}
|
||||||
|
|
||||||
var reterr error // will be returned at the end, needed to support idle
|
// set connection timeout for calls to imap server
|
||||||
|
if w.client != nil {
|
||||||
|
w.client.Timeout = w.config.connection_timeout
|
||||||
|
}
|
||||||
|
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case *types.Unsupported:
|
case *types.Unsupported:
|
||||||
|
@ -128,8 +129,7 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
|
||||||
|
|
||||||
w.stopConnectionObserver()
|
w.stopConnectionObserver()
|
||||||
|
|
||||||
c.Updates = w.updates
|
w.newClient(c)
|
||||||
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
|
|
||||||
|
|
||||||
w.startConnectionObserver()
|
w.startConnectionObserver()
|
||||||
|
|
||||||
|
@ -150,8 +150,7 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
|
||||||
|
|
||||||
w.stopConnectionObserver()
|
w.stopConnectionObserver()
|
||||||
|
|
||||||
c.Updates = w.updates
|
w.newClient(c)
|
||||||
w.client = &imapClient{c, sortthread.NewThreadClient(c), sortthread.NewSortClient(c)}
|
|
||||||
|
|
||||||
w.startConnectionObserver()
|
w.startConnectionObserver()
|
||||||
|
|
||||||
|
@ -203,6 +202,11 @@ func (w *IMAPWorker) handleMessage(msg types.WorkerMessage) error {
|
||||||
reterr = errUnsupported
|
reterr = errUnsupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we don't want idle to timeout, so set timeout to zero
|
||||||
|
if w.client != nil {
|
||||||
|
w.client.Timeout = 0
|
||||||
|
}
|
||||||
|
|
||||||
return reterr
|
return reterr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,6 +437,7 @@ func (w *IMAPWorker) Run() {
|
||||||
select {
|
select {
|
||||||
case msg := <-w.worker.Actions:
|
case msg := <-w.worker.Actions:
|
||||||
msg = w.worker.ProcessAction(msg)
|
msg = w.worker.ProcessAction(msg)
|
||||||
|
|
||||||
if err := w.handleMessage(msg); err == errUnsupported {
|
if err := w.handleMessage(msg); err == errUnsupported {
|
||||||
w.worker.PostMessage(&types.Unsupported{
|
w.worker.PostMessage(&types.Unsupported{
|
||||||
Message: types.RespondTo(msg),
|
Message: types.RespondTo(msg),
|
||||||
|
@ -443,6 +448,7 @@ func (w *IMAPWorker) Run() {
|
||||||
Error: err,
|
Error: err,
|
||||||
}, nil)
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
case update := <-w.updates:
|
case update := <-w.updates:
|
||||||
w.handleImapUpdate(update)
|
w.handleImapUpdate(update)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue