From a103ea49f1958aef018d9e1394b87f9911723d03 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 27 Jun 2023 15:12:15 +0200 Subject: [PATCH] Fixed ffmpeg/audio consumer synchronization upon timeout. --- platypush/plugins/sound/_converters/_base.py | 13 +++++++++---- platypush/plugins/sound/_streams/_base.py | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/platypush/plugins/sound/_converters/_base.py b/platypush/plugins/sound/_converters/_base.py index efa19fcf4..3c81b9084 100644 --- a/platypush/plugins/sound/_converters/_base.py +++ b/platypush/plugins/sound/_converters/_base.py @@ -253,13 +253,18 @@ class AudioConverter(Thread, ABC): ), 'The stdout is closed for the ffmpeg process' self._ffmpeg_terminated.clear() + try: - data = await asyncio.wait_for( - self.ffmpeg.stdout.read(self._chunk_size), timeout - ) + reader = asyncio.create_task(self.ffmpeg.stdout.read(self._chunk_size)) + data = await asyncio.wait_for(reader, timeout) self._out_queue.put(data) except asyncio.TimeoutError: - self._out_queue.put(b'') + pass + except Exception as e: + self.logger.warning('Audio proxy error: %s', e) + break + + self._out_queue.put(b'') def write(self, data: bytes): """ diff --git a/platypush/plugins/sound/_streams/_base.py b/platypush/plugins/sound/_streams/_base.py index fa2d95e99..7a55eb2c2 100644 --- a/platypush/plugins/sound/_streams/_base.py +++ b/platypush/plugins/sound/_streams/_base.py @@ -324,7 +324,7 @@ class AudioThread(Thread, ABC): """ self.logger.debug('Timeout on converter %s', converter.__class__.__name__) # Continue only if the converter hasn't terminated - return self._converter_terminated.is_set() + return not self._converter_terminated.is_set() @override def run(self):