New API for firing events and registering/unregistering event handlers

both for plugins and backends
This commit is contained in:
Fabio Manganiello 2019-02-28 01:21:25 +01:00
parent e445029007
commit a9fb6a38dd
4 changed files with 113 additions and 13 deletions

View file

@ -14,6 +14,7 @@ from platypush.bus import Bus
from platypush.config import Config
from platypush.context import get_backend, get_plugin
from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event, StopEvent
from platypush.message.request import Request
@ -21,7 +22,7 @@ from platypush.message.response import Response
from platypush.utils import get_redis_queue_name_by_message, set_thread_name
class Backend(Thread):
class Backend(Thread, EventGenerator):
"""
Parent class for backends.
@ -45,7 +46,8 @@ class Backend(Thread):
"""
self._thread_name = self.__class__.__name__
super().__init__(name=self._thread_name)
EventGenerator.__init__(self)
Thread.__init__(self, name=self._thread_name)
# If no bus is specified, create an internal queue where
# the received messages will be pushed
@ -266,4 +268,3 @@ class Backend(Thread):
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,77 @@
import inspect
import threading
class EventGenerator(object):
def __init__(self, *args, **kwargs):
self._event_handlers = {} # Event type => callback map
def fire_event(self, event):
"""
Fires an event (instance of :class:`platypush.message.event.Event` or a
subclass) to the internal bus and triggers any handler callback
associated to the event type or any of its super-classes.
:param event: Event to fire
:type event: :class:`platypush.message.event.Event` or a subclass
"""
def hndl_thread():
hndl(event)
from platypush.backend import Backend
from platypush.context import get_bus
bus = self.bus if isinstance(self, Backend) else get_bus()
bus.post(event)
handlers = set()
for cls in inspect.getmro(event.__class__):
if cls in self._event_handlers:
handlers.update(self._event_handlers[cls])
for hndl in handlers:
threading.Thread(target=hndl_thread).start()
def register_handler(self, event_type, callback):
"""
Registers a callback handler for a camera event type.
:param event_type: Event type to listen to. Must be a subclass of
:class:`platypush.message.event.Event`
:type event_type: :class:`platypush.message.event.Event` or a subclass
:param callback: Callback function. It will take an event of type
:class:`platypush.message.event.Event` as a parameter
:type callback: function
"""
if event_type not in self._event_handlers:
self._event_handlers[event_type] = set()
self._event_handlers[event_type].add(callback)
def unregister_handler(self, event_type, callback):
"""
Unregisters a callback handler from a camera event type.
:param event_type: Event type the callback is registered to
:type event_type: :class:`platypush.message.event.Event` or a subclass
:param callback: Callback function to unregister
:type callback: function
"""
if event_type not in self._event_handlers:
return
if callback not in self._event_handlers[event_type]:
return
self._event_handlers[event_type].remove(callback)
if not self._event_handlers[event_type]:
del self._event_handlers[event_type]
# vim:sw=4:ts=4:et:

View file

@ -1,10 +1,13 @@
import inspect
import sys
import logging
import threading
import traceback
from functools import wraps
from platypush.config import Config
from platypush.event import EventGenerator
from platypush.message.response import Response
from platypush.utils import get_decorators
@ -33,10 +36,12 @@ def action(f):
return _execute_action
class Plugin(object):
class Plugin(EventGenerator):
""" Base plugin class """
def __init__(self, **kwargs):
super().__init__()
self.logger = logging.getLogger(self.__class__.__name__)
if 'logging' in kwargs:
self.logger.setLevel(getattr(logging, kwargs['logging'].upper()))
@ -46,6 +51,7 @@ class Plugin(object):
.get('action', [])
)
def run(self, method, *args, **kwargs):
if method not in self.registered_actions:
raise RuntimeError('{} is not a registered action on {}'.format(
@ -55,4 +61,3 @@ class Plugin(object):
# vim:sw=4:ts=4:et:

View file

@ -9,10 +9,9 @@ import cv2
from datetime import datetime
from platypush.config import Config
from platypush.context import get_bus
from platypush.message.event.camera import CameraRecordingStartedEvent, \
CameraRecordingStoppedEvent, CameraVideoRenderedEvent, \
CameraPictureTakenEvent
CameraPictureTakenEvent, CameraEvent
from platypush.plugins import Plugin, action
@ -123,7 +122,7 @@ class CameraPlugin(Plugin):
if device_id in self._devices:
self._devices[device_id].release()
del self._devices[device_id]
get_bus().post(CameraRecordingStoppedEvent(device_id=device_id))
self.fire_event(CameraRecordingStoppedEvent(device_id=device_id))
def _store_frame_to_file(self, frame, frames_dir, image_file):
@ -135,9 +134,6 @@ class CameraPlugin(Plugin):
frames_dir, datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f.jpg'))
cv2.imwrite(filepath, frame)
if image_file:
get_bus().post(CameraPictureTakenEvent(filename=image_file))
return filepath
@ -189,7 +185,8 @@ class CameraPlugin(Plugin):
for f in files:
video.write(cv2.imread(f))
video.release()
get_bus().post(CameraVideoRenderedEvent(filename=video_file))
self.fire_event(CameraVideoRenderedEvent(filename=video_file))
shutil.rmtree(frames_dir, ignore_errors=True)
@ -215,7 +212,7 @@ class CameraPlugin(Plugin):
if frames_dir:
evt_args['frames_dir'] = frames_dir
get_bus().post(CameraRecordingStartedEvent(**evt_args))
self.fire_event(CameraRecordingStartedEvent(**evt_args))
while self._is_recording[device_id].is_set():
if duration and time.time() - recording_started_time >= duration \
@ -241,6 +238,10 @@ class CameraPlugin(Plugin):
time.sleep(sleep_between_frames)
self._release_device(device_id, wait_thread_termination=False)
if image_file:
self.fire_event(CameraPictureTakenEvent(filename=image_file))
self.logger.info('Recording terminated')
if video_file:
@ -273,6 +274,10 @@ class CameraPlugin(Plugin):
these parameters if you want to override the default configured ones.
"""
recording_started = threading.Event()
def on_recording_started(event):
recording_started.set()
device_id = device_id if device_id is not None else self.default_device_id
frames_dir = os.path.abspath(os.path.expanduser(frames_dir)) \
if frames_dir is not None else self.frames_dir
@ -284,6 +289,7 @@ class CameraPlugin(Plugin):
is not None else self.color_transform
self._init_device(device_id)
self.register_handler(CameraRecordingStartedEvent, on_recording_started)
frames_dir = os.path.join(frames_dir, str(device_id))
if video_file:
@ -302,6 +308,9 @@ class CameraPlugin(Plugin):
self._recording_threads[device_id].start()
self._is_recording[device_id].set()
recording_started.wait()
self.unregister_handler(CameraRecordingStartedEvent, on_recording_started)
return { 'path': video_file if video_file else frames_dir }
@action
@ -325,6 +334,10 @@ class CameraPlugin(Plugin):
:param device_id, warmup_frames, color_transform: Overrides the configured default parameters
"""
picture_taken = threading.Event()
def on_picture_taken(event):
picture_taken.set()
image_file = os.path.abspath(os.path.expanduser(image_file))
device_id = device_id if device_id is not None else self.default_device_id
warmup_frames = warmup_frames if warmup_frames is not None else \
@ -333,6 +346,7 @@ class CameraPlugin(Plugin):
is not None else self.color_transform
self._init_device(device_id)
self.register_handler(CameraPictureTakenEvent, on_picture_taken)
self._recording_threads[device_id] = threading.Thread(
target=self._recording_thread(duration=None, video_file=None,
image_file=image_file,
@ -344,6 +358,9 @@ class CameraPlugin(Plugin):
self._recording_threads[device_id].start()
self._is_recording[device_id].set()
picture_taken.wait()
self.unregister_handler(CameraPictureTakenEvent, on_picture_taken)
return { 'path': image_file }