seqmap: refactor seqmap to use slice instead of map
The imap worker's seqmap is represented as a map of sequence number to UID. This presents a problem when expunging group of messages from the mailbox: each individual expunge decrements the sequence numbers by 1 (for every sequence number greater than the expunged). This requires a looping around the map to update the keys. The use of a map also requires that both the sequence number and the UID of a message be known in order to insert it into the map. This is only discovered by fetching individual message body parts (flags, headers, etc), leaving the seqmap to be empty until we have fetched information about each message. In certain instances (if a mailbox has recently been loaded), all information is loaded in memory and no new information is fetched - leaving the seqmap empty and the UI out of sync with the worker. Refactor the seqmap as a slice, so that any expunge automatically decrements the rest of the sequences. Use the results of FetchDirectoryContents or FetchDirectoryThreaded to initialize the seqmap with all discovered UIDs. Sort the UIDs in ascending order: IMAP specification requires that sequence numbers start at 1 increase in order of ascending UID. Add individual messages to the map if they come via a MessageUpdate and have a sequence number larger than our slice. Update seqmap tests with new logic. Reference: https://datatracker.ietf.org/doc/html/rfc3501#section-2.3.1.2 Fixes: https://todo.sr.ht/~rjarry/aerc/69 Signed-off-by: Tim Culverhouse <tim@timculverhouse.com> Acked-by: Robin Jarry <robin@jarry.cc>
This commit is contained in:
parent
9630d9d281
commit
fdfec2c07a
5 changed files with 82 additions and 50 deletions
|
@ -216,7 +216,6 @@ func (imapw *IMAPWorker) handleFetchMessages(
|
|||
|
||||
var reterr error
|
||||
for _msg := range messages {
|
||||
imapw.seqMap.Put(_msg.SeqNum, _msg.Uid)
|
||||
err := procFunc(_msg)
|
||||
if err != nil {
|
||||
if reterr == nil {
|
||||
|
|
|
@ -65,7 +65,10 @@ func (imapw *IMAPWorker) handleFetchDirectoryContents(
|
|||
}, nil)
|
||||
} else {
|
||||
logging.Infof("Found %d UIDs", len(uids))
|
||||
imapw.seqMap.Clear()
|
||||
if len(msg.FilterCriteria) == 1 {
|
||||
// Only initialize if we are not filtering
|
||||
imapw.seqMap.Initialize(uids)
|
||||
}
|
||||
imapw.worker.PostMessage(&types.DirectoryContents{
|
||||
Message: types.RespondTo(msg),
|
||||
Uids: uids,
|
||||
|
@ -123,7 +126,17 @@ func (imapw *IMAPWorker) handleDirectoryThreaded(
|
|||
aercThreads, count := convertThreads(threads, nil)
|
||||
sort.Sort(types.ByUID(aercThreads))
|
||||
logging.Infof("Found %d threaded messages", count)
|
||||
imapw.seqMap.Clear()
|
||||
if len(msg.FilterCriteria) == 1 {
|
||||
// Only initialize if we are not filtering
|
||||
var uids []uint32
|
||||
for i := len(aercThreads) - 1; i >= 0; i-- {
|
||||
aercThreads[i].Walk(func(t *types.Thread, level int, currentErr error) error {
|
||||
uids = append(uids, t.Uid)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
imapw.seqMap.Initialize(uids)
|
||||
}
|
||||
imapw.worker.PostMessage(&types.DirectoryThreaded{
|
||||
Message: types.RespondTo(msg),
|
||||
Threads: aercThreads,
|
||||
|
|
|
@ -1,13 +1,22 @@
|
|||
package imap
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type SeqMap struct {
|
||||
lock sync.Mutex
|
||||
// map of IMAP sequence numbers to message UIDs
|
||||
m map[uint32]uint32
|
||||
m []uint32
|
||||
}
|
||||
|
||||
// Initialize sets the initial seqmap of the mailbox
|
||||
func (s *SeqMap) Initialize(uids []uint32) {
|
||||
s.lock.Lock()
|
||||
s.m = uids
|
||||
s.sort()
|
||||
s.lock.Unlock()
|
||||
}
|
||||
|
||||
func (s *SeqMap) Size() int {
|
||||
|
@ -17,44 +26,51 @@ func (s *SeqMap) Size() int {
|
|||
return size
|
||||
}
|
||||
|
||||
// Get returns the UID of the given seqnum
|
||||
func (s *SeqMap) Get(seqnum uint32) (uint32, bool) {
|
||||
s.lock.Lock()
|
||||
uid, found := s.m[seqnum]
|
||||
s.lock.Unlock()
|
||||
return uid, found
|
||||
}
|
||||
|
||||
func (s *SeqMap) Put(seqnum, uid uint32) {
|
||||
s.lock.Lock()
|
||||
if s.m == nil {
|
||||
s.m = make(map[uint32]uint32)
|
||||
if int(seqnum) > s.Size() || seqnum < 1 {
|
||||
return 0, false
|
||||
}
|
||||
s.m[seqnum] = uid
|
||||
s.lock.Lock()
|
||||
uid := s.m[seqnum-1]
|
||||
s.lock.Unlock()
|
||||
return uid, true
|
||||
}
|
||||
|
||||
func (s *SeqMap) Pop(seqnum uint32) (uint32, bool) {
|
||||
// Put adds a UID to the slice. Put should only be used to add new messages
|
||||
// into the slice
|
||||
func (s *SeqMap) Put(uid uint32) {
|
||||
s.lock.Lock()
|
||||
uid, found := s.m[seqnum]
|
||||
if found {
|
||||
m := make(map[uint32]uint32)
|
||||
for s, u := range s.m {
|
||||
if s > seqnum {
|
||||
// All sequence numbers greater than the removed one must be decremented by one
|
||||
// https://datatracker.ietf.org/doc/html/rfc3501#section-7.4.1
|
||||
m[s-1] = u
|
||||
} else if s < seqnum {
|
||||
m[s] = u
|
||||
}
|
||||
for _, n := range s.m {
|
||||
if n == uid {
|
||||
// We already have this UID, don't insert it.
|
||||
s.lock.Unlock()
|
||||
return
|
||||
}
|
||||
s.m = m
|
||||
}
|
||||
s.m = append(s.m, uid)
|
||||
s.sort()
|
||||
s.lock.Unlock()
|
||||
return uid, found
|
||||
}
|
||||
|
||||
func (s *SeqMap) Clear() {
|
||||
// Pop removes seqnum from the SeqMap. seqnum must be a valid seqnum, ie
|
||||
// [1:size of mailbox]
|
||||
func (s *SeqMap) Pop(seqnum uint32) (uint32, bool) {
|
||||
if int(seqnum) > s.Size() || seqnum < 1 {
|
||||
return 0, false
|
||||
}
|
||||
s.lock.Lock()
|
||||
s.m = make(map[uint32]uint32)
|
||||
uid := s.m[seqnum-1]
|
||||
s.m = append(s.m[:seqnum-1], s.m[seqnum:]...)
|
||||
s.lock.Unlock()
|
||||
return uid, true
|
||||
}
|
||||
|
||||
// sort sorts the slice in ascending UID order. See:
|
||||
// https://datatracker.ietf.org/doc/html/rfc3501#section-2.3.1.2
|
||||
func (s *SeqMap) sort() {
|
||||
// Always be sure the SeqMap is sorted
|
||||
sort.Slice(s.m, func(i, j int) bool {
|
||||
return s.m[i] < s.m[j]
|
||||
})
|
||||
}
|
||||
|
|
|
@ -22,56 +22,57 @@ func TestSeqMap(t *testing.T) {
|
|||
_, found = seqmap.Pop(0)
|
||||
assert.Equal(false, found)
|
||||
|
||||
seqmap.Put(1, 1337)
|
||||
seqmap.Put(2, 42)
|
||||
seqmap.Put(3, 1107)
|
||||
seqmap.Initialize([]uint32{1337, 42, 1107})
|
||||
assert.Equal(3, seqmap.Size())
|
||||
|
||||
_, found = seqmap.Pop(0)
|
||||
assert.Equal(false, found)
|
||||
|
||||
uid, found = seqmap.Get(1)
|
||||
assert.Equal(1337, int(uid))
|
||||
assert.Equal(42, int(uid))
|
||||
assert.Equal(true, found)
|
||||
|
||||
uid, found = seqmap.Pop(1)
|
||||
assert.Equal(1337, int(uid))
|
||||
assert.Equal(42, int(uid))
|
||||
assert.Equal(true, found)
|
||||
assert.Equal(2, seqmap.Size())
|
||||
|
||||
// Repop the same seqnum should work because of the syncing
|
||||
uid, found = seqmap.Get(1)
|
||||
assert.Equal(1107, int(uid))
|
||||
|
||||
// Repeated puts of the same UID shouldn't change the size
|
||||
seqmap.Put(1231)
|
||||
assert.Equal(3, seqmap.Size())
|
||||
seqmap.Put(1231)
|
||||
assert.Equal(3, seqmap.Size())
|
||||
|
||||
uid, found = seqmap.Get(2)
|
||||
assert.Equal(1231, int(uid))
|
||||
|
||||
_, found = seqmap.Pop(1)
|
||||
assert.Equal(true, found)
|
||||
assert.Equal(1, seqmap.Size())
|
||||
assert.Equal(2, seqmap.Size())
|
||||
|
||||
// sync means we already have a 1. This is replacing that UID so the size
|
||||
// shouldn't increase
|
||||
seqmap.Put(1, 7331)
|
||||
assert.Equal(1, seqmap.Size())
|
||||
|
||||
seqmap.Clear()
|
||||
seqmap.Initialize(nil)
|
||||
assert.Equal(0, seqmap.Size())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
seqmap.Put(42, 1337)
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
seqmap.Put(43, 1107)
|
||||
seqmap.Initialize([]uint32{42, 1337})
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for _, found := seqmap.Pop(43); !found; _, found = seqmap.Pop(43) {
|
||||
for _, found := seqmap.Pop(1); !found; _, found = seqmap.Pop(1) {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for _, found := seqmap.Pop(42); !found; _, found = seqmap.Pop(42) {
|
||||
for _, found := seqmap.Pop(1); !found; _, found = seqmap.Pop(1) {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -253,6 +253,9 @@ func (w *IMAPWorker) handleImapUpdate(update client.Update) {
|
|||
msg.Uid = uid
|
||||
}
|
||||
}
|
||||
if int(msg.SeqNum) > w.seqMap.Size() {
|
||||
w.seqMap.Put(msg.Uid)
|
||||
}
|
||||
w.worker.PostMessage(&types.MessageInfo{
|
||||
Info: &models.MessageInfo{
|
||||
BodyStructure: translateBodyStructure(msg.BodyStructure),
|
||||
|
|
Loading…
Reference in a new issue