From 190ab15a76107c0b7c121a1464adcf90b8e564f6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 16 Aug 2019 12:24:42 +0200 Subject: [PATCH] Fixed camera.pi plugin --- platypush/backend/websocket.py | 3 +- platypush/plugins/camera/pi.py | 181 +++++++++++++++++++-------------- 2 files changed, 107 insertions(+), 77 deletions(-) diff --git a/platypush/backend/websocket.py b/platypush/backend/websocket.py index 718d3f6a..28701a37 100644 --- a/platypush/backend/websocket.py +++ b/platypush/backend/websocket.py @@ -88,8 +88,9 @@ class WebsocketBackend(Backend): self.logger.warning('Error on websocket send_event: {}'.format(e)) loop = get_or_create_event_loop() + active_websockets = self.active_websockets.copy() - for websocket in self.active_websockets: + for websocket in active_websockets: try: loop.run_until_complete(send_event(websocket)) except websockets.exceptions.ConnectionClosed: diff --git a/platypush/plugins/camera/pi.py b/platypush/plugins/camera/pi.py index 160861e0..e6915f70 100644 --- a/platypush/plugins/camera/pi.py +++ b/platypush/plugins/camera/pi.py @@ -69,9 +69,9 @@ class CameraPiPlugin(CameraPlugin): import picamera self._camera = picamera.PiCamera() - for (attr, value) in self.camera_args: + for (attr, value) in self.camera_args.items(): setattr(self._camera, attr, value) - for (attr, value) in opts: + for (attr, value) in opts.items(): setattr(self._camera, attr, value) return self._camera @@ -171,7 +171,7 @@ class CameraPiPlugin(CameraPlugin): if resize: capture_opts['resize'] = tuple(resize) - images = [os.path.join(directory, name_format % str(i+1)) for i in range(0, n_images)] + images = [os.path.join(directory, name_format % (i+1)) for i in range(0, n_images)] camera.capture_sequence(images, **capture_opts) return {'image_files': images} @@ -214,25 +214,26 @@ class CameraPiPlugin(CameraPlugin): capture_opts['resize'] = tuple(resize) def capture_thread(): - self.logger.info('Starting time lapse recording to directory {}'.format(directory)) - i = 0 + try: + self.logger.info('Starting time lapse recording to directory {}'.format(directory)) + i = 0 - for filename in camera.capture_continuous(os.path.join(directory, 'image_{counter:04d}.jpg')): - i += 1 - self.logger.debug('Captured {}'.format(filename)) + for filename in camera.capture_continuous(os.path.join(directory, 'image_{counter:04d}.jpg')): + i += 1 + self.logger.info('Captured {}'.format(filename)) - if i >= n_images: - break + if n_images and i >= n_images: + break - self._time_lapse_stop_condition.acquire() - should_stop = self._time_lapse_stop_condition.wait(timeout=interval) - self._time_lapse_stop_condition.release() + self._time_lapse_stop_condition.acquire() + should_stop = self._time_lapse_stop_condition.wait(timeout=interval) + self._time_lapse_stop_condition.release() - if should_stop: - break - - self._time_lapse_thread = None - self.logger.info('Stopped time lapse recording') + if should_stop: + break + finally: + self._time_lapse_thread = None + self.logger.info('Stopped time lapse recording') self._time_lapse_thread = threading.Thread(target=capture_thread) self._time_lapse_thread.start() @@ -297,49 +298,55 @@ class CameraPiPlugin(CameraPlugin): video_file = os.path.abspath(os.path.expanduser(video_file)) def recording_thread(): - if not multifile: - self.logger.info('Starting recording to video file {}'.format(video_file)) - camera.start_recording(video_file) - self._recording_stop_condition.acquire() - self._recording_stop_condition.wait(timeout=duration) - self._recording_stop_condition.release() - self.logger.info('Video recorded to {}'.format(video_file)) - else: - self.logger.info('Starting recording video files to directory {}'.format(directory)) - - i = 1 - start_time = time.time() - end_time = None - if duration is not None: - end_time = time.time() + duration - - camera.start_recording(name_format % i) - self._recording_stop_condition.acquire() - self._recording_stop_condition.wait(timeout=split_duration) - self._recording_stop_condition.release() - self.logger.info('Video file {} saved'.format(name_format % i)) - - while True: - i += 1 - remaining_duration = None - - if end_time: - remaining_duration = end_time - time.time() - split_duration = min(split_duration, remaining_duration) - if remaining_duration <= 0: - break - - camera.split_recording(name_format % i) + try: + if not multifile: + self.logger.info('Starting recording to video file {}'.format(video_file)) + camera.start_recording(video_file, format='h264') self._recording_stop_condition.acquire() - should_stop = self._recording_stop_condition.wait(timeout=split_duration) + self._recording_stop_condition.wait(timeout=duration) + self._recording_stop_condition.release() + self.logger.info('Video recorded to {}'.format(video_file)) + else: + self.logger.info('Starting recording video files to directory {}'.format(directory)) + + i = 1 + start_time = time.time() + end_time = None + if duration is not None: + end_time = time.time() + duration + + camera.start_recording(name_format % i, format='h264') + self._recording_stop_condition.acquire() + self._recording_stop_condition.wait(timeout=split_duration) self._recording_stop_condition.release() self.logger.info('Video file {} saved'.format(name_format % i)) - if should_stop: - break + while True: + i += 1 + remaining_duration = None - self.camera.stop_recording() - self._recording_thread = None + if end_time: + remaining_duration = end_time - time.time() + split_duration = min(split_duration, remaining_duration) + if remaining_duration <= 0: + break + + camera.split_recording(name_format % i) + self._recording_stop_condition.acquire() + should_stop = self._recording_stop_condition.wait(timeout=split_duration) + self._recording_stop_condition.release() + self.logger.info('Video file {} saved'.format(name_format % i)) + + if should_stop: + break + finally: + try: + camera.stop_recording() + except Exception as e: + self.logger.exception(e) + + self._recording_thread = None + self.logger.info('Stopped camera recording') self._recording_thread = threading.Thread(target=recording_thread) self._recording_thread.start() @@ -362,7 +369,7 @@ class CameraPiPlugin(CameraPlugin): self._recording_thread.join() @action - def start_streaming(self, listen_port=5000, duration=None, format='h264' **opts): + def start_streaming(self, listen_port=5000, duration=None, format='h264', **opts): """ Start recording to a network stream @@ -388,29 +395,51 @@ class CameraPiPlugin(CameraPlugin): server_socket.listen(0) def streaming_thread(): - self.logger.info('Starting streaming on listen port {}'.format(listen_port)) + try: + self.logger.info('Starting streaming on port {}'.format(listen_port)) - while True: - socket = server_socket.accept()[0].makefile('wb') - self.logger.info('Accepted client connection from {}'.format(socket.getpeername())) - - try: - camera.start_recording(socket, format=format) - self._streaming_stop_condition.acquire() - self._streaming_stop_condition.wait(timeout=duration) - self._streaming_stop_condition.release() - break - except ConnectionError: - self.logger.info('Client closed connection') + while True: + self.logger.info('Listening for client connection') + socket = server_socket.accept()[0].makefile('wb') + self.logger.info('Accepted client connection') + start_time = time.time() try: - socket.close() - except: - pass + camera.start_recording(socket, format=format) - self.camera.stop_recording() - self._streaming_thread = None - self.logger.info('Stopped camera stream') + while True: + wait_time = 1 + if duration: + wait_time = min(wait_time, duration) + if time.time() - start_time >= duration: + break + + camera.wait_recording(wait_time) + self._streaming_stop_condition.acquire() + should_stop = self._streaming_stop_condition.wait(timeout=0.1) + self._streaming_stop_condition.release() + + if should_stop: + break + + break + except ConnectionError: + self.logger.info('Client closed connection') + + try: + socket.close() + except: + pass + finally: + try: + camera.stop_recording() + except BrokenPipeError: + pass + except Exception as e: + self.logger.exception(e) + finally: + self._streaming_thread = None + self.logger.info('Stopped camera stream') self._streaming_thread = threading.Thread(target=streaming_thread) self._streaming_thread.start()