diff --git a/worker/notmuch/message.go b/worker/notmuch/message.go index 077fb92..aa16cee 100644 --- a/worker/notmuch/message.go +++ b/worker/notmuch/message.go @@ -17,13 +17,15 @@ import ( ) type Message struct { - uid uint32 - key string - msg *notmuch.Message + uid uint32 + key string + msg *notmuch.Message + rwDB func() (*notmuch.DB, error) // used to open a db for writing + refresh func(*Message) error // called after msg modification } // NewReader reads a message into memory and returns an io.Reader for it. -func (m Message) NewReader() (io.Reader, error) { +func (m *Message) NewReader() (io.Reader, error) { f, err := os.Open(m.msg.Filename()) if err != nil { return nil, err @@ -37,13 +39,13 @@ func (m Message) NewReader() (io.Reader, error) { } // MessageInfo populates a models.MessageInfo struct for the message. -func (m Message) MessageInfo() (*models.MessageInfo, error) { +func (m *Message) MessageInfo() (*models.MessageInfo, error) { return lib.MessageInfo(m) } // NewBodyPartReader creates a new io.Reader for the requested body part(s) of // the message. -func (m Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) { +func (m *Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) { f, err := os.Open(m.msg.Filename()) if err != nil { return nil, err @@ -57,7 +59,7 @@ func (m Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) { } // MarkRead either adds or removes the maildir.FlagSeen flag from the message. -func (m Message) MarkRead(seen bool) error { +func (m *Message) MarkRead(seen bool) error { haveUnread := false for _, t := range m.tags() { if t == "unread" { @@ -71,14 +73,14 @@ func (m Message) MarkRead(seen bool) error { } if haveUnread { - err := m.msg.RemoveTag("unread") + err := m.RemoveTag("unread") if err != nil { return err } return nil } - err := m.msg.AddTag("unread") + err := m.AddTag("unread") if err != nil { return err } @@ -86,7 +88,7 @@ func (m Message) MarkRead(seen bool) error { } // tags returns the notmuch tags of a message -func (m Message) tags() []string { +func (m *Message) tags() []string { ts := m.msg.Tags() var tags []string var tag *notmuch.Tag @@ -96,7 +98,72 @@ func (m Message) tags() []string { return tags } -func (m Message) ModelFlags() ([]models.Flag, error) { +func (m *Message) modify(cb func(*notmuch.Message) error) error { + db, err := m.rwDB() + if err != nil { + return err + } + defer db.Close() + msg, err := db.FindMessage(m.key) + if err != nil { + return err + } + err = cb(msg) + if err != nil { + return err + } + // we need to explicitly close here, else we don't commit + dcerr := db.Close() + if dcerr != nil && err == nil { + err = dcerr + } + // next we need to refresh the notmuch msg, else we serve stale tags + rerr := m.refresh(m) + if rerr != nil && err == nil { + err = rerr + } + return err +} + +func (m *Message) AddTag(tag string) error { + err := m.modify(func(msg *notmuch.Message) error { + return msg.AddTag(tag) + }) + return err +} + +func (m *Message) AddTags(tags []string) error { + err := m.modify(func(msg *notmuch.Message) error { + ierr := msg.Atomic(func(msg *notmuch.Message) { + for _, t := range tags { + msg.AddTag(t) + } + }) + return ierr + }) + return err +} + +func (m *Message) RemoveTag(tag string) error { + err := m.modify(func(msg *notmuch.Message) error { + return msg.RemoveTag(tag) + }) + return err +} + +func (m *Message) RemoveTags(tags []string) error { + err := m.modify(func(msg *notmuch.Message) error { + ierr := msg.Atomic(func(msg *notmuch.Message) { + for _, t := range tags { + msg.RemoveTag(t) + } + }) + return ierr + }) + return err +} + +func (m *Message) ModelFlags() ([]models.Flag, error) { var flags []models.Flag seen := true @@ -118,6 +185,6 @@ func (m Message) ModelFlags() ([]models.Flag, error) { return flags, nil } -func (m Message) UID() uint32 { +func (m *Message) UID() uint32 { return m.uid } diff --git a/worker/notmuch/worker.go b/worker/notmuch/worker.go index 6187b24..d4c196c 100644 --- a/worker/notmuch/worker.go +++ b/worker/notmuch/worker.go @@ -29,7 +29,7 @@ type worker struct { w *types.Worker pathToDB string db *notmuch.DB - selected *notmuch.Query + query string uidStore *uidstore.Store excludedTags []string nameQueryMap map[string]string @@ -129,12 +129,36 @@ func (w *worker) handleConfigure(msg *types.Configure) error { return nil } -func (w *worker) handleConnect(msg *types.Connect) error { +// connectRW returns a writable notmuch DB, which needs to be closed to commit +// the changes and to release the DB lock +func (w *worker) connectRW() (*notmuch.DB, error) { + db, err := notmuch.Open(w.pathToDB, notmuch.DBReadWrite) + if err != nil { + return nil, fmt.Errorf("could not connect to notmuch db: %v", err) + } + return db, err +} + +// connectRO connects a RO db to the worker +func (w *worker) connectRO() error { + if w.db != nil { + if err := w.db.Close(); err != nil { + w.w.Logger.Printf("connectRO: could not close the old db: %v", err) + } + } var err error - w.db, err = notmuch.Open(w.pathToDB, notmuch.DBReadWrite) + w.db, err = notmuch.Open(w.pathToDB, notmuch.DBReadOnly) if err != nil { return fmt.Errorf("could not connect to notmuch db: %v", err) } + return nil +} + +func (w *worker) handleConnect(msg *types.Connect) error { + err := w.connectRO() + if err != nil { + return err + } w.done(msg) return nil } @@ -153,21 +177,32 @@ func (w *worker) handleListDirectories(msg *types.ListDirectories) error { return nil } +//query returns a query based on the query string on w.query. +//it also configures the query as specified on the worker +func (w *worker) getQuery() (*notmuch.Query, error) { + q := w.db.NewQuery(w.query) + q.SetExcludeScheme(notmuch.EXCLUDE_TRUE) + q.SetSortScheme(notmuch.SORT_OLDEST_FIRST) + for _, t := range w.excludedTags { + err := q.AddTagExclude(t) + if err != nil && err != notmuch.ErrIgnored { + return nil, err + } + } + return q, nil +} + func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error { w.w.Logger.Printf("opening %s", msg.Directory) // try the friendly name first, if that fails assume it's a query - query, ok := w.nameQueryMap[msg.Directory] + q, ok := w.nameQueryMap[msg.Directory] if !ok { - query = msg.Directory + q = msg.Directory } - w.selected = w.db.NewQuery(query) - w.selected.SetExcludeScheme(notmuch.EXCLUDE_TRUE) - w.selected.SetSortScheme(notmuch.SORT_OLDEST_FIRST) - for _, t := range w.excludedTags { - err := w.selected.AddTagExclude(t) - if err != nil && err != notmuch.ErrIgnored { - return err - } + w.query = q + query, err := w.getQuery() + if err != nil { + return err } //TODO: why does this need to be sent twice?? info := &types.DirectoryInfo{ @@ -176,7 +211,7 @@ func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error { Flags: []string{}, ReadOnly: false, // total messages - Exists: w.selected.CountMessages(), + Exists: query.CountMessages(), // new messages since mailbox was last opened Recent: 0, // total unread @@ -191,7 +226,11 @@ func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error { func (w *worker) handleFetchDirectoryContents( msg *types.FetchDirectoryContents) error { - uids, err := w.uidsFromQuery(w.selected) + q, err := w.getQuery() + if err != nil { + return err + } + uids, err := w.uidsFromQuery(q) if err != nil { w.w.Logger.Printf("error scanning uids: %v", err) return err @@ -253,9 +292,20 @@ func (w *worker) msgFromUid(uid uint32) (*Message, error) { return nil, fmt.Errorf("Could not fetch message for key %q: %v", key, err) } msg := &Message{ - key: key, - uid: uid, - msg: nm, + key: key, + uid: uid, + msg: nm, + rwDB: w.connectRW, + refresh: func(m *Message) error { + //close the old message manually, else we segfault during gc + m.msg.Close() + err := w.connectRO() + if err != nil { + return err + } + m.msg, err = w.db.FindMessage(m.key) + return err + }, } return msg, nil }