notmuch: avoid stale DBs

Opening a notmuch DB gives you a snapshot of the stage at that specific time.
Prior to this, we only reopened the DB upon writing.
However, if say a mail sync program like offlineimap is fetching new mail,
we would never pick it up.

This commit caches a db for a while, so that we don't generate too much overhead
and does a reconnect cycle after that.

I hardcoded a value as I don't think that having an option would be beneficial.
Any write operation (meaning reading mail) anyhow flushes the DB by necessity.
(we need to close to commit tag changes, which changing the read state is)
This commit is contained in:
Reto Brunner 2020-02-15 09:29:54 +01:00 committed by Drew DeVault
parent 72f55b857b
commit 3e7e236f50
1 changed files with 133 additions and 99 deletions

View File

@ -5,15 +5,19 @@ package lib
import ( import (
"fmt" "fmt"
"log" "log"
"time"
notmuch "github.com/zenhack/go.notmuch" notmuch "github.com/zenhack/go.notmuch"
) )
const MAX_DB_AGE time.Duration = 10 * time.Second
type DB struct { type DB struct {
path string path string
excludedTags []string excludedTags []string
ro *notmuch.DB
logger *log.Logger logger *log.Logger
lastOpenTime time.Time
db *notmuch.DB
} }
func NewDB(path string, excludedTags []string, func NewDB(path string, excludedTags []string,
@ -27,58 +31,79 @@ func NewDB(path string, excludedTags []string,
} }
func (db *DB) Connect() error { func (db *DB) Connect() error {
return db.connectRO() // used as sanity check upon initial connect
err := db.connect(false)
return err
} }
// connectRW returns a writable notmuch DB, which needs to be closed to commit func (db *DB) close() error {
// the changes and to release the DB lock if db.db == nil {
func (db *DB) connectRW() (*notmuch.DB, error) { return nil
rw, err := notmuch.Open(db.path, notmuch.DBReadWrite)
if err != nil {
return nil, fmt.Errorf("could not connect to notmuch db: %v", err)
} }
return rw, err err := db.db.Close()
db.db = nil
return err
} }
// connectRO connects a RO db to the worker func (db *DB) connect(writable bool) error {
func (db *DB) connectRO() error { var mode notmuch.DBMode = notmuch.DBReadOnly
if db.ro != nil { if writable {
if err := db.ro.Close(); err != nil { mode = notmuch.DBReadWrite
db.logger.Printf("connectRO: could not close the old db: %v", err)
}
} }
var err error var err error
db.ro, err = notmuch.Open(db.path, notmuch.DBReadOnly) db.db, err = notmuch.Open(db.path, mode)
if err != nil { if err != nil {
return fmt.Errorf("could not connect to notmuch db: %v", err) return fmt.Errorf("could not connect to notmuch db: %v", err)
} }
db.lastOpenTime = time.Now()
return nil return nil
} }
//withConnection calls callback on the DB object, cleaning up upon return.
//the error returned is from the connection attempt, if not successful,
//or from the callback otherwise.
func (db *DB) withConnection(writable bool, cb func(*notmuch.DB) error) error {
too_old := time.Now().After(db.lastOpenTime.Add(MAX_DB_AGE))
if db.db == nil || writable || too_old {
if cerr := db.close(); cerr != nil {
db.logger.Printf("failed to close the notmuch db: %v", cerr)
}
err := db.connect(writable)
if err != nil {
return err
}
}
err := cb(db.db)
if writable {
// we need to close to commit the changes, else we block others
if cerr := db.close(); cerr != nil {
db.logger.Printf("failed to close the notmuch db: %v", cerr)
}
}
return err
}
// ListTags lists all known tags // ListTags lists all known tags
func (db *DB) ListTags() ([]string, error) { func (db *DB) ListTags() ([]string, error) {
if db.ro == nil {
return nil, fmt.Errorf("not connected to the notmuch db")
}
tags, err := db.ro.Tags()
if err != nil {
return nil, err
}
var result []string var result []string
var tag *notmuch.Tag err := db.withConnection(false, func(ndb *notmuch.DB) error {
for tags.Next(&tag) { tags, err := ndb.Tags()
result = append(result, tag.Value) if err != nil {
} return err
return result, nil }
var tag *notmuch.Tag
for tags.Next(&tag) {
result = append(result, tag.Value)
}
return nil
})
return result, err
} }
//getQuery returns a query based on the provided query string. //getQuery returns a query based on the provided query string.
//It also configures the query as specified on the worker //It also configures the query as specified on the worker
func (db *DB) newQuery(query string) (*notmuch.Query, error) { func (db *DB) newQuery(ndb *notmuch.DB, query string) (*notmuch.Query, error) {
if db.ro == nil { q := ndb.NewQuery(query)
return nil, fmt.Errorf("not connected to the notmuch db")
}
q := db.ro.NewQuery(query)
q.SetExcludeScheme(notmuch.EXCLUDE_TRUE) q.SetExcludeScheme(notmuch.EXCLUDE_TRUE)
q.SetSortScheme(notmuch.SORT_OLDEST_FIRST) q.SetSortScheme(notmuch.SORT_OLDEST_FIRST)
for _, t := range db.excludedTags { for _, t := range db.excludedTags {
@ -91,23 +116,23 @@ func (db *DB) newQuery(query string) (*notmuch.Query, error) {
} }
func (db *DB) MsgIDsFromQuery(q string) ([]string, error) { func (db *DB) MsgIDsFromQuery(q string) ([]string, error) {
if db.ro == nil {
return nil, fmt.Errorf("not connected to the notmuch db")
}
query, err := db.newQuery(q)
if err != nil {
return nil, err
}
msgs, err := query.Messages()
if err != nil {
return nil, err
}
var msg *notmuch.Message
var msgIDs []string var msgIDs []string
for msgs.Next(&msg) { err := db.withConnection(false, func(ndb *notmuch.DB) error {
msgIDs = append(msgIDs, msg.ID()) query, err := db.newQuery(ndb, q)
} if err != nil {
return msgIDs, nil return err
}
msgs, err := query.Messages()
if err != nil {
return err
}
var msg *notmuch.Message
for msgs.Next(&msg) {
msgIDs = append(msgIDs, msg.ID())
}
return nil
})
return msgIDs, err
} }
type MessageCount struct { type MessageCount struct {
@ -116,71 +141,80 @@ type MessageCount struct {
} }
func (db *DB) QueryCountMessages(q string) (MessageCount, error) { func (db *DB) QueryCountMessages(q string) (MessageCount, error) {
query, err := db.newQuery(q) var (
if err != nil { exists int
return MessageCount{}, err unread int
} )
exists := query.CountMessages() err := db.withConnection(false, func(ndb *notmuch.DB) error {
query.Close() query, err := db.newQuery(ndb, q)
uq, err := db.newQuery(fmt.Sprintf("(%v) and (tag:unread)", q)) if err != nil {
if err != nil { return err
return MessageCount{}, err }
} exists = query.CountMessages()
defer uq.Close() query.Close()
unread := uq.CountMessages() uq, err := db.newQuery(ndb, fmt.Sprintf("(%v) and (tag:unread)", q))
if err != nil {
return err
}
defer uq.Close()
unread = uq.CountMessages()
return nil
})
return MessageCount{ return MessageCount{
Exists: exists, Exists: exists,
Unread: unread, Unread: unread,
}, nil }, err
} }
func (db *DB) MsgFilename(key string) (string, error) { func (db *DB) MsgFilename(key string) (string, error) {
msg, err := db.ro.FindMessage(key) var filename string
if err != nil { err := db.withConnection(false, func(ndb *notmuch.DB) error {
return "", err msg, err := ndb.FindMessage(key)
} if err != nil {
defer msg.Close() return err
return msg.Filename(), nil }
defer msg.Close()
filename = msg.Filename()
return nil
})
return filename, err
} }
func (db *DB) MsgTags(key string) ([]string, error) { func (db *DB) MsgTags(key string) ([]string, error) {
msg, err := db.ro.FindMessage(key)
if err != nil {
return nil, err
}
defer msg.Close()
ts := msg.Tags()
var tags []string var tags []string
var tag *notmuch.Tag err := db.withConnection(false, func(ndb *notmuch.DB) error {
for ts.Next(&tag) { msg, err := ndb.FindMessage(key)
tags = append(tags, tag.Value) if err != nil {
} return err
return tags, nil }
defer msg.Close()
ts := msg.Tags()
var tag *notmuch.Tag
for ts.Next(&tag) {
tags = append(tags, tag.Value)
}
return nil
})
return tags, err
} }
func (db *DB) msgModify(key string, func (db *DB) msgModify(key string,
cb func(*notmuch.Message) error) error { cb func(*notmuch.Message) error) error {
defer db.connectRO() err := db.withConnection(true, func(ndb *notmuch.DB) error {
db.ro.Close() msg, err := ndb.FindMessage(key)
if err != nil {
return err
}
defer msg.Close()
rw, err := db.connectRW() cb(msg)
if err != nil { err = msg.TagsToMaildirFlags()
return err if err != nil {
} db.logger.Printf("could not sync maildir flags: %v", err)
defer rw.Close() }
return nil
msg, err := rw.FindMessage(key) })
if err != nil { return err
return err
}
defer msg.Close()
cb(msg)
err = msg.TagsToMaildirFlags()
if err != nil {
db.logger.Printf("could not sync maildir flags: %v", err)
}
return nil
} }
func (db *DB) MsgModifyTags(key string, add, remove []string) error { func (db *DB) MsgModifyTags(key string, add, remove []string) error {