worker/imap: use the IMAP connection from a single goroutine
Unfortunately, the IMAP protocol hasn't been designed to be used from multiple goroutines at the same time. For instance, if you fetch twice the same message from two different goroutines, it's not possible to tell whether the response is for one receiver or the other. For this reason, go-imap clients aren't safe to use from multiple goroutines. This commit changes the IMAP workers to be synchronous again (a command is executed only after the previous one has completed). To use IMAP from different threads, popular clients (e.g. Thunderbird) typically open multiple connections.
This commit is contained in:
parent
072bc26872
commit
089740758c
3 changed files with 78 additions and 87 deletions
|
@ -46,50 +46,46 @@ func (imapw *IMAPWorker) handleFetchMessages(
|
||||||
msg types.WorkerMessage, uids *imap.SeqSet, items []imap.FetchItem,
|
msg types.WorkerMessage, uids *imap.SeqSet, items []imap.FetchItem,
|
||||||
section *imap.BodySectionName) {
|
section *imap.BodySectionName) {
|
||||||
|
|
||||||
|
messages := make(chan *imap.Message)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
messages := make(chan *imap.Message)
|
for _msg := range messages {
|
||||||
done := make(chan error, 1)
|
imapw.seqMap[_msg.SeqNum-1] = _msg.Uid
|
||||||
go func() {
|
switch msg.(type) {
|
||||||
done <- imapw.client.UidFetch(uids, items, messages)
|
case *types.FetchMessageHeaders:
|
||||||
}()
|
imapw.worker.PostMessage(&types.MessageInfo{
|
||||||
go func() {
|
Message: types.RespondTo(msg),
|
||||||
for _msg := range messages {
|
BodyStructure: _msg.BodyStructure,
|
||||||
imapw.seqMap[_msg.SeqNum-1] = _msg.Uid
|
Envelope: _msg.Envelope,
|
||||||
switch msg.(type) {
|
Flags: _msg.Flags,
|
||||||
case *types.FetchMessageHeaders:
|
InternalDate: _msg.InternalDate,
|
||||||
imapw.worker.PostMessage(&types.MessageInfo{
|
Uid: _msg.Uid,
|
||||||
Message: types.RespondTo(msg),
|
}, nil)
|
||||||
BodyStructure: _msg.BodyStructure,
|
case *types.FetchFullMessages:
|
||||||
Envelope: _msg.Envelope,
|
reader := _msg.GetBody(section)
|
||||||
Flags: _msg.Flags,
|
imapw.worker.PostMessage(&types.FullMessage{
|
||||||
InternalDate: _msg.InternalDate,
|
Message: types.RespondTo(msg),
|
||||||
Uid: _msg.Uid,
|
Reader: reader,
|
||||||
}, nil)
|
Uid: _msg.Uid,
|
||||||
case *types.FetchFullMessages:
|
}, nil)
|
||||||
reader := _msg.GetBody(section)
|
case *types.FetchMessageBodyPart:
|
||||||
imapw.worker.PostMessage(&types.FullMessage{
|
reader := _msg.GetBody(section)
|
||||||
Message: types.RespondTo(msg),
|
imapw.worker.PostMessage(&types.MessageBodyPart{
|
||||||
Reader: reader,
|
Message: types.RespondTo(msg),
|
||||||
Uid: _msg.Uid,
|
Reader: reader,
|
||||||
}, nil)
|
Uid: _msg.Uid,
|
||||||
case *types.FetchMessageBodyPart:
|
|
||||||
reader := _msg.GetBody(section)
|
|
||||||
imapw.worker.PostMessage(&types.MessageBodyPart{
|
|
||||||
Message: types.RespondTo(msg),
|
|
||||||
Reader: reader,
|
|
||||||
Uid: _msg.Uid,
|
|
||||||
}, nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := <-done; err != nil {
|
|
||||||
imapw.worker.PostMessage(&types.Error{
|
|
||||||
Message: types.RespondTo(msg),
|
|
||||||
Error: err,
|
|
||||||
}, nil)
|
}, nil)
|
||||||
} else {
|
|
||||||
imapw.worker.PostMessage(
|
|
||||||
&types.Done{types.RespondTo(msg)}, nil)
|
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if err := imapw.client.UidFetch(uids, items, messages); err != nil {
|
||||||
|
imapw.worker.PostMessage(&types.Error{
|
||||||
|
Message: types.RespondTo(msg),
|
||||||
|
Error: err,
|
||||||
|
}, nil)
|
||||||
|
} else {
|
||||||
|
imapw.worker.PostMessage(
|
||||||
|
&types.Done{types.RespondTo(msg)}, nil)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,11 +8,8 @@ import (
|
||||||
|
|
||||||
func (imapw *IMAPWorker) handleListDirectories(msg *types.ListDirectories) {
|
func (imapw *IMAPWorker) handleListDirectories(msg *types.ListDirectories) {
|
||||||
mailboxes := make(chan *imap.MailboxInfo)
|
mailboxes := make(chan *imap.MailboxInfo)
|
||||||
done := make(chan error, 1)
|
|
||||||
imapw.worker.Logger.Println("Listing mailboxes")
|
imapw.worker.Logger.Println("Listing mailboxes")
|
||||||
go func() {
|
|
||||||
done <- imapw.client.List("", "*", mailboxes)
|
|
||||||
}()
|
|
||||||
go func() {
|
go func() {
|
||||||
for mbox := range mailboxes {
|
for mbox := range mailboxes {
|
||||||
imapw.worker.PostMessage(&types.Directory{
|
imapw.worker.PostMessage(&types.Directory{
|
||||||
|
@ -21,14 +18,15 @@ func (imapw *IMAPWorker) handleListDirectories(msg *types.ListDirectories) {
|
||||||
Attributes: mbox.Attributes,
|
Attributes: mbox.Attributes,
|
||||||
}, nil)
|
}, nil)
|
||||||
}
|
}
|
||||||
if err := <-done; err != nil {
|
|
||||||
imapw.worker.PostMessage(&types.Error{
|
|
||||||
Message: types.RespondTo(msg),
|
|
||||||
Error: err,
|
|
||||||
}, nil)
|
|
||||||
} else {
|
|
||||||
imapw.worker.PostMessage(
|
|
||||||
&types.Done{types.RespondTo(msg)}, nil)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if err := imapw.client.List("", "*", mailboxes); err != nil {
|
||||||
|
imapw.worker.PostMessage(&types.Error{
|
||||||
|
Message: types.RespondTo(msg),
|
||||||
|
Error: err,
|
||||||
|
}, nil)
|
||||||
|
} else {
|
||||||
|
imapw.worker.PostMessage(
|
||||||
|
&types.Done{types.RespondTo(msg)}, nil)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,17 +8,16 @@ import (
|
||||||
|
|
||||||
func (imapw *IMAPWorker) handleOpenDirectory(msg *types.OpenDirectory) {
|
func (imapw *IMAPWorker) handleOpenDirectory(msg *types.OpenDirectory) {
|
||||||
imapw.worker.Logger.Printf("Opening %s", msg.Directory)
|
imapw.worker.Logger.Printf("Opening %s", msg.Directory)
|
||||||
go func() {
|
|
||||||
_, err := imapw.client.Select(msg.Directory, false)
|
_, err := imapw.client.Select(msg.Directory, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
imapw.worker.PostMessage(&types.Error{
|
imapw.worker.PostMessage(&types.Error{
|
||||||
Message: types.RespondTo(msg),
|
Message: types.RespondTo(msg),
|
||||||
Error: err,
|
Error: err,
|
||||||
}, nil)
|
}, nil)
|
||||||
} else {
|
} else {
|
||||||
imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
|
imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (imapw *IMAPWorker) handleFetchDirectoryContents(
|
func (imapw *IMAPWorker) handleFetchDirectoryContents(
|
||||||
|
@ -26,25 +25,23 @@ func (imapw *IMAPWorker) handleFetchDirectoryContents(
|
||||||
|
|
||||||
imapw.worker.Logger.Printf("Fetching UID list")
|
imapw.worker.Logger.Printf("Fetching UID list")
|
||||||
|
|
||||||
go func() {
|
seqSet := &imap.SeqSet{}
|
||||||
seqSet := &imap.SeqSet{}
|
seqSet.AddRange(1, imapw.selected.Messages)
|
||||||
seqSet.AddRange(1, imapw.selected.Messages)
|
uids, err := imapw.client.UidSearch(&imap.SearchCriteria{
|
||||||
uids, err := imapw.client.UidSearch(&imap.SearchCriteria{
|
SeqNum: seqSet,
|
||||||
SeqNum: seqSet,
|
})
|
||||||
})
|
if err != nil {
|
||||||
if err != nil {
|
imapw.worker.PostMessage(&types.Error{
|
||||||
imapw.worker.PostMessage(&types.Error{
|
Message: types.RespondTo(msg),
|
||||||
Message: types.RespondTo(msg),
|
Error: err,
|
||||||
Error: err,
|
}, nil)
|
||||||
}, nil)
|
} else {
|
||||||
} else {
|
imapw.worker.Logger.Printf("Found %d UIDs", len(uids))
|
||||||
imapw.worker.Logger.Printf("Found %d UIDs", len(uids))
|
imapw.seqMap = make([]uint32, len(uids))
|
||||||
imapw.seqMap = make([]uint32, len(uids))
|
imapw.worker.PostMessage(&types.DirectoryContents{
|
||||||
imapw.worker.PostMessage(&types.DirectoryContents{
|
Message: types.RespondTo(msg),
|
||||||
Message: types.RespondTo(msg),
|
Uids: uids,
|
||||||
Uids: uids,
|
}, nil)
|
||||||
}, nil)
|
imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
|
||||||
imapw.worker.PostMessage(&types.Done{types.RespondTo(msg)}, nil)
|
}
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue