2022-12-17 21:41:23 +01:00
|
|
|
from queue import Queue, Empty
|
|
|
|
from threading import Event
|
|
|
|
from time import time
|
|
|
|
from typing import List, Optional
|
|
|
|
|
|
|
|
from platypush.entities import Entity
|
|
|
|
|
|
|
|
|
|
|
|
class EntitiesQueue(Queue):
    """
    Extends the ``Queue`` class with an abstraction that allows getting and
    putting multiple entities at once, and that synchronizes with the
    upstream caller through an optional stop event.
    """

    # Upper bound (in seconds) for a single blocking poll on the underlying
    # queue. Keeping it short lets ``get`` re-check ``should_stop`` regularly
    # instead of sleeping through the whole batch window.
    _poll_interval = 0.5

    def __init__(self, stop_event: Optional[Event] = None, timeout: float = 1.0):
        """
        :param stop_event: Optional event; once it is set, ``get`` stops
            collecting and returns.
        :param timeout: Default duration, in seconds, of the window over
            which ``get`` collects entities when no timeout is passed to it.
        """
        super().__init__()
        self._timeout = timeout
        self._should_stop = stop_event

    @property
    def should_stop(self) -> bool:
        """
        ``True`` if a stop event was provided and it has been set.
        """
        return self._should_stop.is_set() if self._should_stop else False

    def get(self, block=True, timeout=None) -> List[Entity]:
        """
        Returns a batch of entities read from the queue.

        When ``block`` is true, entities are collected until ``timeout``
        seconds have elapsed or the stop event is set. When ``block`` is
        false, the entities that are already queued are drained and the
        call returns as soon as the queue is empty.

        :param block: Whether to wait for new entities to arrive.
        :param timeout: Length of the collection window, in seconds. A falsy
            value (``None`` or ``0``) falls back to the timeout configured
            on the constructor.
        :return: The collected entities, in the order they were read. Falsy
            items read from the queue are discarded.
        """
        timeout = timeout or self._timeout
        entities = []
        start_time = time()

        while not self.should_stop:
            remaining = timeout - (time() - start_time)
            if remaining <= 0:
                break

            try:
                # Never wait longer than what is left of the window, so the
                # call doesn't overshoot the requested timeout.
                entity = super().get(
                    block=block, timeout=min(self._poll_interval, remaining)
                )
            except Empty:
                if not block:
                    # Non-blocking mode: nothing left to drain. Looping here
                    # would busy-wait until the timeout expires.
                    break
                continue

            if entity:
                entities.append(entity)

        return entities

    def put(self, *entities: Entity, block=True, timeout=None):
        """
        This method is called by an entity manager to update and persist the
        state of some entities.

        :param entities: Entities to enqueue, in order.
        :param block: Passed to ``Queue.put`` for each entity.
        :param timeout: Passed to ``Queue.put`` for each entity (i.e. it
            applies to every single insertion, not to the whole batch).
        """
        for entity in entities:
            super().put(entity, block=block, timeout=timeout)
|