diff --git a/worker/imap/fetch.go b/worker/imap/fetch.go index b6a5509..41269cb 100644 --- a/worker/imap/fetch.go +++ b/worker/imap/fetch.go @@ -216,7 +216,6 @@ func (imapw *IMAPWorker) handleFetchMessages( var reterr error for _msg := range messages { - imapw.seqMap.Put(_msg.SeqNum, _msg.Uid) err := procFunc(_msg) if err != nil { if reterr == nil { diff --git a/worker/imap/open.go b/worker/imap/open.go index 7abebea..15cfa31 100644 --- a/worker/imap/open.go +++ b/worker/imap/open.go @@ -65,7 +65,10 @@ func (imapw *IMAPWorker) handleFetchDirectoryContents( }, nil) } else { logging.Infof("Found %d UIDs", len(uids)) - imapw.seqMap.Clear() + if len(msg.FilterCriteria) == 1 { + // Only initialize if we are not filtering + imapw.seqMap.Initialize(uids) + } imapw.worker.PostMessage(&types.DirectoryContents{ Message: types.RespondTo(msg), Uids: uids, @@ -123,7 +126,17 @@ func (imapw *IMAPWorker) handleDirectoryThreaded( aercThreads, count := convertThreads(threads, nil) sort.Sort(types.ByUID(aercThreads)) logging.Infof("Found %d threaded messages", count) - imapw.seqMap.Clear() + if len(msg.FilterCriteria) == 1 { + // Only initialize if we are not filtering + var uids []uint32 + for i := len(aercThreads) - 1; i >= 0; i-- { + aercThreads[i].Walk(func(t *types.Thread, level int, currentErr error) error { + uids = append(uids, t.Uid) + return nil + }) + } + imapw.seqMap.Initialize(uids) + } imapw.worker.PostMessage(&types.DirectoryThreaded{ Message: types.RespondTo(msg), Threads: aercThreads, diff --git a/worker/imap/seqmap.go b/worker/imap/seqmap.go index 093bbc5..3558fe4 100644 --- a/worker/imap/seqmap.go +++ b/worker/imap/seqmap.go @@ -1,13 +1,22 @@ package imap import ( + "sort" "sync" ) type SeqMap struct { lock sync.Mutex // map of IMAP sequence numbers to message UIDs - m map[uint32]uint32 + m []uint32 +} + +// Initialize sets the initial seqmap of the mailbox +func (s *SeqMap) Initialize(uids []uint32) { + s.lock.Lock() + s.m = uids + s.sort() + s.lock.Unlock() } func (s *SeqMap) Size() int { @@ -17,44 +26,51 @@ func (s *SeqMap) Size() int { return size } +// Get returns the UID of the given seqnum func (s *SeqMap) Get(seqnum uint32) (uint32, bool) { - s.lock.Lock() - uid, found := s.m[seqnum] - s.lock.Unlock() - return uid, found -} - -func (s *SeqMap) Put(seqnum, uid uint32) { - s.lock.Lock() - if s.m == nil { - s.m = make(map[uint32]uint32) + if int(seqnum) > s.Size() || seqnum < 1 { + return 0, false } - s.m[seqnum] = uid + s.lock.Lock() + uid := s.m[seqnum-1] s.lock.Unlock() + return uid, true } -func (s *SeqMap) Pop(seqnum uint32) (uint32, bool) { +// Put adds a UID to the slice. Put should only be used to add new messages +// into the slice +func (s *SeqMap) Put(uid uint32) { s.lock.Lock() - uid, found := s.m[seqnum] - if found { - m := make(map[uint32]uint32) - for s, u := range s.m { - if s > seqnum { - // All sequence numbers greater than the removed one must be decremented by one - // https://datatracker.ietf.org/doc/html/rfc3501#section-7.4.1 - m[s-1] = u - } else if s < seqnum { - m[s] = u - } + for _, n := range s.m { + if n == uid { + // We already have this UID, don't insert it. + s.lock.Unlock() + return } - s.m = m } + s.m = append(s.m, uid) + s.sort() s.lock.Unlock() - return uid, found } -func (s *SeqMap) Clear() { +// Pop removes seqnum from the SeqMap. seqnum must be a valid seqnum, ie +// [1:size of mailbox] +func (s *SeqMap) Pop(seqnum uint32) (uint32, bool) { + if int(seqnum) > s.Size() || seqnum < 1 { + return 0, false + } s.lock.Lock() - s.m = make(map[uint32]uint32) + uid := s.m[seqnum-1] + s.m = append(s.m[:seqnum-1], s.m[seqnum:]...) s.lock.Unlock() + return uid, true +} + +// sort sorts the slice in ascending UID order. See: +// https://datatracker.ietf.org/doc/html/rfc3501#section-2.3.1.2 +func (s *SeqMap) sort() { + // Always be sure the SeqMap is sorted + sort.Slice(s.m, func(i, j int) bool { + return s.m[i] < s.m[j] + }) } diff --git a/worker/imap/seqmap_test.go b/worker/imap/seqmap_test.go index 7067721..42c06f8 100644 --- a/worker/imap/seqmap_test.go +++ b/worker/imap/seqmap_test.go @@ -22,56 +22,57 @@ func TestSeqMap(t *testing.T) { _, found = seqmap.Pop(0) assert.Equal(false, found) - seqmap.Put(1, 1337) - seqmap.Put(2, 42) - seqmap.Put(3, 1107) + seqmap.Initialize([]uint32{1337, 42, 1107}) assert.Equal(3, seqmap.Size()) _, found = seqmap.Pop(0) assert.Equal(false, found) uid, found = seqmap.Get(1) - assert.Equal(1337, int(uid)) + assert.Equal(42, int(uid)) assert.Equal(true, found) uid, found = seqmap.Pop(1) - assert.Equal(1337, int(uid)) + assert.Equal(42, int(uid)) assert.Equal(true, found) assert.Equal(2, seqmap.Size()) - // Repop the same seqnum should work because of the syncing + uid, found = seqmap.Get(1) + assert.Equal(1107, int(uid)) + + // Repeated puts of the same UID shouldn't change the size + seqmap.Put(1231) + assert.Equal(3, seqmap.Size()) + seqmap.Put(1231) + assert.Equal(3, seqmap.Size()) + + uid, found = seqmap.Get(2) + assert.Equal(1231, int(uid)) + _, found = seqmap.Pop(1) assert.Equal(true, found) - assert.Equal(1, seqmap.Size()) + assert.Equal(2, seqmap.Size()) - // sync means we already have a 1. This is replacing that UID so the size - // shouldn't increase - seqmap.Put(1, 7331) - assert.Equal(1, seqmap.Size()) - - seqmap.Clear() + seqmap.Initialize(nil) assert.Equal(0, seqmap.Size()) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - time.Sleep(20 * time.Millisecond) - seqmap.Put(42, 1337) - time.Sleep(20 * time.Millisecond) - seqmap.Put(43, 1107) + seqmap.Initialize([]uint32{42, 1337}) }() wg.Add(1) go func() { defer wg.Done() - for _, found := seqmap.Pop(43); !found; _, found = seqmap.Pop(43) { + for _, found := seqmap.Pop(1); !found; _, found = seqmap.Pop(1) { time.Sleep(1 * time.Millisecond) } }() wg.Add(1) go func() { defer wg.Done() - for _, found := seqmap.Pop(42); !found; _, found = seqmap.Pop(42) { + for _, found := seqmap.Pop(1); !found; _, found = seqmap.Pop(1) { time.Sleep(1 * time.Millisecond) } }() diff --git a/worker/imap/worker.go b/worker/imap/worker.go index dee089e..40debe6 100644 --- a/worker/imap/worker.go +++ b/worker/imap/worker.go @@ -253,6 +253,9 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) { msg.Uid = uid } } + if int(msg.SeqNum) > w.seqMap.Size() { + w.seqMap.Put(msg.Uid) + } w.worker.PostMessage(&types.MessageInfo{ Info: &models.MessageInfo{ BodyStructure: translateBodyStructure(msg.BodyStructure),