aerc/worker/notmuch/worker.go
Koni Marti 0e50f29bf3 notmuch: move logic for dynamic folders to backend
Moves logic for creating dynamic folders from the dirlist widget to the
backend. Since dynamic folders are notmuch-specific, the notmuch backend
should be responsible for correctly setting up those folders. It does
that by sending two DirectoryInfos: the first to create the message
store, the second to fetch the directory content.

This approach also fixes a deadlock introduced by 716ade8968
("worker: lock access to callback maps").

Reported-by: Bence Ferdinandy <bence@ferdinandy.com>
Signed-off-by: Koni Marti <koni.marti@gmail.com>
Tested-by: Tim Culverhouse <tim@timculverhouse.com>
2022-09-29 16:52:12 +02:00

686 lines
16 KiB
Go

//go:build notmuch
// +build notmuch
package notmuch
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"git.sr.ht/~rjarry/aerc/config"
"git.sr.ht/~rjarry/aerc/logging"
"git.sr.ht/~rjarry/aerc/models"
"git.sr.ht/~rjarry/aerc/worker/handlers"
"git.sr.ht/~rjarry/aerc/worker/lib"
notmuch "git.sr.ht/~rjarry/aerc/worker/notmuch/lib"
"git.sr.ht/~rjarry/aerc/worker/types"
"github.com/mitchellh/go-homedir"
)
func init() {
handlers.RegisterWorkerFactory("notmuch", NewWorker)
}
var errUnsupported = fmt.Errorf("unsupported command")
const backgroundRefreshDelay = 1 * time.Minute
type worker struct {
w *types.Worker
nmEvents chan eventType
query string
currentQueryName string
queryMapOrder []string
nameQueryMap map[string]string
db *notmuch.DB
setupErr error
currentSortCriteria []*types.SortCriterion
}
// NewWorker creates a new notmuch worker with the provided worker.
func NewWorker(w *types.Worker) (types.Backend, error) {
events := make(chan eventType, 20)
return &worker{
w: w,
nmEvents: events,
}, nil
}
// Run starts the worker's message handling loop.
func (w *worker) Run() {
for {
select {
case action := <-w.w.Actions:
msg := w.w.ProcessAction(action)
if err := w.handleMessage(msg); errors.Is(err, errUnsupported) {
w.w.PostMessage(&types.Unsupported{
Message: types.RespondTo(msg),
}, nil)
logging.Errorf("ProcessAction(%T) unsupported: %w", msg, err)
} else if err != nil {
w.w.PostMessage(&types.Error{
Message: types.RespondTo(msg),
Error: err,
}, nil)
logging.Errorf("ProcessAction(%T) failure: %w", msg, err)
}
case nmEvent := <-w.nmEvents:
err := w.handleNotmuchEvent(nmEvent)
if err != nil {
logging.Errorf("notmuch event failure: %w", err)
}
}
}
}
func (w *worker) done(msg types.WorkerMessage) {
w.w.PostMessage(&types.Done{Message: types.RespondTo(msg)}, nil)
}
func (w *worker) err(msg types.WorkerMessage, err error) {
w.w.PostMessage(&types.Error{
Message: types.RespondTo(msg),
Error: err,
}, nil)
}
func (w *worker) handleMessage(msg types.WorkerMessage) error {
if w.setupErr != nil {
// only configure can recover from a config error, bail for everything else
_, isConfigure := msg.(*types.Configure)
if !isConfigure {
return w.setupErr
}
}
switch msg := msg.(type) {
case *types.Unsupported:
// No-op
case *types.Configure:
return w.handleConfigure(msg)
case *types.Connect:
return w.handleConnect(msg)
case *types.ListDirectories:
return w.handleListDirectories(msg)
case *types.OpenDirectory:
return w.handleOpenDirectory(msg)
case *types.FetchDirectoryContents:
return w.handleFetchDirectoryContents(msg)
case *types.FetchDirectoryThreaded:
return w.handleFetchDirectoryThreaded(msg)
case *types.FetchMessageHeaders:
return w.handleFetchMessageHeaders(msg)
case *types.FetchMessageBodyPart:
return w.handleFetchMessageBodyPart(msg)
case *types.FetchFullMessages:
return w.handleFetchFullMessages(msg)
case *types.FlagMessages:
return w.handleFlagMessages(msg)
case *types.AnsweredMessages:
return w.handleAnsweredMessages(msg)
case *types.SearchDirectory:
return w.handleSearchDirectory(msg)
case *types.ModifyLabels:
return w.handleModifyLabels(msg)
case *types.CheckMail:
go w.handleCheckMail(msg)
return nil
// not implemented, they are generally not used
// in a notmuch based workflow
// case *types.DeleteMessages:
// case *types.CopyMessages:
// return w.handleCopyMessages(msg)
// case *types.AppendMessage:
// return w.handleAppendMessage(msg)
// case *types.CreateDirectory:
// return w.handleCreateDirectory(msg)
// case *types.RemoveDirectory:
// return w.handleRemoveDirectory(msg)
}
return errUnsupported
}
func (w *worker) handleConfigure(msg *types.Configure) error {
var err error
defer func() {
if err == nil {
w.setupErr = nil
return
}
w.setupErr = fmt.Errorf("notmuch: %w", err)
}()
u, err := url.Parse(msg.Config.Source)
if err != nil {
logging.Errorf("error configuring notmuch worker: %w", err)
return err
}
home, err := homedir.Expand(u.Hostname())
if err != nil {
return fmt.Errorf("could not resolve home directory: %w", err)
}
pathToDB := filepath.Join(home, u.Path)
err = w.loadQueryMap(msg.Config)
if err != nil {
return fmt.Errorf("could not load query map configuration: %w", err)
}
excludedTags := w.loadExcludeTags(msg.Config)
w.db = notmuch.NewDB(pathToDB, excludedTags)
return nil
}
func (w *worker) handleConnect(msg *types.Connect) error {
err := w.db.Connect()
if err != nil {
return err
}
w.done(msg)
w.emitLabelList()
go func() {
defer logging.PanicHandler()
for {
w.nmEvents <- &updateDirCounts{}
time.Sleep(backgroundRefreshDelay)
}
}()
return nil
}
func (w *worker) handleListDirectories(msg *types.ListDirectories) error {
for _, name := range w.queryMapOrder {
w.w.PostMessage(&types.Directory{
Message: types.RespondTo(msg),
Dir: &models.Directory{
Name: name,
Attributes: []string{},
},
}, nil)
}
w.done(msg)
return nil
}
func (w *worker) gatherDirectoryInfo(name string, query string) (
*types.DirectoryInfo, error,
) {
return w.buildDirInfo(name, query, false)
}
func (w *worker) buildDirInfo(name string, query string, skipSort bool) (
*types.DirectoryInfo, error,
) {
count, err := w.db.QueryCountMessages(query)
if err != nil {
return nil, err
}
info := &types.DirectoryInfo{
SkipSort: skipSort,
Info: &models.DirectoryInfo{
Name: name,
Flags: []string{},
ReadOnly: false,
// total messages
Exists: count.Exists,
// new messages since mailbox was last opened
Recent: 0,
// total unread
Unseen: count.Unread,
AccurateCounts: true,
Caps: &models.Capabilities{
Sort: true,
Thread: true,
},
},
}
return info, nil
}
func (w *worker) emitDirectoryInfo(name string) error {
query, _ := w.queryFromName(name)
info, err := w.gatherDirectoryInfo(name, query)
if err != nil {
return err
}
w.w.PostMessage(info, nil)
return nil
}
// queryFromName either returns the friendly ID if aliased or the name itself
// assuming it to be the query
func (w *worker) queryFromName(name string) (string, bool) {
// try the friendly name first, if that fails assume it's a query
q, ok := w.nameQueryMap[name]
if !ok {
return name, true
}
return q, false
}
func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
logging.Infof("opening %s", msg.Directory)
// try the friendly name first, if that fails assume it's a query
var isQuery bool
w.query, isQuery = w.queryFromName(msg.Directory)
w.currentQueryName = msg.Directory
info, err := w.gatherDirectoryInfo(msg.Directory, w.query)
if err != nil {
return err
}
info.Message = types.RespondTo(msg)
w.w.PostMessage(info, nil)
if isQuery {
w.w.PostMessage(info, nil)
}
w.done(msg)
return nil
}
func (w *worker) handleFetchDirectoryContents(
msg *types.FetchDirectoryContents,
) error {
w.currentSortCriteria = msg.SortCriteria
err := w.emitDirectoryContents(msg)
if err != nil {
return err
}
w.done(msg)
return nil
}
func (w *worker) handleFetchDirectoryThreaded(
msg *types.FetchDirectoryThreaded,
) error {
// w.currentSortCriteria = msg.SortCriteria
err := w.emitDirectoryThreaded(msg)
if err != nil {
return err
}
w.done(msg)
return nil
}
func (w *worker) handleFetchMessageHeaders(
msg *types.FetchMessageHeaders,
) error {
for _, uid := range msg.Uids {
m, err := w.msgFromUid(uid)
if err != nil {
logging.Errorf("could not get message: %w", err)
w.w.PostMessageInfoError(msg, uid, err)
continue
}
err = w.emitMessageInfo(m, msg)
if err != nil {
logging.Errorf("could not emit message info: %w", err)
w.w.PostMessageInfoError(msg, uid, err)
continue
}
}
w.done(msg)
return nil
}
func (w *worker) uidsFromQuery(query string) ([]uint32, error) {
msgIDs, err := w.db.MsgIDsFromQuery(query)
if err != nil {
return nil, err
}
var uids []uint32
for _, id := range msgIDs {
uid := w.db.UidFromKey(id)
uids = append(uids, uid)
}
return uids, nil
}
func (w *worker) msgFromUid(uid uint32) (*Message, error) {
key, ok := w.db.KeyFromUid(uid)
if !ok {
return nil, fmt.Errorf("Invalid uid: %v", uid)
}
msg := &Message{
key: key,
uid: uid,
db: w.db,
}
return msg, nil
}
func (w *worker) handleFetchMessageBodyPart(
msg *types.FetchMessageBodyPart,
) error {
m, err := w.msgFromUid(msg.Uid)
if err != nil {
logging.Errorf("could not get message %d: %w", msg.Uid, err)
return err
}
r, err := m.NewBodyPartReader(msg.Part)
if err != nil {
logging.Errorf(
"could not get body part reader for message=%d, parts=%#v: %w",
msg.Uid, msg.Part, err)
return err
}
w.w.PostMessage(&types.MessageBodyPart{
Message: types.RespondTo(msg),
Part: &models.MessageBodyPart{
Reader: r,
Uid: msg.Uid,
},
}, nil)
w.done(msg)
return nil
}
func (w *worker) handleFetchFullMessages(msg *types.FetchFullMessages) error {
for _, uid := range msg.Uids {
m, err := w.msgFromUid(uid)
if err != nil {
logging.Errorf("could not get message %d: %w", uid, err)
return err
}
r, err := m.NewReader()
if err != nil {
logging.Errorf("could not get message reader: %w", err)
return err
}
defer r.Close()
b, err := io.ReadAll(r)
if err != nil {
return err
}
w.w.PostMessage(&types.FullMessage{
Message: types.RespondTo(msg),
Content: &models.FullMessage{
Uid: uid,
Reader: bytes.NewReader(b),
},
}, nil)
}
w.done(msg)
return nil
}
func (w *worker) handleAnsweredMessages(msg *types.AnsweredMessages) error {
for _, uid := range msg.Uids {
m, err := w.msgFromUid(uid)
if err != nil {
logging.Errorf("could not get message: %w", err)
w.err(msg, err)
continue
}
if err := m.MarkAnswered(msg.Answered); err != nil {
logging.Errorf("could not mark message as answered: %w", err)
w.err(msg, err)
continue
}
err = w.emitMessageInfo(m, msg)
if err != nil {
logging.Errorf("could not emit message info: %w", err)
w.err(msg, err)
continue
}
}
if err := w.emitDirectoryInfo(w.currentQueryName); err != nil {
logging.Errorf("could not emit directory info: %w", err)
}
w.done(msg)
return nil
}
func (w *worker) handleFlagMessages(msg *types.FlagMessages) error {
for _, uid := range msg.Uids {
m, err := w.msgFromUid(uid)
if err != nil {
logging.Errorf("could not get message: %w", err)
w.err(msg, err)
continue
}
if err := m.SetFlag(msg.Flag, msg.Enable); err != nil {
logging.Errorf("could not set flag %v as %t for message: %w", msg.Flag, msg.Enable, err)
w.err(msg, err)
continue
}
err = w.emitMessageInfo(m, msg)
if err != nil {
logging.Errorf("could not emit message info: %w", err)
w.err(msg, err)
continue
}
}
if err := w.emitDirectoryInfo(w.currentQueryName); err != nil {
logging.Errorf("could not emit directory info: %w", err)
}
w.done(msg)
return nil
}
func (w *worker) handleSearchDirectory(msg *types.SearchDirectory) error {
// the first item is the command (search / filter)
s := strings.Join(msg.Argv[1:], " ")
// we only want to search in the current query, so merge the two together
search := w.query
if s != "" {
search = fmt.Sprintf("(%v) and (%v)", w.query, s)
}
uids, err := w.uidsFromQuery(search)
if err != nil {
return err
}
w.w.PostMessage(&types.SearchResults{
Message: types.RespondTo(msg),
Uids: uids,
}, nil)
return nil
}
func (w *worker) handleModifyLabels(msg *types.ModifyLabels) error {
for _, uid := range msg.Uids {
m, err := w.msgFromUid(uid)
if err != nil {
return fmt.Errorf("could not get message from uid %d: %w", uid, err)
}
err = m.ModifyTags(msg.Add, msg.Remove)
if err != nil {
return fmt.Errorf("could not modify message tags: %w", err)
}
err = w.emitMessageInfo(m, msg)
if err != nil {
return err
}
}
// tags changed, most probably some messages shifted to other folders
// so we need to re-enumerate the query content
err := w.emitDirectoryContents(msg)
if err != nil {
return err
}
// and update the list of possible tags
w.emitLabelList()
if err = w.emitDirectoryInfo(w.currentQueryName); err != nil {
logging.Errorf("could not emit directory info: %w", err)
}
w.done(msg)
return nil
}
func (w *worker) loadQueryMap(acctConfig *config.AccountConfig) error {
raw, ok := acctConfig.Params["query-map"]
if !ok {
// nothing to do
return nil
}
file, err := homedir.Expand(raw)
if err != nil {
return err
}
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
w.nameQueryMap = make(map[string]string)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
if line == "" || line[0] == '#' {
continue
}
split := strings.SplitN(line, "=", 2)
if len(split) != 2 {
return fmt.Errorf("%v: invalid line %q, want name=query", file, line)
}
w.nameQueryMap[split[0]] = split[1]
w.queryMapOrder = append(w.queryMapOrder, split[0])
}
return nil
}
func (w *worker) loadExcludeTags(
acctConfig *config.AccountConfig,
) []string {
raw, ok := acctConfig.Params["exclude-tags"]
if !ok {
// nothing to do
return nil
}
excludedTags := strings.Split(raw, ",")
for idx, tag := range excludedTags {
excludedTags[idx] = strings.Trim(tag, " ")
}
return excludedTags
}
func (w *worker) emitDirectoryContents(parent types.WorkerMessage) error {
query := w.query
if msg, ok := parent.(*types.FetchDirectoryContents); ok {
s := strings.Join(msg.FilterCriteria[1:], " ")
if s != "" {
query = fmt.Sprintf("(%v) and (%v)", query, s)
}
}
uids, err := w.uidsFromQuery(query)
if err != nil {
return fmt.Errorf("could not fetch uids: %w", err)
}
sortedUids, err := w.sort(uids, w.currentSortCriteria)
if err != nil {
logging.Errorf("error sorting directory: %w", err)
return err
}
w.w.PostMessage(&types.DirectoryContents{
Message: types.RespondTo(parent),
Uids: sortedUids,
}, nil)
return nil
}
func (w *worker) emitDirectoryThreaded(parent types.WorkerMessage) error {
query := w.query
if msg, ok := parent.(*types.FetchDirectoryThreaded); ok {
s := strings.Join(msg.FilterCriteria[1:], " ")
if s != "" {
query = fmt.Sprintf("(%v) and (%v)", query, s)
}
}
threads, err := w.db.ThreadsFromQuery(query)
if err != nil {
return err
}
w.w.PostMessage(&types.DirectoryThreaded{
Threads: threads,
}, nil)
return nil
}
func (w *worker) emitMessageInfo(m *Message,
parent types.WorkerMessage,
) error {
info, err := m.MessageInfo()
if err != nil {
return fmt.Errorf("could not get MessageInfo: %w", err)
}
w.w.PostMessage(&types.MessageInfo{
Message: types.RespondTo(parent),
Info: info,
}, nil)
return nil
}
func (w *worker) emitLabelList() {
tags, err := w.db.ListTags()
if err != nil {
logging.Errorf("could not load tags: %w", err)
return
}
w.w.PostMessage(&types.LabelList{Labels: tags}, nil)
}
func (w *worker) sort(uids []uint32,
criteria []*types.SortCriterion,
) ([]uint32, error) {
if len(criteria) == 0 {
return uids, nil
}
var msgInfos []*models.MessageInfo
for _, uid := range uids {
m, err := w.msgFromUid(uid)
if err != nil {
logging.Errorf("could not get message: %w", err)
continue
}
info, err := m.MessageInfo()
if err != nil {
logging.Errorf("could not get message info: %w", err)
continue
}
msgInfos = append(msgInfos, info)
}
sortedUids, err := lib.Sort(msgInfos, criteria)
if err != nil {
logging.Errorf("could not sort the messages: %w", err)
return nil, err
}
return sortedUids, nil
}
func (w *worker) handleCheckMail(msg *types.CheckMail) {
if msg.Command == "" {
w.err(msg, fmt.Errorf("checkmail: no command specified"))
return
}
ctx, cancel := context.WithTimeout(context.Background(), msg.Timeout)
defer cancel()
cmd := exec.CommandContext(ctx, "sh", "-c", msg.Command)
ch := make(chan error)
go func() {
err := cmd.Run()
ch <- err
}()
select {
case <-ctx.Done():
w.err(msg, fmt.Errorf("checkmail: timed out"))
case err := <-ch:
if err != nil {
w.err(msg, fmt.Errorf("checkmail: error running command: %w", err))
} else {
w.done(msg)
}
}
}