From 7fd5f5c4f898a508b5982d359e72f6a47a899a4b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 16 Aug 2019 17:49:15 +0200 Subject: [PATCH] Proper synchronization with client connections and stop_streaming condition in the streaming thread --- platypush/plugins/camera/pi.py | 66 +++++++++++++++++----------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/platypush/plugins/camera/pi.py b/platypush/plugins/camera/pi.py index e6915f70..5950795b 100644 --- a/platypush/plugins/camera/pi.py +++ b/platypush/plugins/camera/pi.py @@ -369,16 +369,13 @@ class CameraPiPlugin(CameraPlugin): self._recording_thread.join() @action - def start_streaming(self, listen_port=5000, duration=None, format='h264', **opts): + def start_streaming(self, listen_port=5000, format='h264', **opts): """ Start recording to a network stream :param listen_port: TCP listen port (default: 5000) :type listen_port: int - :param duration: Stream duration in seconds (default: None, record until stop_streaming is called) - :type duration: float - :param format: Video stream format (default: h264) :type format: str @@ -390,54 +387,57 @@ class CameraPiPlugin(CameraPlugin): return None, 'A streaming thread is already running' camera = self._get_camera(**opts) - server_socket = socket.socket() + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('0.0.0.0', listen_port)) - server_socket.listen(0) + server_socket.listen(1) def streaming_thread(): try: self.logger.info('Starting streaming on port {}'.format(listen_port)) + should_stop = False - while True: - self.logger.info('Listening for client connection') - socket = server_socket.accept()[0].makefile('wb') - self.logger.info('Accepted client connection') - start_time = time.time() + while not should_stop: + sock = None + stream = None try: - camera.start_recording(socket, format=format) + server_socket.settimeout(1) + sock = server_socket.accept()[0] + stream = sock.makefile('wb') + self.logger.info('Accepted client connection from {}'.format(sock.getpeername())) + except socket.timeout: + pass - while True: - wait_time = 1 - if duration: - wait_time = min(wait_time, duration) - if time.time() - start_time >= duration: - break + try: + if stream: + camera.start_recording(stream, format=format) - camera.wait_recording(wait_time) - self._streaming_stop_condition.acquire() - should_stop = self._streaming_stop_condition.wait(timeout=0.1) - self._streaming_stop_condition.release() - - if should_stop: - break - - break + while not should_stop: + camera.wait_recording(1) except ConnectionError: self.logger.info('Client closed connection') + finally: + if not should_stop: + self._streaming_stop_condition.acquire() + should_stop = self._streaming_stop_condition.wait(timeout=1) + self._streaming_stop_condition.release() try: - socket.close() + camera.stop_recording() except: pass - finally: + try: - camera.stop_recording() - except BrokenPipeError: + sock.close() + except: pass - except Exception as e: - self.logger.exception(e) finally: + try: + server_socket.close() + except: + pass + self._streaming_thread = None self.logger.info('Stopped camera stream')