Reimplemented workers as threads instead of processes

This commit is contained in:
Fabio Manganiello 2019-12-29 23:18:58 +01:00
parent 931ab9d673
commit 8f2eb1c4e0
2 changed files with 11 additions and 9 deletions

View file

@ -46,7 +46,7 @@ class PingBackend(Backend):
self.logger.info('Starting ping backend with {} hosts to monitor'.format(len(self.hosts))) self.logger.info('Starting ping backend with {} hosts to monitor'.format(len(self.hosts)))
while not self.should_stop(): while not self.should_stop():
workers = Workers(min(len(self.hosts), 10), self.Pinger, timeout=self.timeout) workers = Workers(10, self.Pinger, timeout=self.timeout)
with workers: with workers:
for host in self.hosts.keys(): for host in self.hosts.keys():

View file

@ -1,6 +1,7 @@
import multiprocessing import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from queue import Queue
from typing import Type from typing import Type
@ -8,13 +9,13 @@ class EndOfStream:
pass pass
class Worker(ABC, multiprocessing.Process): class Worker(ABC, threading.Thread):
""" """
Generic class for worker processes, used to split the execution of an action over multiple Generic class for worker threads, used to split the execution of an action over multiple
parallel instances. parallel instances.
""" """
def __init__(self, request_queue: multiprocessing.Queue, response_queue=multiprocessing.Queue): def __init__(self, request_queue: Queue, response_queue=Queue, id=None):
""" """
:param request_queue: The worker will listen for messages to process over this queue :param request_queue: The worker will listen for messages to process over this queue
:param response_queue: The worker will return responses over this queue :param response_queue: The worker will return responses over this queue
@ -22,6 +23,7 @@ class Worker(ABC, multiprocessing.Process):
super().__init__() super().__init__()
self.request_queue = request_queue self.request_queue = request_queue
self.response_queue = response_queue self.response_queue = response_queue
self._id = id
def run(self) -> None: def run(self) -> None:
""" """
@ -87,11 +89,11 @@ class Workers:
:param args: Extra args to pass to the `worker_type` constructor :param args: Extra args to pass to the `worker_type` constructor
:param kwargs: Extra kwargs to pass to the `worker_type` constructor :param kwargs: Extra kwargs to pass to the `worker_type` constructor
""" """
self.request_queue = multiprocessing.Queue() self.request_queue = Queue()
self.response_queue = multiprocessing.Queue() self.response_queue = Queue()
# noinspection PyArgumentList # noinspection PyArgumentList
self._workers = [worker_type(self.request_queue, self.response_queue, *args, **kwargs) self._workers = [worker_type(self.request_queue, self.response_queue, id=i, *args, **kwargs)
for _ in range(n_workers)] for i in range(n_workers)]
self.responses = [] self.responses = []
def start(self): def start(self):