Notmuch: use adhoc write connection.

Notmuch only allows a single write connection, all other clients trying to
modify the db block. Hence we should only open one when we actually need it.

Apparently we also need to refresh the RO DB connection upon modification,
else we get stale message tag results
This commit is contained in:
Reto Brunner 2019-08-24 11:53:43 +02:00 committed by Drew DeVault
parent 90453b5db4
commit a2c5233f71
2 changed files with 147 additions and 30 deletions

View File

@ -17,13 +17,15 @@ import (
)
type Message struct {
uid uint32
key string
msg *notmuch.Message
uid uint32
key string
msg *notmuch.Message
rwDB func() (*notmuch.DB, error) // used to open a db for writing
refresh func(*Message) error // called after msg modification
}
// NewReader reads a message into memory and returns an io.Reader for it.
func (m Message) NewReader() (io.Reader, error) {
func (m *Message) NewReader() (io.Reader, error) {
f, err := os.Open(m.msg.Filename())
if err != nil {
return nil, err
@ -37,13 +39,13 @@ func (m Message) NewReader() (io.Reader, error) {
}
// MessageInfo populates a models.MessageInfo struct for the message.
func (m Message) MessageInfo() (*models.MessageInfo, error) {
func (m *Message) MessageInfo() (*models.MessageInfo, error) {
return lib.MessageInfo(m)
}
// NewBodyPartReader creates a new io.Reader for the requested body part(s) of
// the message.
func (m Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) {
func (m *Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) {
f, err := os.Open(m.msg.Filename())
if err != nil {
return nil, err
@ -57,7 +59,7 @@ func (m Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) {
}
// MarkRead either adds or removes the maildir.FlagSeen flag from the message.
func (m Message) MarkRead(seen bool) error {
func (m *Message) MarkRead(seen bool) error {
haveUnread := false
for _, t := range m.tags() {
if t == "unread" {
@ -71,14 +73,14 @@ func (m Message) MarkRead(seen bool) error {
}
if haveUnread {
err := m.msg.RemoveTag("unread")
err := m.RemoveTag("unread")
if err != nil {
return err
}
return nil
}
err := m.msg.AddTag("unread")
err := m.AddTag("unread")
if err != nil {
return err
}
@ -86,7 +88,7 @@ func (m Message) MarkRead(seen bool) error {
}
// tags returns the notmuch tags of a message
func (m Message) tags() []string {
func (m *Message) tags() []string {
ts := m.msg.Tags()
var tags []string
var tag *notmuch.Tag
@ -96,7 +98,72 @@ func (m Message) tags() []string {
return tags
}
func (m Message) ModelFlags() ([]models.Flag, error) {
func (m *Message) modify(cb func(*notmuch.Message) error) error {
db, err := m.rwDB()
if err != nil {
return err
}
defer db.Close()
msg, err := db.FindMessage(m.key)
if err != nil {
return err
}
err = cb(msg)
if err != nil {
return err
}
// we need to explicitly close here, else we don't commit
dcerr := db.Close()
if dcerr != nil && err == nil {
err = dcerr
}
// next we need to refresh the notmuch msg, else we serve stale tags
rerr := m.refresh(m)
if rerr != nil && err == nil {
err = rerr
}
return err
}
func (m *Message) AddTag(tag string) error {
err := m.modify(func(msg *notmuch.Message) error {
return msg.AddTag(tag)
})
return err
}
func (m *Message) AddTags(tags []string) error {
err := m.modify(func(msg *notmuch.Message) error {
ierr := msg.Atomic(func(msg *notmuch.Message) {
for _, t := range tags {
msg.AddTag(t)
}
})
return ierr
})
return err
}
func (m *Message) RemoveTag(tag string) error {
err := m.modify(func(msg *notmuch.Message) error {
return msg.RemoveTag(tag)
})
return err
}
func (m *Message) RemoveTags(tags []string) error {
err := m.modify(func(msg *notmuch.Message) error {
ierr := msg.Atomic(func(msg *notmuch.Message) {
for _, t := range tags {
msg.RemoveTag(t)
}
})
return ierr
})
return err
}
func (m *Message) ModelFlags() ([]models.Flag, error) {
var flags []models.Flag
seen := true
@ -118,6 +185,6 @@ func (m Message) ModelFlags() ([]models.Flag, error) {
return flags, nil
}
func (m Message) UID() uint32 {
func (m *Message) UID() uint32 {
return m.uid
}

View File

@ -29,7 +29,7 @@ type worker struct {
w *types.Worker
pathToDB string
db *notmuch.DB
selected *notmuch.Query
query string
uidStore *uidstore.Store
excludedTags []string
nameQueryMap map[string]string
@ -129,12 +129,36 @@ func (w *worker) handleConfigure(msg *types.Configure) error {
return nil
}
func (w *worker) handleConnect(msg *types.Connect) error {
// connectRW returns a writable notmuch DB, which needs to be closed to commit
// the changes and to release the DB lock
func (w *worker) connectRW() (*notmuch.DB, error) {
db, err := notmuch.Open(w.pathToDB, notmuch.DBReadWrite)
if err != nil {
return nil, fmt.Errorf("could not connect to notmuch db: %v", err)
}
return db, err
}
// connectRO connects a RO db to the worker
func (w *worker) connectRO() error {
if w.db != nil {
if err := w.db.Close(); err != nil {
w.w.Logger.Printf("connectRO: could not close the old db: %v", err)
}
}
var err error
w.db, err = notmuch.Open(w.pathToDB, notmuch.DBReadWrite)
w.db, err = notmuch.Open(w.pathToDB, notmuch.DBReadOnly)
if err != nil {
return fmt.Errorf("could not connect to notmuch db: %v", err)
}
return nil
}
func (w *worker) handleConnect(msg *types.Connect) error {
err := w.connectRO()
if err != nil {
return err
}
w.done(msg)
return nil
}
@ -153,21 +177,32 @@ func (w *worker) handleListDirectories(msg *types.ListDirectories) error {
return nil
}
//query returns a query based on the query string on w.query.
//it also configures the query as specified on the worker
func (w *worker) getQuery() (*notmuch.Query, error) {
q := w.db.NewQuery(w.query)
q.SetExcludeScheme(notmuch.EXCLUDE_TRUE)
q.SetSortScheme(notmuch.SORT_OLDEST_FIRST)
for _, t := range w.excludedTags {
err := q.AddTagExclude(t)
if err != nil && err != notmuch.ErrIgnored {
return nil, err
}
}
return q, nil
}
func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
w.w.Logger.Printf("opening %s", msg.Directory)
// try the friendly name first, if that fails assume it's a query
query, ok := w.nameQueryMap[msg.Directory]
q, ok := w.nameQueryMap[msg.Directory]
if !ok {
query = msg.Directory
q = msg.Directory
}
w.selected = w.db.NewQuery(query)
w.selected.SetExcludeScheme(notmuch.EXCLUDE_TRUE)
w.selected.SetSortScheme(notmuch.SORT_OLDEST_FIRST)
for _, t := range w.excludedTags {
err := w.selected.AddTagExclude(t)
if err != nil && err != notmuch.ErrIgnored {
return err
}
w.query = q
query, err := w.getQuery()
if err != nil {
return err
}
//TODO: why does this need to be sent twice??
info := &types.DirectoryInfo{
@ -176,7 +211,7 @@ func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
Flags: []string{},
ReadOnly: false,
// total messages
Exists: w.selected.CountMessages(),
Exists: query.CountMessages(),
// new messages since mailbox was last opened
Recent: 0,
// total unread
@ -191,7 +226,11 @@ func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
func (w *worker) handleFetchDirectoryContents(
msg *types.FetchDirectoryContents) error {
uids, err := w.uidsFromQuery(w.selected)
q, err := w.getQuery()
if err != nil {
return err
}
uids, err := w.uidsFromQuery(q)
if err != nil {
w.w.Logger.Printf("error scanning uids: %v", err)
return err
@ -253,9 +292,20 @@ func (w *worker) msgFromUid(uid uint32) (*Message, error) {
return nil, fmt.Errorf("Could not fetch message for key %q: %v", key, err)
}
msg := &Message{
key: key,
uid: uid,
msg: nm,
key: key,
uid: uid,
msg: nm,
rwDB: w.connectRW,
refresh: func(m *Message) error {
//close the old message manually, else we segfault during gc
m.msg.Close()
err := w.connectRO()
if err != nil {
return err
}
m.msg, err = w.db.FindMessage(m.key)
return err
},
}
return msg, nil
}