diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index c25f4143..ab4a076b 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -14,6 +14,7 @@ from platypush.bus import Bus from platypush.config import Config from platypush.context import get_backend, get_plugin from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout +from platypush.event import EventGenerator from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.request import Request @@ -21,7 +22,7 @@ from platypush.message.response import Response from platypush.utils import get_redis_queue_name_by_message, set_thread_name -class Backend(Thread): +class Backend(Thread, EventGenerator): """ Parent class for backends. @@ -45,7 +46,8 @@ class Backend(Thread): """ self._thread_name = self.__class__.__name__ - super().__init__(name=self._thread_name) + EventGenerator.__init__(self) + Thread.__init__(self, name=self._thread_name) # If no bus is specified, create an internal queue where # the received messages will be pushed @@ -266,4 +268,3 @@ class Backend(Thread): # vim:sw=4:ts=4:et: - diff --git a/platypush/event/__init__.py b/platypush/event/__init__.py index e69de29b..6fac9b1b 100644 --- a/platypush/event/__init__.py +++ b/platypush/event/__init__.py @@ -0,0 +1,77 @@ +import inspect +import threading + + +class EventGenerator(object): + def __init__(self, *args, **kwargs): + self._event_handlers = {} # Event type => callback map + + def fire_event(self, event): + """ + Fires an event (instance of :class:`platypush.message.event.Event` or a + subclass) to the internal bus and triggers any handler callback + associated to the event type or any of its super-classes. + + :param event: Event to fire + :type event: :class:`platypush.message.event.Event` or a subclass + """ + + def hndl_thread(): + hndl(event) + + from platypush.backend import Backend + from platypush.context import get_bus + + bus = self.bus if isinstance(self, Backend) else get_bus() + bus.post(event) + handlers = set() + + for cls in inspect.getmro(event.__class__): + if cls in self._event_handlers: + handlers.update(self._event_handlers[cls]) + + for hndl in handlers: + threading.Thread(target=hndl_thread).start() + + + def register_handler(self, event_type, callback): + """ + Registers a callback handler for a camera event type. + + :param event_type: Event type to listen to. Must be a subclass of + :class:`platypush.message.event.Event` + :type event_type: :class:`platypush.message.event.Event` or a subclass + + :param callback: Callback function. It will take an event of type + :class:`platypush.message.event.Event` as a parameter + :type callback: function + """ + + if event_type not in self._event_handlers: + self._event_handlers[event_type] = set() + self._event_handlers[event_type].add(callback) + + + def unregister_handler(self, event_type, callback): + """ + Unregisters a callback handler from a camera event type. + + :param event_type: Event type the callback is registered to + :type event_type: :class:`platypush.message.event.Event` or a subclass + + :param callback: Callback function to unregister + :type callback: function + """ + + if event_type not in self._event_handlers: + return + if callback not in self._event_handlers[event_type]: + return + + self._event_handlers[event_type].remove(callback) + + if not self._event_handlers[event_type]: + del self._event_handlers[event_type] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index c6addb3f..5b777dc9 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -1,10 +1,13 @@ +import inspect import sys import logging +import threading import traceback from functools import wraps from platypush.config import Config +from platypush.event import EventGenerator from platypush.message.response import Response from platypush.utils import get_decorators @@ -33,10 +36,12 @@ def action(f): return _execute_action -class Plugin(object): +class Plugin(EventGenerator): """ Base plugin class """ def __init__(self, **kwargs): + super().__init__() + self.logger = logging.getLogger(self.__class__.__name__) if 'logging' in kwargs: self.logger.setLevel(getattr(logging, kwargs['logging'].upper())) @@ -46,6 +51,7 @@ class Plugin(object): .get('action', []) ) + def run(self, method, *args, **kwargs): if method not in self.registered_actions: raise RuntimeError('{} is not a registered action on {}'.format( @@ -55,4 +61,3 @@ class Plugin(object): # vim:sw=4:ts=4:et: - diff --git a/platypush/plugins/camera/__init__.py b/platypush/plugins/camera/__init__.py index ba58af3d..715561b9 100644 --- a/platypush/plugins/camera/__init__.py +++ b/platypush/plugins/camera/__init__.py @@ -9,10 +9,9 @@ import cv2 from datetime import datetime from platypush.config import Config -from platypush.context import get_bus from platypush.message.event.camera import CameraRecordingStartedEvent, \ CameraRecordingStoppedEvent, CameraVideoRenderedEvent, \ - CameraPictureTakenEvent + CameraPictureTakenEvent, CameraEvent from platypush.plugins import Plugin, action @@ -123,7 +122,7 @@ class CameraPlugin(Plugin): if device_id in self._devices: self._devices[device_id].release() del self._devices[device_id] - get_bus().post(CameraRecordingStoppedEvent(device_id=device_id)) + self.fire_event(CameraRecordingStoppedEvent(device_id=device_id)) def _store_frame_to_file(self, frame, frames_dir, image_file): @@ -135,9 +134,6 @@ class CameraPlugin(Plugin): frames_dir, datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f.jpg')) cv2.imwrite(filepath, frame) - - if image_file: - get_bus().post(CameraPictureTakenEvent(filename=image_file)) return filepath @@ -189,7 +185,8 @@ class CameraPlugin(Plugin): for f in files: video.write(cv2.imread(f)) video.release() - get_bus().post(CameraVideoRenderedEvent(filename=video_file)) + + self.fire_event(CameraVideoRenderedEvent(filename=video_file)) shutil.rmtree(frames_dir, ignore_errors=True) @@ -215,7 +212,7 @@ class CameraPlugin(Plugin): if frames_dir: evt_args['frames_dir'] = frames_dir - get_bus().post(CameraRecordingStartedEvent(**evt_args)) + self.fire_event(CameraRecordingStartedEvent(**evt_args)) while self._is_recording[device_id].is_set(): if duration and time.time() - recording_started_time >= duration \ @@ -241,6 +238,10 @@ class CameraPlugin(Plugin): time.sleep(sleep_between_frames) self._release_device(device_id, wait_thread_termination=False) + + if image_file: + self.fire_event(CameraPictureTakenEvent(filename=image_file)) + self.logger.info('Recording terminated') if video_file: @@ -273,6 +274,10 @@ class CameraPlugin(Plugin): these parameters if you want to override the default configured ones. """ + recording_started = threading.Event() + def on_recording_started(event): + recording_started.set() + device_id = device_id if device_id is not None else self.default_device_id frames_dir = os.path.abspath(os.path.expanduser(frames_dir)) \ if frames_dir is not None else self.frames_dir @@ -284,6 +289,7 @@ class CameraPlugin(Plugin): is not None else self.color_transform self._init_device(device_id) + self.register_handler(CameraRecordingStartedEvent, on_recording_started) frames_dir = os.path.join(frames_dir, str(device_id)) if video_file: @@ -302,6 +308,9 @@ class CameraPlugin(Plugin): self._recording_threads[device_id].start() self._is_recording[device_id].set() + + recording_started.wait() + self.unregister_handler(CameraRecordingStartedEvent, on_recording_started) return { 'path': video_file if video_file else frames_dir } @action @@ -325,6 +334,10 @@ class CameraPlugin(Plugin): :param device_id, warmup_frames, color_transform: Overrides the configured default parameters """ + picture_taken = threading.Event() + def on_picture_taken(event): + picture_taken.set() + image_file = os.path.abspath(os.path.expanduser(image_file)) device_id = device_id if device_id is not None else self.default_device_id warmup_frames = warmup_frames if warmup_frames is not None else \ @@ -333,6 +346,7 @@ class CameraPlugin(Plugin): is not None else self.color_transform self._init_device(device_id) + self.register_handler(CameraPictureTakenEvent, on_picture_taken) self._recording_threads[device_id] = threading.Thread( target=self._recording_thread(duration=None, video_file=None, image_file=image_file, @@ -344,6 +358,9 @@ class CameraPlugin(Plugin): self._recording_threads[device_id].start() self._is_recording[device_id].set() + + picture_taken.wait() + self.unregister_handler(CameraPictureTakenEvent, on_picture_taken) return { 'path': image_file }