From 8f2eb1c4e0d76d165514600efafbb9bcf2e42fa8 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 29 Dec 2019 23:18:58 +0100 Subject: [PATCH] Reimplemented workers as threads instead of processes --- platypush/backend/ping.py | 2 +- platypush/utils/workers.py | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/platypush/backend/ping.py b/platypush/backend/ping.py index 33150072..f20001c0 100644 --- a/platypush/backend/ping.py +++ b/platypush/backend/ping.py @@ -46,7 +46,7 @@ class PingBackend(Backend): self.logger.info('Starting ping backend with {} hosts to monitor'.format(len(self.hosts))) while not self.should_stop(): - workers = Workers(min(len(self.hosts), 10), self.Pinger, timeout=self.timeout) + workers = Workers(10, self.Pinger, timeout=self.timeout) with workers: for host in self.hosts.keys(): diff --git a/platypush/utils/workers.py b/platypush/utils/workers.py index 6e0963ce..7d0ec0e1 100644 --- a/platypush/utils/workers.py +++ b/platypush/utils/workers.py @@ -1,6 +1,7 @@ -import multiprocessing +import threading from abc import ABC, abstractmethod +from queue import Queue from typing import Type @@ -8,13 +9,13 @@ class EndOfStream: pass -class Worker(ABC, multiprocessing.Process): +class Worker(ABC, threading.Thread): """ - Generic class for worker processes, used to split the execution of an action over multiple + Generic class for worker threads, used to split the execution of an action over multiple parallel instances. """ - def __init__(self, request_queue: multiprocessing.Queue, response_queue=multiprocessing.Queue): + def __init__(self, request_queue: Queue, response_queue=Queue, id=None): """ :param request_queue: The worker will listen for messages to process over this queue :param response_queue: The worker will return responses over this queue @@ -22,6 +23,7 @@ class Worker(ABC, multiprocessing.Process): super().__init__() self.request_queue = request_queue self.response_queue = response_queue + self._id = id def run(self) -> None: """ @@ -87,11 +89,11 @@ class Workers: :param args: Extra args to pass to the `worker_type` constructor :param kwargs: Extra kwargs to pass to the `worker_type` constructor """ - self.request_queue = multiprocessing.Queue() - self.response_queue = multiprocessing.Queue() + self.request_queue = Queue() + self.response_queue = Queue() # noinspection PyArgumentList - self._workers = [worker_type(self.request_queue, self.response_queue, *args, **kwargs) - for _ in range(n_workers)] + self._workers = [worker_type(self.request_queue, self.response_queue, id=i, *args, **kwargs) + for i in range(n_workers)] self.responses = [] def start(self):