worker: use container/list as job queue

The worker uses a buffered channel to queue tasks. Buffered channels
preserve FIFO order, but block the sender once the buffer is full. The
design of aerc is such that the UI must always accept a response from
the backends, and the backends must always accept a request from the
UI. With bounded buffered channels on both of these communication
paths, a deadlock can occur.

Break the chain by using a doubly linked list (container/list from the
standard library) to queue tasks for the worker. Essentially, this is an
infinitely buffered channel - but more memory efficient as it can change
size dynamically.

Signed-off-by: Tim Culverhouse <tim@timculverhouse.com>
Signed-off-by: Robin Jarry <robin@jarry.cc>
This commit is contained in:
Tim Culverhouse 2022-09-27 22:43:53 -05:00 committed by Robin Jarry
parent 1c2dd4c9f1
commit 4c371170c5

View file

@@ -1,6 +1,7 @@
package types package types
import ( import (
"container/list"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -21,16 +22,19 @@ type Worker struct {
actionCallbacks map[int64]func(msg WorkerMessage) actionCallbacks map[int64]func(msg WorkerMessage)
messageCallbacks map[int64]func(msg WorkerMessage) messageCallbacks map[int64]func(msg WorkerMessage)
actionQueue *list.List
status int32
sync.Mutex sync.Mutex
} }
func NewWorker() *Worker { func NewWorker() *Worker {
return &Worker{ return &Worker{
Actions: make(chan WorkerMessage, 50), Actions: make(chan WorkerMessage),
Messages: make(chan WorkerMessage, 50), Messages: make(chan WorkerMessage, 50),
actionCallbacks: make(map[int64]func(msg WorkerMessage)), actionCallbacks: make(map[int64]func(msg WorkerMessage)),
messageCallbacks: make(map[int64]func(msg WorkerMessage)), messageCallbacks: make(map[int64]func(msg WorkerMessage)),
actionQueue: list.New(),
} }
} }
@@ -39,6 +43,40 @@ func (worker *Worker) setId(msg WorkerMessage) {
msg.setId(id) msg.setId(id)
} }
// States stored in Worker.status (read/written via sync/atomic) that
// track whether a processQueue goroutine is currently draining the
// action queue.
const (
	idle int32 = iota // no goroutine is draining actionQueue
	busy              // a processQueue goroutine is running
)
// Append msg to the action queue without ever blocking the caller,
// and spawn a background drainer goroutine if none is running yet.
func (worker *Worker) queue(msg WorkerMessage) {
	worker.Lock()
	defer worker.Unlock()
	worker.actionQueue.PushBack(msg)
	// Holding the mutex, at most one caller can win this transition,
	// so exactly one drainer goroutine exists at a time.
	if atomic.CompareAndSwapInt32(&worker.status, idle, busy) {
		go worker.processQueue()
	}
}
// Drain the action queue, forwarding each pending message to the
// Actions channel in FIFO order. Runs on its own goroutine and returns
// once the queue is observed empty, after flipping status back to idle.
func (worker *Worker) processQueue() {
	for {
		msg, ok := worker.popAction()
		if !ok {
			return
		}
		// Send outside the lock: the receiver may be slow, and
		// queue() must stay able to append in the meantime.
		worker.Actions <- msg
	}
}

// popAction dequeues the oldest pending message under the lock. When
// the queue is empty it marks the worker idle and reports false.
func (worker *Worker) popAction() (WorkerMessage, bool) {
	worker.Lock()
	defer worker.Unlock()
	front := worker.actionQueue.Front()
	if front == nil {
		atomic.StoreInt32(&worker.status, idle)
		return nil, false
	}
	return worker.actionQueue.Remove(front).(WorkerMessage), true
}
// PostAction posts an action to the worker. This method should not be called // PostAction posts an action to the worker. This method should not be called
// from the same goroutine that the worker runs in or deadlocks may occur // from the same goroutine that the worker runs in or deadlocks may occur
func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) { func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) {
@@ -49,7 +87,8 @@ func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage))
} else { } else {
logging.Debugf("PostAction %T", msg) logging.Debugf("PostAction %T", msg)
} }
worker.Actions <- msg // write to Actions channel without blocking
worker.queue(msg)
if cb != nil { if cb != nil {
worker.Lock() worker.Lock()