From 33d4c8342d9165e704b89e5be39668f748eec09a Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 15 Apr 2024 23:01:10 +0200 Subject: [PATCH 1/4] [#389] Possible fix for "Too many open files" media issue. It seems that the process keeps a lot of open connections to Chromecast devices during playback. The most likely culprit is the `_refresh_chromecasts` logic. We should start a `cast` object and register a status listener only if a Chromecast with the same identifier isn't already registered in the plugin. --- platypush/plugins/media/chromecast/__init__.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/platypush/plugins/media/chromecast/__init__.py b/platypush/plugins/media/chromecast/__init__.py index 5dc0db73..9afb4357 100644 --- a/platypush/plugins/media/chromecast/__init__.py +++ b/platypush/plugins/media/chromecast/__init__.py @@ -105,7 +105,7 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): :param callback: If blocking is false, then you can provide a callback function that will be invoked when a new device is discovered """ - self.chromecasts = { + casts = { self._get_device_property(cast, 'friendly_name'): cast for cast in self._get_chromecasts( tries=tries, @@ -116,11 +116,18 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): ) } - for name, cast in self.chromecasts.items(): - self._update_listeners(name, cast) + for name, cast in casts.copy().items(): + if not self.chromecasts.get(name): + self.logger.info('Discovered new Chromecast: %s', name) + self.chromecasts[name] = cast + self._update_listeners(name, cast) + cast.wait() + else: + casts.pop(name) - for cast in self.chromecasts.values(): + for name, cast in casts.items(): cast.wait() + self.logger.info('Refreshed Chromecast state: %s', name) def _event_callback(self, _, cast: pychromecast.Chromecast): with self._refresh_lock: @@ -133,6 +140,7 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): name=name, cast=cast, callback=self._event_callback ) cast.media_controller.register_status_listener(self._media_listeners[name]) + self.logger.debug('Started media listener for %s', name) def get_chromecast(self, chromecast=None, n_tries=2): if isinstance(chromecast, pychromecast.Chromecast): From 4972c8bdcf38af40de6b561b30f6e29040c59504 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 16 Apr 2024 00:12:55 +0200 Subject: [PATCH 2/4] Unregister a Zeroconf instance if it already exists before publishing a backend service. `mdns` connection are another culprit for the increasing number of open files in the process. --- platypush/backend/__init__.py | 7 +++++++ platypush/plugins/zeroconf/__init__.py | 8 ++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 4835e154..4f6ec73f 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -402,6 +402,13 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): ) return + if self.zeroconf: + self.logger.info( + 'Zeroconf service already registered for %s, removing the previous instance', + self.__class__.__name__, + ) + self.unregister_service() + self.zeroconf = Zeroconf() srv_desc = { 'name': 'Platypush', diff --git a/platypush/plugins/zeroconf/__init__.py b/platypush/plugins/zeroconf/__init__.py index 51275cf1..be2c1879 100644 --- a/platypush/plugins/zeroconf/__init__.py +++ b/platypush/plugins/zeroconf/__init__.py @@ -46,9 +46,7 @@ class ZeroconfListener(ServiceListener): 'properties': { k.decode() if isinstance(k, bytes) - else k: v.decode() - if isinstance(v, bytes) - else v + else k: (v.decode() if isinstance(v, bytes) else v) for k, v in info.properties.items() }, 'server': info.server, @@ -166,9 +164,7 @@ class ZeroconfPlugin(Plugin): get_bus().post(evt) except queue.Empty: if not services: - self.logger.warning( - 'No such service discovered: {}'.format(service) - ) + self.logger.warning('No such service discovered: %s', service) finally: if browser: browser.cancel() From f99f6bdab959fae20606fcb2dc45aa2240e78e69 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 17 Apr 2024 02:49:31 +0200 Subject: [PATCH 3/4] [media.chromecast] Resource clean up + new API adaptations. - `pychromecast.get_chromecasts` returns both a list of devices and a browser object. Since the Chromecast plugin is the most likely culprit of the excessive number of open MDNS sockets, it seems that we may need to explicitly stop discovery on the browser and close the ZeroConf object after the discovery is done. - I was still using an ancient version of pychromecast on my RPi4, and I didn't notice that more recent versions implemented several breaking changes. Adapted the code to cope with those changes. --- .../plugins/media/chromecast/__init__.py | 80 ++++++++++--------- .../plugins/media/chromecast/_subtitles.py | 16 +++- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/platypush/plugins/media/chromecast/__init__.py b/platypush/plugins/media/chromecast/__init__.py index 9afb4357..fcf1a446 100644 --- a/platypush/plugins/media/chromecast/__init__.py +++ b/platypush/plugins/media/chromecast/__init__.py @@ -31,7 +31,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): :param poll_interval: How often the plugin should poll for new/removed Chromecast devices (default: 30 seconds). """ - super().__init__(poll_interval=poll_interval, **kwargs) self._is_local = False @@ -42,11 +41,18 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): def _get_chromecasts(self, *args, **kwargs): with self._refresh_lock: - chromecasts = pychromecast.get_chromecasts(*args, **kwargs) + ret = pychromecast.get_chromecasts(*args, **kwargs) - if isinstance(chromecasts, tuple): - return chromecasts[0] - return chromecasts + if isinstance(ret, tuple): + chromecasts, browser = ret + if browser: + browser.stop_discovery() + if browser.zc: + browser.zc.close() + + return chromecasts + + return ret @staticmethod def _get_device_property(cc, prop: str): @@ -58,14 +64,25 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): """ Convert a Chromecast object and its status to a dictionary. """ + if hasattr(cc, 'cast_info'): # Newer PyChromecast API + host = cc.cast_info.host + port = cc.cast_info.port + elif hasattr(cc, 'host'): + host = getattr(cc, 'host', None) + port = getattr(cc, 'port', None) + elif hasattr(cc, 'uri'): + host, port = cc.uri.split(':') + else: + raise RuntimeError('Invalid Chromecast object') + return { 'type': cc.cast_type, 'name': cc.name, 'manufacturer': self._get_device_property(cc, 'manufacturer'), 'model_name': cc.model_name, 'uuid': str(cc.uuid), - 'address': cc.host if hasattr(cc, 'host') else cc.uri.split(':')[0], - 'port': cc.port if hasattr(cc, 'port') else int(cc.uri.split(':')[1]), + 'address': host, + 'port': port, 'status': ( { 'app': { @@ -284,24 +301,23 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): chromecast = chromecast or self.chromecast cast = self.get_chromecast(chromecast) - if cast.media_controller.is_paused: + if cast.media_controller.status.player_is_paused: cast.media_controller.play() - elif cast.media_controller.is_playing: + elif cast.media_controller.status.player_is_playing: cast.media_controller.pause() cast.wait() return self.status(chromecast=chromecast) @action - def stop(self, *_, chromecast: Optional[str] = None, **__): + def stop(self, *_, chromecast: Optional[str] = None, **__): # type: ignore chromecast = chromecast or self.chromecast if not chromecast: - return + return None cast = self.get_chromecast(chromecast) cast.media_controller.stop() cast.wait() - return self.status(chromecast=chromecast) @action @@ -347,51 +363,51 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): def is_playing(self, chromecast: Optional[str] = None, **_): return self.get_chromecast( chromecast or self.chromecast - ).media_controller.is_playing + ).media_controller.status.player_is_playing @action def is_paused(self, chromecast: Optional[str] = None, **_): return self.get_chromecast( chromecast or self.chromecast - ).media_controller.is_paused + ).media_controller.status.player_is_paused @action def is_idle(self, chromecast: Optional[str] = None): return self.get_chromecast( chromecast or self.chromecast - ).media_controller.is_idle + ).media_controller.status.player_is_idle @action def list_subtitles(self, chromecast: Optional[str] = None): return self.get_chromecast( chromecast or self.chromecast - ).media_controller.subtitle_tracks + ).media_controller.status.subtitle_tracks @action def enable_subtitles( - self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_ + self, chromecast: Optional[str] = None, track_id: Optional[int] = None, **_ ): mc = self.get_chromecast(chromecast or self.chromecast).media_controller if track_id is not None: return mc.enable_subtitle(track_id) - if mc.subtitle_tracks: - return mc.enable_subtitle(mc.subtitle_tracks[0].get('trackId')) + if mc.status.subtitle_tracks: + return mc.enable_subtitle(mc.status.subtitle_tracks[0].get('trackId')) @action def disable_subtitles( - self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_ + self, chromecast: Optional[str] = None, track_id: Optional[int] = None, **_ ): mc = self.get_chromecast(chromecast or self.chromecast).media_controller if track_id: return mc.disable_subtitle(track_id) - if mc.current_subtitle_tracks: - return mc.disable_subtitle(mc.current_subtitle_tracks[0]) + if mc.status.current_subtitle_tracks: + return mc.disable_subtitle(mc.status.current_subtitle_tracks[0]) @action def toggle_subtitles(self, chromecast: Optional[str] = None, **_): mc = self.get_chromecast(chromecast or self.chromecast).media_controller all_subs = mc.status.subtitle_tracks - cur_subs = mc.status.status.current_subtitle_tracks + cur_subs = mc.status.current_subtitle_tracks if cur_subs: return self.disable_subtitles(chromecast, cur_subs[0]) @@ -511,7 +527,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): self, chromecast: Optional[str] = None, timeout: Optional[float] = None, - blocking: bool = True, ): """ Disconnect a Chromecast and wait for it to terminate @@ -520,11 +535,9 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): the default configured Chromecast will be used. :param timeout: Number of seconds to wait for disconnection (default: None: block until termination). - :param blocking: If set (default), then the code will wait until - disconnection, otherwise it will return immediately. """ cast = self.get_chromecast(chromecast) - cast.disconnect(timeout=timeout, blocking=blocking) + cast.disconnect(timeout=timeout) @action def join(self, chromecast: Optional[str] = None, timeout: Optional[float] = None): @@ -550,17 +563,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): cast = self.get_chromecast(chromecast) cast.quit_app() - @action - def reboot(self, chromecast: Optional[str] = None): - """ - Reboots the Chromecast - - :param chromecast: Chromecast to cast to. If none is specified, then - the default configured Chromecast will be used. - """ - cast = self.get_chromecast(chromecast) - cast.reboot() - @action def set_volume(self, volume: float, chromecast: Optional[str] = None): """ @@ -621,7 +623,7 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): """ chromecast = chromecast or self.chromecast cast = self.get_chromecast(chromecast) - cast.set_volume_muted(not cast.status.volume_muted) + cast.set_volume_muted(not cast.media_controller.status.volume_muted) cast.wait() return self.status(chromecast=chromecast) diff --git a/platypush/plugins/media/chromecast/_subtitles.py b/platypush/plugins/media/chromecast/_subtitles.py index 450e7a26..ca3b2c49 100644 --- a/platypush/plugins/media/chromecast/_subtitles.py +++ b/platypush/plugins/media/chromecast/_subtitles.py @@ -1,5 +1,9 @@ -# pylint: disable=too-few-public-methods -class SubtitlesAsyncHandler: +import logging + +from pychromecast.controllers.media import MediaStatusListener + + +class SubtitlesAsyncHandler(MediaStatusListener): """ This class is used to enable subtitles when the media is loaded. """ @@ -8,9 +12,17 @@ class SubtitlesAsyncHandler: self.mc = mc self.subtitle_id = subtitle_id self.initialized = False + self.logger = logging.getLogger(__name__) def new_media_status(self, *_): if self.subtitle_id and not self.initialized: self.mc.update_status() self.mc.enable_subtitle(self.subtitle_id) self.initialized = True + + def load_media_failed(self, queue_item_id: int, error_code: int) -> None: + self.logger.warning( + "Failed to load media with queue_item_id %d, error code: %d", + queue_item_id, + error_code, + ) From e1234638045d3a983363935e0b41977c2abbffc1 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 17 Apr 2024 03:56:45 +0200 Subject: [PATCH 4/4] [media.chromecast] Refactored implementation. Explicitly use a `CastBrowser` object initialized at plugin boot instead of relying on blocking calls to `pychromecast.get_chromecasts`. 1. It enables better event handling via callbacks instead of synchronously waiting for scan batches. 2. It optimizes resources - only one Zeroconf and one CastBrowser object will be created in the plugin, and destroyed upon stop. 3. No need for separate `get_chromecast`/`_refresh_chromecasts` methods: all the scanning is run continuously, so we can just return the results from the maps. --- .../plugins/media/chromecast/__init__.py | 216 +++++++++--------- 1 file changed, 106 insertions(+), 110 deletions(-) diff --git a/platypush/plugins/media/chromecast/__init__.py b/platypush/plugins/media/chromecast/__init__.py index fcf1a446..8c972707 100644 --- a/platypush/plugins/media/chromecast/__init__.py +++ b/platypush/plugins/media/chromecast/__init__.py @@ -1,7 +1,12 @@ -import threading -from typing import Callable, Optional +from typing import Optional -import pychromecast # type: ignore +from pychromecast import ( + CastBrowser, + Chromecast, + ChromecastConnectionError, + SimpleCastListener, + get_chromecast_from_cast_info, +) from platypush.backend.http.app.utils import get_remote_base_url from platypush.plugins import RunnablePlugin, action @@ -35,24 +40,34 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): self._is_local = False self.chromecast = chromecast - self.chromecasts = {} + self._chromecasts_by_uuid = {} + self._chromecasts_by_name = {} self._media_listeners = {} - self._refresh_lock = threading.RLock() + self._zc = None + self._browser = None - def _get_chromecasts(self, *args, **kwargs): - with self._refresh_lock: - ret = pychromecast.get_chromecasts(*args, **kwargs) + @property + def zc(self): + from zeroconf import Zeroconf - if isinstance(ret, tuple): - chromecasts, browser = ret - if browser: - browser.stop_discovery() - if browser.zc: - browser.zc.close() + if not self._zc: + self._zc = Zeroconf() - return chromecasts + return self._zc - return ret + @property + def browser(self): + if not self._browser: + self._browser = CastBrowser( + SimpleCastListener(self._on_chromecast_discovered), self.zc + ) + + self._browser.start_discovery() + + return self._browser + + def _on_chromecast_discovered(self, _, service: str): + self.logger.info('Discovered Chromecast: %s', service) @staticmethod def _get_device_property(cc, prop: str): @@ -60,7 +75,7 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): return getattr(cc.device, prop) return getattr(cc.cast_info, prop) - def _serialize_device(self, cc: pychromecast.Chromecast) -> dict: + def _serialize_device(self, cc: Chromecast) -> dict: """ Convert a Chromecast object and its status to a dictionary. """ @@ -102,101 +117,23 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): ), } - def _refresh_chromecasts( - self, - tries: int = 2, - retry_wait: float = 10, - timeout: float = 60, - blocking: bool = True, - callback: Optional[Callable] = None, - ): - """ - Get the list of Chromecast devices + def _event_callback(self, _, cast: Chromecast): + self._chromecasts_by_uuid[cast.uuid] = cast + self._chromecasts_by_name[ + self._get_device_property(cast, 'friendly_name') + ] = cast - :param tries: Number of retries (default: 2) - :param retry_wait: Number of seconds between retries (default: 10 seconds) - :param timeout: Timeout before failing the call (default: 60 seconds) - :param blocking: If true, then the function will block until all the - Chromecast devices have been scanned. If false, then the provided - callback function will be invoked when a new device is discovered - :param callback: If blocking is false, then you can provide a callback - function that will be invoked when a new device is discovered - """ - casts = { - self._get_device_property(cast, 'friendly_name'): cast - for cast in self._get_chromecasts( - tries=tries, - retry_wait=retry_wait, - timeout=timeout, - blocking=blocking, - callback=callback, - ) - } - - for name, cast in casts.copy().items(): - if not self.chromecasts.get(name): - self.logger.info('Discovered new Chromecast: %s', name) - self.chromecasts[name] = cast - self._update_listeners(name, cast) - cast.wait() - else: - casts.pop(name) - - for name, cast in casts.items(): - cast.wait() - self.logger.info('Refreshed Chromecast state: %s', name) - - def _event_callback(self, _, cast: pychromecast.Chromecast): - with self._refresh_lock: - self.chromecasts[self._get_device_property(cast, 'friendly_name')] = cast - - def _update_listeners(self, name, cast): - if name not in self._media_listeners: - cast.start() - self._media_listeners[name] = MediaListener( - name=name, cast=cast, callback=self._event_callback - ) - cast.media_controller.register_status_listener(self._media_listeners[name]) - self.logger.debug('Started media listener for %s', name) - - def get_chromecast(self, chromecast=None, n_tries=2): - if isinstance(chromecast, pychromecast.Chromecast): - assert chromecast, 'Invalid Chromecast object' + def get_chromecast(self, chromecast=None): + if isinstance(chromecast, Chromecast): return chromecast - if not chromecast: - if not self.chromecast: - raise RuntimeError( - 'No Chromecast specified nor default Chromecast configured' - ) - chromecast = self.chromecast + if self._chromecasts_by_uuid.get(chromecast): + return self._chromecasts_by_uuid[chromecast] - if chromecast not in self.chromecasts: - casts = {} - while n_tries > 0: - n_tries -= 1 - casts.update( - { - self._get_device_property(cast, 'friendly_name'): cast - for cast in self._get_chromecasts() - } - ) + if self._chromecasts_by_name.get(chromecast): + return self._chromecasts_by_name[chromecast] - if chromecast in casts: - self.chromecasts.update(casts) - break - - if chromecast not in self.chromecasts: - raise RuntimeError(f'Device {chromecast} not found') - - cast = self.chromecasts[chromecast] - - try: - cast.wait() - except Exception as e: - self.logger.warning('Failed to wait Chromecast sync: %s', e) - - return cast + raise AssertionError(f'Chromecast {chromecast} not found') @action def play( @@ -311,6 +248,17 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): @action def stop(self, *_, chromecast: Optional[str] = None, **__): # type: ignore + if self.should_stop(): + if self._zc: + self._zc.close() + self._zc = None + + if self._browser: + self._browser.stop_discovery() + self._browser = None + + return + chromecast = chromecast or self.chromecast if not chromecast: return None @@ -513,13 +461,13 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): def _status(self, chromecast: Optional[str] = None) -> dict: if chromecast: assert ( - chromecast in self.chromecasts + chromecast in self._chromecasts_by_name ), f'No such Chromecast device: {chromecast}' - return self._serialize_device(self.chromecasts[chromecast]) + return self._serialize_device(self._chromecasts_by_name[chromecast]) return { name: self._serialize_device(cast) - for name, cast in self.chromecasts.items() + for name, cast in self._chromecasts_by_name.items() } @action @@ -633,6 +581,54 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin): def remove_subtitles(self, *_, **__): raise NotImplementedError + def _refresh_chromecasts(self): + cast_info = {cast.friendly_name: cast for cast in self.browser.devices.values()} + + for info in cast_info.values(): + name = info.friendly_name + if self._chromecasts_by_uuid.get( + info.uuid + ) and self._chromecasts_by_name.get(name): + self.logger.debug('Chromecast %s already connected', name) + continue + + self.logger.info('Started scan for Chromecast %s', name) + + try: + cc = get_chromecast_from_cast_info( + info, + self.browser.zc, + tries=2, + retry_wait=5, + timeout=30, + ) + + self._chromecasts_by_name[cc.name] = cc + except ChromecastConnectionError: + self.logger.warning('Failed to connect to Chromecast %s', info) + continue + + if cc.uuid not in self._chromecasts_by_uuid: + self._chromecasts_by_uuid[cc.uuid] = cc + self.logger.debug('Connecting to Chromecast %s', name) + + if name not in self._media_listeners: + cc.start() + self._media_listeners[name] = MediaListener( + name=name or str(cc.uuid), + cast=cc, + callback=self._event_callback, + ) + + cc.media_controller.register_status_listener( + self._media_listeners[name] + ) + + self.logger.info('Connected to Chromecast %s', name) + + self._chromecasts_by_uuid[cc.uuid] = cc + self._chromecasts_by_name[name] = cc + def main(self): while not self.should_stop(): try: