From 4c371170c57d39399a924bd4d27d67226a9a7f39 Mon Sep 17 00:00:00 2001 From: Tim Culverhouse Date: Tue, 27 Sep 2022 22:43:53 -0500 Subject: [PATCH] worker: use container/list as job queue The worker uses a buffered channel to queue tasks. Buffered channels are effective at FIFO, but are prone to blocking. The design of aerc is such that the UI must always accept a response from the backends, and the backends must always accept a request from the UI. By using buffered channels for both of these communication channels, a deadlock will occur. Break the chain by using a doubly linked list (container/list from the standard library) to queue tasks for the worker. Essentially, this is an infinitely buffered channel - but more memory efficient as it can change size dynamically. Signed-off-by: Tim Culverhouse Signed-off-by: Robin Jarry --- worker/types/worker.go | 43 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/worker/types/worker.go b/worker/types/worker.go index ee2f9a3..b5f5149 100644 --- a/worker/types/worker.go +++ b/worker/types/worker.go @@ -1,6 +1,7 @@ package types import ( + "container/list" "sync" "sync/atomic" @@ -21,16 +22,19 @@ type Worker struct { actionCallbacks map[int64]func(msg WorkerMessage) messageCallbacks map[int64]func(msg WorkerMessage) + actionQueue *list.List + status int32 sync.Mutex } func NewWorker() *Worker { return &Worker{ - Actions: make(chan WorkerMessage, 50), + Actions: make(chan WorkerMessage), Messages: make(chan WorkerMessage, 50), actionCallbacks: make(map[int64]func(msg WorkerMessage)), messageCallbacks: make(map[int64]func(msg WorkerMessage)), + actionQueue: list.New(), } } @@ -39,6 +43,40 @@ func (worker *Worker) setId(msg WorkerMessage) { msg.setId(id) } +const ( + idle int32 = iota + busy +) + +// Add a new task to the action queue without blocking. Start processing the +// queue in the background if needed. +func (worker *Worker) queue(msg WorkerMessage) { + worker.Lock() + defer worker.Unlock() + worker.actionQueue.PushBack(msg) + if atomic.LoadInt32(&worker.status) == idle { + atomic.StoreInt32(&worker.status, busy) + go worker.processQueue() + } +} + +// Start processing the action queue and write all messages to the Actions +// channel, one by one. Stop when the action queue is empty. +func (worker *Worker) processQueue() { + for { + worker.Lock() + e := worker.actionQueue.Front() + if e == nil { + atomic.StoreInt32(&worker.status, idle) + worker.Unlock() + return + } + msg := worker.actionQueue.Remove(e).(WorkerMessage) + worker.Unlock() + worker.Actions <- msg + } +} + // PostAction posts an action to the worker. This method should not be called // from the same goroutine that the worker runs in or deadlocks may occur func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) { @@ -49,7 +87,8 @@ func (worker *Worker) PostAction(msg WorkerMessage, cb func(msg WorkerMessage)) } else { logging.Debugf("PostAction %T", msg) } - worker.Actions <- msg + // write to Actions channel without blocking + worker.queue(msg) if cb != nil { worker.Lock()