imap: send message info updates for bulk flag ops
Send message info updates back to to ui instead of posting a fetch header action to the worker when performing a bulk flag operation. This prevents the worker channels from filling up which can result in a deadlock. Signed-off-by: Koni Marti <koni.marti@gmail.com> Tested-by: Tim Culverhouse <tim@timculverhouse.com>
This commit is contained in:
parent
27978a859b
commit
75fc42e270
1 changed files with 59 additions and 41 deletions
|
@ -1,11 +1,10 @@
|
||||||
package imap
|
package imap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/emersion/go-imap"
|
"github.com/emersion/go-imap"
|
||||||
|
|
||||||
"git.sr.ht/~rjarry/aerc/logging"
|
"git.sr.ht/~rjarry/aerc/logging"
|
||||||
|
"git.sr.ht/~rjarry/aerc/models"
|
||||||
"git.sr.ht/~rjarry/aerc/worker/types"
|
"git.sr.ht/~rjarry/aerc/worker/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,29 +36,16 @@ func (imapw *IMAPWorker) handleAnsweredMessages(msg *types.AnsweredMessages) {
|
||||||
item = imap.FormatFlagsOp(imap.RemoveFlags, true)
|
item = imap.FormatFlagsOp(imap.RemoveFlags, true)
|
||||||
flags = []interface{}{imap.AnsweredFlag}
|
flags = []interface{}{imap.AnsweredFlag}
|
||||||
}
|
}
|
||||||
uids := toSeqSet(msg.Uids)
|
imapw.handleStoreOps(msg, msg.Uids, item, flags,
|
||||||
emitErr := func(err error) {
|
func(_msg *imap.Message) error {
|
||||||
imapw.worker.PostMessage(&types.Error{
|
imapw.worker.PostMessage(&types.MessageInfo{
|
||||||
Message: types.RespondTo(msg),
|
Message: types.RespondTo(msg),
|
||||||
Error: err,
|
Info: &models.MessageInfo{
|
||||||
|
Flags: translateImapFlags(_msg.Flags),
|
||||||
|
Uid: _msg.Uid,
|
||||||
|
},
|
||||||
}, nil)
|
}, nil)
|
||||||
}
|
return nil
|
||||||
if err := imapw.client.UidStore(uids, item, flags, nil); err != nil {
|
|
||||||
emitErr(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Post in a separate goroutine to prevent deadlocking
|
|
||||||
go imapw.worker.PostAction(&types.FetchMessageHeaders{
|
|
||||||
Uids: msg.Uids,
|
|
||||||
}, func(_msg types.WorkerMessage) {
|
|
||||||
switch m := _msg.(type) {
|
|
||||||
case *types.Error:
|
|
||||||
err := fmt.Errorf("handleAnsweredMessages: %w", m.Error)
|
|
||||||
logging.Errorf("could not fetch headers: %v", err)
|
|
||||||
emitErr(err)
|
|
||||||
case *types.Done:
|
|
||||||
imapw.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,28 +55,60 @@ func (imapw *IMAPWorker) handleFlagMessages(msg *types.FlagMessages) {
|
||||||
if !msg.Enable {
|
if !msg.Enable {
|
||||||
item = imap.FormatFlagsOp(imap.RemoveFlags, true)
|
item = imap.FormatFlagsOp(imap.RemoveFlags, true)
|
||||||
}
|
}
|
||||||
uids := toSeqSet(msg.Uids)
|
imapw.handleStoreOps(msg, msg.Uids, item, flags,
|
||||||
|
func(_msg *imap.Message) error {
|
||||||
|
imapw.worker.PostMessage(&types.MessageInfo{
|
||||||
|
Message: types.RespondTo(msg),
|
||||||
|
Info: &models.MessageInfo{
|
||||||
|
Flags: translateImapFlags(_msg.Flags),
|
||||||
|
Uid: _msg.Uid,
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (imapw *IMAPWorker) handleStoreOps(
|
||||||
|
msg types.WorkerMessage, uids []uint32, item imap.StoreItem, flag interface{},
|
||||||
|
procFunc func(*imap.Message) error,
|
||||||
|
) {
|
||||||
|
messages := make(chan *imap.Message)
|
||||||
|
done := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer logging.PanicHandler()
|
||||||
|
|
||||||
|
var reterr error
|
||||||
|
for _msg := range messages {
|
||||||
|
err := procFunc(_msg)
|
||||||
|
if err != nil {
|
||||||
|
if reterr == nil {
|
||||||
|
reterr = err
|
||||||
|
}
|
||||||
|
// drain the channel upon error
|
||||||
|
for range messages {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done <- reterr
|
||||||
|
}()
|
||||||
|
|
||||||
emitErr := func(err error) {
|
emitErr := func(err error) {
|
||||||
imapw.worker.PostMessage(&types.Error{
|
imapw.worker.PostMessage(&types.Error{
|
||||||
Message: types.RespondTo(msg),
|
Message: types.RespondTo(msg),
|
||||||
Error: err,
|
Error: err,
|
||||||
}, nil)
|
}, nil)
|
||||||
}
|
}
|
||||||
if err := imapw.client.UidStore(uids, item, flags, nil); err != nil {
|
|
||||||
|
set := toSeqSet(uids)
|
||||||
|
if err := imapw.client.UidStore(set, item, flag, messages); err != nil {
|
||||||
emitErr(err)
|
emitErr(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Post in a separate goroutine to prevent deadlocking
|
if err := <-done; err != nil {
|
||||||
go imapw.worker.PostAction(&types.FetchMessageHeaders{
|
|
||||||
Uids: msg.Uids,
|
|
||||||
}, func(_msg types.WorkerMessage) {
|
|
||||||
switch m := _msg.(type) {
|
|
||||||
case *types.Error:
|
|
||||||
err := fmt.Errorf("handleFlagMessages: %w", m.Error)
|
|
||||||
logging.Errorf("could not fetch headers: %v", err)
|
|
||||||
emitErr(err)
|
emitErr(err)
|
||||||
case *types.Done:
|
return
|
||||||
imapw.worker.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
|
|
||||||
}
|
}
|
||||||
})
|
imapw.worker.PostMessage(
|
||||||
|
&types.Done{Message: types.RespondTo(msg)}, nil)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue