Compare commits

...

5 Commits

Author SHA1 Message Date
Fabio Manganiello 51a9b956bf Merge pull request '[#389] Fix for "Too many open files" media issue.' (#390) from fix-too-many-open-files-during-media into master
continuous-integration/drone/push Build is passing Details
Reviewed-on: #390
2024-04-17 04:08:10 +02:00
Fabio Manganiello e123463804
[media.chromecast] Refactored implementation.
continuous-integration/drone/push Build is passing Details
Explicitly use a `CastBrowser` object initialized at plugin boot instead
of relying on blocking calls to `pychromecast.get_chromecasts`.

1. It enables better event handling via callbacks instead of
   synchronously waiting for scan batches.

2. It optimizes resources - only one Zeroconf and one CastBrowser object
   will be created in the plugin, and destroyed upon stop.

3. No need for separate `get_chromecast`/`_refresh_chromecasts` methods:
   all the scanning is run continuously, so we can just return the
   results from the maps.
2024-04-17 03:56:45 +02:00
Fabio Manganiello f99f6bdab9
[media.chromecast] Resource clean up + new API adaptations.
continuous-integration/drone/push Build is passing Details
- `pychromecast.get_chromecasts` returns both a list of devices and a
  browser object. Since the Chromecast plugin is the most likely culprit
  of the excessive number of open MDNS sockets, it seems that we may
  need to explicitly stop discovery on the browser and close the
  ZeroConf object after the discovery is done.

- I was still using an ancient version of pychromecast on my RPi4, and I
  didn't notice that more recent versions implemented several breaking
  changes. Adapted the code to cope with those changes.
2024-04-17 02:49:31 +02:00
Fabio Manganiello 4972c8bdcf
Unregister a Zeroconf instance if it already exists before publishing a backend service.
continuous-integration/drone/push Build is passing Details
`mdns` connection are another culprit for the increasing number of open
files in the process.
2024-04-16 00:12:55 +02:00
Fabio Manganiello 33d4c8342d
[#389] Possible fix for "Too many open files" media issue.
continuous-integration/drone/push Build is passing Details
It seems that the process keeps a lot of open connections to Chromecast
devices during playback.

The most likely culprit is the `_refresh_chromecasts` logic.

We should start a `cast` object and register a status listener only if a
Chromecast with the same identifier isn't already registered in the
plugin.
2024-04-15 23:01:10 +02:00
4 changed files with 161 additions and 140 deletions

View File

@ -402,6 +402,13 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
)
return
if self.zeroconf:
self.logger.info(
'Zeroconf service already registered for %s, removing the previous instance',
self.__class__.__name__,
)
self.unregister_service()
self.zeroconf = Zeroconf()
srv_desc = {
'name': 'Platypush',

View File

@ -1,7 +1,12 @@
import threading
from typing import Callable, Optional
from typing import Optional
import pychromecast # type: ignore
from pychromecast import (
CastBrowser,
Chromecast,
ChromecastConnectionError,
SimpleCastListener,
get_chromecast_from_cast_info,
)
from platypush.backend.http.app.utils import get_remote_base_url
from platypush.plugins import RunnablePlugin, action
@ -31,22 +36,38 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
:param poll_interval: How often the plugin should poll for new/removed
Chromecast devices (default: 30 seconds).
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self._is_local = False
self.chromecast = chromecast
self.chromecasts = {}
self._chromecasts_by_uuid = {}
self._chromecasts_by_name = {}
self._media_listeners = {}
self._refresh_lock = threading.RLock()
self._zc = None
self._browser = None
def _get_chromecasts(self, *args, **kwargs):
with self._refresh_lock:
chromecasts = pychromecast.get_chromecasts(*args, **kwargs)
@property
def zc(self):
from zeroconf import Zeroconf
if isinstance(chromecasts, tuple):
return chromecasts[0]
return chromecasts
if not self._zc:
self._zc = Zeroconf()
return self._zc
@property
def browser(self):
if not self._browser:
self._browser = CastBrowser(
SimpleCastListener(self._on_chromecast_discovered), self.zc
)
self._browser.start_discovery()
return self._browser
def _on_chromecast_discovered(self, _, service: str):
self.logger.info('Discovered Chromecast: %s', service)
@staticmethod
def _get_device_property(cc, prop: str):
@ -54,18 +75,29 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
return getattr(cc.device, prop)
return getattr(cc.cast_info, prop)
def _serialize_device(self, cc: pychromecast.Chromecast) -> dict:
def _serialize_device(self, cc: Chromecast) -> dict:
"""
Convert a Chromecast object and its status to a dictionary.
"""
if hasattr(cc, 'cast_info'): # Newer PyChromecast API
host = cc.cast_info.host
port = cc.cast_info.port
elif hasattr(cc, 'host'):
host = getattr(cc, 'host', None)
port = getattr(cc, 'port', None)
elif hasattr(cc, 'uri'):
host, port = cc.uri.split(':')
else:
raise RuntimeError('Invalid Chromecast object')
return {
'type': cc.cast_type,
'name': cc.name,
'manufacturer': self._get_device_property(cc, 'manufacturer'),
'model_name': cc.model_name,
'uuid': str(cc.uuid),
'address': cc.host if hasattr(cc, 'host') else cc.uri.split(':')[0],
'port': cc.port if hasattr(cc, 'port') else int(cc.uri.split(':')[1]),
'address': host,
'port': port,
'status': (
{
'app': {
@ -85,93 +117,23 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
),
}
def _refresh_chromecasts(
self,
tries: int = 2,
retry_wait: float = 10,
timeout: float = 60,
blocking: bool = True,
callback: Optional[Callable] = None,
):
"""
Get the list of Chromecast devices
def _event_callback(self, _, cast: Chromecast):
self._chromecasts_by_uuid[cast.uuid] = cast
self._chromecasts_by_name[
self._get_device_property(cast, 'friendly_name')
] = cast
:param tries: Number of retries (default: 2)
:param retry_wait: Number of seconds between retries (default: 10 seconds)
:param timeout: Timeout before failing the call (default: 60 seconds)
:param blocking: If true, then the function will block until all the
Chromecast devices have been scanned. If false, then the provided
callback function will be invoked when a new device is discovered
:param callback: If blocking is false, then you can provide a callback
function that will be invoked when a new device is discovered
"""
self.chromecasts = {
self._get_device_property(cast, 'friendly_name'): cast
for cast in self._get_chromecasts(
tries=tries,
retry_wait=retry_wait,
timeout=timeout,
blocking=blocking,
callback=callback,
)
}
for name, cast in self.chromecasts.items():
self._update_listeners(name, cast)
for cast in self.chromecasts.values():
cast.wait()
def _event_callback(self, _, cast: pychromecast.Chromecast):
with self._refresh_lock:
self.chromecasts[self._get_device_property(cast, 'friendly_name')] = cast
def _update_listeners(self, name, cast):
if name not in self._media_listeners:
cast.start()
self._media_listeners[name] = MediaListener(
name=name, cast=cast, callback=self._event_callback
)
cast.media_controller.register_status_listener(self._media_listeners[name])
def get_chromecast(self, chromecast=None, n_tries=2):
if isinstance(chromecast, pychromecast.Chromecast):
assert chromecast, 'Invalid Chromecast object'
def get_chromecast(self, chromecast=None):
if isinstance(chromecast, Chromecast):
return chromecast
if not chromecast:
if not self.chromecast:
raise RuntimeError(
'No Chromecast specified nor default Chromecast configured'
)
chromecast = self.chromecast
if self._chromecasts_by_uuid.get(chromecast):
return self._chromecasts_by_uuid[chromecast]
if chromecast not in self.chromecasts:
casts = {}
while n_tries > 0:
n_tries -= 1
casts.update(
{
self._get_device_property(cast, 'friendly_name'): cast
for cast in self._get_chromecasts()
}
)
if self._chromecasts_by_name.get(chromecast):
return self._chromecasts_by_name[chromecast]
if chromecast in casts:
self.chromecasts.update(casts)
break
if chromecast not in self.chromecasts:
raise RuntimeError(f'Device {chromecast} not found')
cast = self.chromecasts[chromecast]
try:
cast.wait()
except Exception as e:
self.logger.warning('Failed to wait Chromecast sync: %s', e)
return cast
raise AssertionError(f'Chromecast {chromecast} not found')
@action
def play(
@ -276,24 +238,34 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
if cast.media_controller.is_paused:
if cast.media_controller.status.player_is_paused:
cast.media_controller.play()
elif cast.media_controller.is_playing:
elif cast.media_controller.status.player_is_playing:
cast.media_controller.pause()
cast.wait()
return self.status(chromecast=chromecast)
@action
def stop(self, *_, chromecast: Optional[str] = None, **__):
def stop(self, *_, chromecast: Optional[str] = None, **__): # type: ignore
if self.should_stop():
if self._zc:
self._zc.close()
self._zc = None
if self._browser:
self._browser.stop_discovery()
self._browser = None
return
chromecast = chromecast or self.chromecast
if not chromecast:
return
return None
cast = self.get_chromecast(chromecast)
cast.media_controller.stop()
cast.wait()
return self.status(chromecast=chromecast)
@action
@ -339,51 +311,51 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
def is_playing(self, chromecast: Optional[str] = None, **_):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_playing
).media_controller.status.player_is_playing
@action
def is_paused(self, chromecast: Optional[str] = None, **_):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_paused
).media_controller.status.player_is_paused
@action
def is_idle(self, chromecast: Optional[str] = None):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_idle
).media_controller.status.player_is_idle
@action
def list_subtitles(self, chromecast: Optional[str] = None):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.subtitle_tracks
).media_controller.status.subtitle_tracks
@action
def enable_subtitles(
self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_
self, chromecast: Optional[str] = None, track_id: Optional[int] = None, **_
):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
if track_id is not None:
return mc.enable_subtitle(track_id)
if mc.subtitle_tracks:
return mc.enable_subtitle(mc.subtitle_tracks[0].get('trackId'))
if mc.status.subtitle_tracks:
return mc.enable_subtitle(mc.status.subtitle_tracks[0].get('trackId'))
@action
def disable_subtitles(
self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_
self, chromecast: Optional[str] = None, track_id: Optional[int] = None, **_
):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
if track_id:
return mc.disable_subtitle(track_id)
if mc.current_subtitle_tracks:
return mc.disable_subtitle(mc.current_subtitle_tracks[0])
if mc.status.current_subtitle_tracks:
return mc.disable_subtitle(mc.status.current_subtitle_tracks[0])
@action
def toggle_subtitles(self, chromecast: Optional[str] = None, **_):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
all_subs = mc.status.subtitle_tracks
cur_subs = mc.status.status.current_subtitle_tracks
cur_subs = mc.status.current_subtitle_tracks
if cur_subs:
return self.disable_subtitles(chromecast, cur_subs[0])
@ -489,13 +461,13 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
def _status(self, chromecast: Optional[str] = None) -> dict:
if chromecast:
assert (
chromecast in self.chromecasts
chromecast in self._chromecasts_by_name
), f'No such Chromecast device: {chromecast}'
return self._serialize_device(self.chromecasts[chromecast])
return self._serialize_device(self._chromecasts_by_name[chromecast])
return {
name: self._serialize_device(cast)
for name, cast in self.chromecasts.items()
for name, cast in self._chromecasts_by_name.items()
}
@action
@ -503,7 +475,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
self,
chromecast: Optional[str] = None,
timeout: Optional[float] = None,
blocking: bool = True,
):
"""
Disconnect a Chromecast and wait for it to terminate
@ -512,11 +483,9 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
the default configured Chromecast will be used.
:param timeout: Number of seconds to wait for disconnection (default:
None: block until termination).
:param blocking: If set (default), then the code will wait until
disconnection, otherwise it will return immediately.
"""
cast = self.get_chromecast(chromecast)
cast.disconnect(timeout=timeout, blocking=blocking)
cast.disconnect(timeout=timeout)
@action
def join(self, chromecast: Optional[str] = None, timeout: Optional[float] = None):
@ -542,17 +511,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
cast = self.get_chromecast(chromecast)
cast.quit_app()
@action
def reboot(self, chromecast: Optional[str] = None):
"""
Reboots the Chromecast
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
"""
cast = self.get_chromecast(chromecast)
cast.reboot()
@action
def set_volume(self, volume: float, chromecast: Optional[str] = None):
"""
@ -613,7 +571,7 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
cast.set_volume_muted(not cast.status.volume_muted)
cast.set_volume_muted(not cast.media_controller.status.volume_muted)
cast.wait()
return self.status(chromecast=chromecast)
@ -623,6 +581,54 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
def remove_subtitles(self, *_, **__):
raise NotImplementedError
def _refresh_chromecasts(self):
cast_info = {cast.friendly_name: cast for cast in self.browser.devices.values()}
for info in cast_info.values():
name = info.friendly_name
if self._chromecasts_by_uuid.get(
info.uuid
) and self._chromecasts_by_name.get(name):
self.logger.debug('Chromecast %s already connected', name)
continue
self.logger.info('Started scan for Chromecast %s', name)
try:
cc = get_chromecast_from_cast_info(
info,
self.browser.zc,
tries=2,
retry_wait=5,
timeout=30,
)
self._chromecasts_by_name[cc.name] = cc
except ChromecastConnectionError:
self.logger.warning('Failed to connect to Chromecast %s', info)
continue
if cc.uuid not in self._chromecasts_by_uuid:
self._chromecasts_by_uuid[cc.uuid] = cc
self.logger.debug('Connecting to Chromecast %s', name)
if name not in self._media_listeners:
cc.start()
self._media_listeners[name] = MediaListener(
name=name or str(cc.uuid),
cast=cc,
callback=self._event_callback,
)
cc.media_controller.register_status_listener(
self._media_listeners[name]
)
self.logger.info('Connected to Chromecast %s', name)
self._chromecasts_by_uuid[cc.uuid] = cc
self._chromecasts_by_name[name] = cc
def main(self):
while not self.should_stop():
try:

View File

@ -1,5 +1,9 @@
# pylint: disable=too-few-public-methods
class SubtitlesAsyncHandler:
import logging
from pychromecast.controllers.media import MediaStatusListener
class SubtitlesAsyncHandler(MediaStatusListener):
"""
This class is used to enable subtitles when the media is loaded.
"""
@ -8,9 +12,17 @@ class SubtitlesAsyncHandler:
self.mc = mc
self.subtitle_id = subtitle_id
self.initialized = False
self.logger = logging.getLogger(__name__)
def new_media_status(self, *_):
if self.subtitle_id and not self.initialized:
self.mc.update_status()
self.mc.enable_subtitle(self.subtitle_id)
self.initialized = True
def load_media_failed(self, queue_item_id: int, error_code: int) -> None:
self.logger.warning(
"Failed to load media with queue_item_id %d, error code: %d",
queue_item_id,
error_code,
)

View File

@ -46,9 +46,7 @@ class ZeroconfListener(ServiceListener):
'properties': {
k.decode()
if isinstance(k, bytes)
else k: v.decode()
if isinstance(v, bytes)
else v
else k: (v.decode() if isinstance(v, bytes) else v)
for k, v in info.properties.items()
},
'server': info.server,
@ -166,9 +164,7 @@ class ZeroconfPlugin(Plugin):
get_bus().post(evt)
except queue.Empty:
if not services:
self.logger.warning(
'No such service discovered: {}'.format(service)
)
self.logger.warning('No such service discovered: %s', service)
finally:
if browser:
browser.cancel()