Proper synchronization with client connections and stop_streaming condition in the streaming thread

This commit is contained in:
Fabio Manganiello 2019-08-16 17:49:15 +02:00
parent 190ab15a76
commit 7fd5f5c4f8

View file

@ -369,16 +369,13 @@ class CameraPiPlugin(CameraPlugin):
self._recording_thread.join() self._recording_thread.join()
@action @action
def start_streaming(self, listen_port=5000, duration=None, format='h264', **opts): def start_streaming(self, listen_port=5000, format='h264', **opts):
""" """
Start recording to a network stream Start recording to a network stream
:param listen_port: TCP listen port (default: 5000) :param listen_port: TCP listen port (default: 5000)
:type listen_port: int :type listen_port: int
:param duration: Stream duration in seconds (default: None, record until stop_streaming is called)
:type duration: float
:param format: Video stream format (default: h264) :param format: Video stream format (default: h264)
:type format: str :type format: str
@ -390,54 +387,57 @@ class CameraPiPlugin(CameraPlugin):
return None, 'A streaming thread is already running' return None, 'A streaming thread is already running'
camera = self._get_camera(**opts) camera = self._get_camera(**opts)
server_socket = socket.socket() server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('0.0.0.0', listen_port)) server_socket.bind(('0.0.0.0', listen_port))
server_socket.listen(0) server_socket.listen(1)
def streaming_thread(): def streaming_thread():
try: try:
self.logger.info('Starting streaming on port {}'.format(listen_port)) self.logger.info('Starting streaming on port {}'.format(listen_port))
should_stop = False
while True: while not should_stop:
self.logger.info('Listening for client connection') sock = None
socket = server_socket.accept()[0].makefile('wb') stream = None
self.logger.info('Accepted client connection')
start_time = time.time()
try: try:
camera.start_recording(socket, format=format) server_socket.settimeout(1)
sock = server_socket.accept()[0]
stream = sock.makefile('wb')
self.logger.info('Accepted client connection from {}'.format(sock.getpeername()))
except socket.timeout:
pass
while True: try:
wait_time = 1 if stream:
if duration: camera.start_recording(stream, format=format)
wait_time = min(wait_time, duration)
if time.time() - start_time >= duration:
break
camera.wait_recording(wait_time) while not should_stop:
self._streaming_stop_condition.acquire() camera.wait_recording(1)
should_stop = self._streaming_stop_condition.wait(timeout=0.1)
self._streaming_stop_condition.release()
if should_stop:
break
break
except ConnectionError: except ConnectionError:
self.logger.info('Client closed connection') self.logger.info('Client closed connection')
finally:
if not should_stop:
self._streaming_stop_condition.acquire()
should_stop = self._streaming_stop_condition.wait(timeout=1)
self._streaming_stop_condition.release()
try: try:
socket.close() camera.stop_recording()
except:
pass
try:
sock.close()
except: except:
pass pass
finally: finally:
try: try:
camera.stop_recording() server_socket.close()
except BrokenPipeError: except:
pass pass
except Exception as e:
self.logger.exception(e)
finally:
self._streaming_thread = None self._streaming_thread = None
self.logger.info('Stopped camera stream') self.logger.info('Stopped camera stream')