[`media.chromecast`] Full plugin rewrite.
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Fabio Manganiello 2023-11-12 03:08:54 +01:00
parent 20aeb0b72e
commit 1f321c32dc
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
9 changed files with 517 additions and 373 deletions

View File

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import hashlib
import logging
import os
from typing import Generator, Optional
from platypush.message import JSONAble
@ -56,6 +57,9 @@ class MediaHandler(JSONAble, ABC):
logging.exception(e)
errors[hndl_class.__name__] = str(e)
if os.path.exists(source):
source = f'file://{source}'
raise AttributeError(
f'The source {source} has no handlers associated. Errors: {errors}'
)

View File

@ -18,13 +18,15 @@ export default {
methods: {
async getPlayers() {
const devices = await this.request(`${this.pluginName}.get_chromecasts`)
const devices = Object.values(
await this.request(`${this.pluginName}.status`)
)
return Promise.all(devices.map(async (device) => {
return {
...device,
iconClass: device.type === 'audio' ? 'fa fa-volume-up' : 'fab fa-chromecast',
pluginName: this.pluginName,
status: this.request(`${this.pluginName}.status`, {chromecast: device.name}),
component: this,
}
}))
@ -41,7 +43,9 @@ export default {
},
async status(player) {
return await this.request(`${this.pluginName}.status`, {chromecast: this.getPlayerName(player)})
return (
await this.request(`${this.pluginName}.status`, {chromecast: this.getPlayerName(player)})
)?.status
},
async play(resource, player) {
@ -76,9 +80,8 @@ export default {
return true
},
supports(resource) {
return resource?.type === 'youtube' ||
(resource.url || resource).startsWith('http://') || (resource.url || resource).startsWith('https://')
supports() {
return true
},
},
}

View File

@ -2,10 +2,10 @@ from platypush.message.event import Event
class MediaEvent(Event):
""" Base class for media events """
"""Base class for media events"""
def __init__(self, player=None, plugin=None, *args, **kwargs):
super().__init__(player=player, plugin=plugin, *args, **kwargs)
def __init__(self, player=None, plugin=None, status=None, *args, **kwargs):
super().__init__(player=player, plugin=plugin, status=status, *args, **kwargs)
class MediaPlayRequestEvent(MediaEvent):
@ -13,13 +13,22 @@ class MediaPlayRequestEvent(MediaEvent):
Event triggered when a new media playback request is received
"""
def __init__(self, player=None, plugin=None, resource=None, title=None, *args, **kwargs):
def __init__(
self, player=None, plugin=None, resource=None, title=None, *args, **kwargs
):
"""
:param resource: File name or URI of the played video
:type resource: str
"""
super().__init__(*args, player=player, plugin=plugin, resource=resource, title=title, **kwargs)
super().__init__(
*args,
player=player,
plugin=plugin,
resource=resource,
title=title,
**kwargs
)
class MediaPlayEvent(MediaEvent):
@ -27,13 +36,22 @@ class MediaPlayEvent(MediaEvent):
Event triggered when a new media content is played
"""
def __init__(self, player=None, plugin=None, resource=None, title=None, *args, **kwargs):
def __init__(
self, player=None, plugin=None, resource=None, title=None, *args, **kwargs
):
"""
:param resource: File name or URI of the played video
:type resource: str
"""
super().__init__(*args, player=player, plugin=plugin, resource=resource, title=title, **kwargs)
super().__init__(
*args,
player=player,
plugin=plugin,
resource=resource,
title=title,
**kwargs
)
class MediaStopEvent(MediaEvent):
@ -69,7 +87,9 @@ class MediaSeekEvent(MediaEvent):
"""
def __init__(self, position, player=None, plugin=None, *args, **kwargs):
super().__init__(*args, player=player, plugin=plugin, position=position, **kwargs)
super().__init__(
*args, player=player, plugin=plugin, position=position, **kwargs
)
class MediaVolumeChangedEvent(MediaEvent):
@ -101,7 +121,9 @@ class NewPlayingMediaEvent(MediaEvent):
:type resource: str
"""
super().__init__(*args, player=player, plugin=plugin, resource=resource, **kwargs)
super().__init__(
*args, player=player, plugin=plugin, resource=resource, **kwargs
)
# vim:sw=4:ts=4:et:

View File

@ -326,11 +326,11 @@ class MediaPlugin(Plugin, ABC):
"""
if resource.startswith('file://'):
resource = resource[len('file://') :]
assert os.path.isfile(resource), f'File {resource} not found'
path = resource[len('file://') :]
assert os.path.isfile(path), f'File {path} not found'
self._latest_resource = MediaResource(
resource=resource,
url=f'file://{resource}',
url=resource,
title=os.path.basename(resource),
filename=os.path.basename(resource),
)
@ -635,7 +635,11 @@ class MediaPlugin(Plugin, ABC):
}
"""
return self._start_streaming(media, subtitles=subtitles, download=download)
def _start_streaming(
self, media: str, subtitles: Optional[str] = None, download: bool = False
):
http = get_backend('http')
assert http, f'Unable to stream {media}: HTTP backend not configured'
@ -715,6 +719,7 @@ class MediaPlugin(Plugin, ABC):
@action
def get_youtube_info(self, url):
# Legacy conversion for Mopidy YouTube URIs
m = re.match('youtube:video:(.*)', url)
if m:
url = f'https://www.youtube.com/watch?v={m.group(1)}'
@ -759,7 +764,7 @@ class MediaPlugin(Plugin, ABC):
.pop()
.strip(),
)
.group(1)
.group(1) # type: ignore
.split(':')[::-1]
)
],
@ -797,7 +802,7 @@ class MediaPlugin(Plugin, ABC):
return self._is_local
@staticmethod
def get_subtitles_file(subtitles):
def get_subtitles_file(subtitles: Optional[str] = None):
if not subtitles:
return None

View File

@ -1,156 +1,49 @@
import datetime
import re
import time
import threading
from typing import Callable, Optional
from platypush.context import get_bus
from platypush.plugins import action
import pychromecast # type: ignore
from platypush.backend.http.app.utils import get_remote_base_url
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.media import MediaPlugin
from platypush.utils import get_mime_type
from platypush.message.event.media import (
MediaPlayEvent,
MediaPlayRequestEvent,
MediaStopEvent,
MediaPauseEvent,
NewPlayingMediaEvent,
MediaVolumeChangedEvent,
MediaSeekEvent,
)
from platypush.message.event.media import MediaPlayRequestEvent
from ._listener import MediaListener
from ._subtitles import SubtitlesAsyncHandler
from ._utils import convert_status, post_event
def convert_status(status):
attrs = [
a
for a in dir(status)
if not a.startswith('_') and not callable(getattr(status, a))
]
renamed_attrs = {
'current_time': 'position',
'player_state': 'state',
'supports_seek': 'seekable',
'volume_level': 'volume',
'volume_muted': 'mute',
'content_id': 'url',
}
ret = {}
for attr in attrs:
value = getattr(status, attr)
if attr == 'volume_level':
value *= 100
if attr == 'player_state':
value = value.lower()
if value == 'paused':
value = 'pause'
if value == 'playing':
value = 'play'
if isinstance(value, datetime.datetime):
value = value.isoformat()
if attr in renamed_attrs:
ret[renamed_attrs[attr]] = value
else:
ret[attr] = value
return ret
def post_event(evt_type, **evt):
bus = get_bus()
bus.post(evt_type(player=evt.get('device'), plugin='media.chromecast', **evt))
class MediaChromecastPlugin(MediaPlugin):
class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
"""
Plugin to interact with Chromecast devices
Supported formats:
* HTTP media URLs
* YouTube URLs
* Plex (through ``media.plex`` plugin, experimental)
Plugin to control Chromecast devices.
"""
STREAM_TYPE_UNKNOWN = "UNKNOWN"
STREAM_TYPE_BUFFERED = "BUFFERED"
STREAM_TYPE_LIVE = "LIVE"
class MediaListener:
def __init__(self, name, cast):
self.name = name
self.cast = cast
self.status = convert_status(cast.media_controller.status)
self.last_status_timestamp = time.time()
def new_media_status(self, status):
status = convert_status(status)
if status.get('url') and status.get('url') != self.status.get('url'):
post_event(
NewPlayingMediaEvent,
resource=status['url'],
title=status.get('title'),
device=self.name,
)
if status.get('state') != self.status.get('state'):
if status.get('state') == 'play':
post_event(MediaPlayEvent, resource=status['url'], device=self.name)
elif status.get('state') == 'pause':
post_event(
MediaPauseEvent, resource=status['url'], device=self.name
)
elif status.get('state') in ['stop', 'idle']:
post_event(MediaStopEvent, device=self.name)
if status.get('volume') != self.status.get('volume'):
post_event(
MediaVolumeChangedEvent,
volume=status.get('volume'),
device=self.name,
)
# noinspection PyUnresolvedReferences
if (
abs(status.get('position') - self.status.get('position'))
> time.time() - self.last_status_timestamp + 5
):
post_event(
MediaSeekEvent, position=status.get('position'), device=self.name
)
self.last_status_timestamp = time.time()
self.status = status
class SubtitlesAsyncHandler:
def __init__(self, mc, subtitle_id):
self.mc = mc
self.subtitle_id = subtitle_id
self.initialized = False
# pylint: disable=unused-argument
def new_media_status(self, *_):
if self.subtitle_id and not self.initialized:
self.mc.update_status()
self.mc.enable_subtitle(self.subtitle_id)
self.initialized = True
def __init__(self, chromecast=None, *args, **kwargs):
def __init__(
self, chromecast: Optional[str] = None, poll_interval: float = 30, **kwargs
):
"""
:param chromecast: Default Chromecast to cast to if no name is specified
:type chromecast: str
:param chromecast: Default Chromecast to cast to if no name is specified.
:param poll_interval: How often the plugin should poll for new/removed
Chromecast devices (default: 30 seconds).
"""
super().__init__(*args, **kwargs)
super().__init__(poll_interval=poll_interval, **kwargs)
self._is_local = False
self.chromecast = chromecast
self.chromecasts = {}
self._media_listeners = {}
self._refresh_lock = threading.RLock()
@staticmethod
def _get_chromecasts(*args, **kwargs):
import pychromecast
def _get_chromecasts(self, *args, **kwargs):
with self._refresh_lock:
chromecasts = pychromecast.get_chromecasts(*args, **kwargs)
chromecasts = pychromecast.get_chromecasts(*args, **kwargs)
if isinstance(chromecasts, tuple):
return chromecasts[0]
return chromecasts
@ -161,87 +54,89 @@ class MediaChromecastPlugin(MediaPlugin):
return getattr(cc.device, prop)
return getattr(cc.cast_info, prop)
@action
def get_chromecasts(
self, tries=2, retry_wait=10, timeout=60, blocking=True, callback=None
def _serialize_device(self, cc: pychromecast.Chromecast) -> dict:
"""
Convert a Chromecast object and its status to a dictionary.
"""
return {
'type': cc.cast_type,
'name': cc.name,
'manufacturer': self._get_device_property(cc, 'manufacturer'),
'model_name': cc.model_name,
'uuid': str(cc.uuid),
'address': cc.host if hasattr(cc, 'host') else cc.uri.split(':')[0],
'port': cc.port if hasattr(cc, 'port') else int(cc.uri.split(':')[1]),
'status': (
{
'app': {
'id': cc.app_id,
'name': cc.app_display_name,
},
'is_active_input': cc.status.is_active_input,
'is_stand_by': cc.status.is_stand_by,
'is_idle': cc.is_idle,
'namespaces': cc.status.namespaces,
'volume': round(100 * cc.status.volume_level, 2),
'muted': cc.status.volume_muted,
**convert_status(cc.media_controller.status),
}
if cc.status
else {}
),
}
def _refresh_chromecasts(
self,
tries: int = 2,
retry_wait: float = 10,
timeout: float = 60,
blocking: bool = True,
callback: Optional[Callable] = None,
):
"""
Get the list of Chromecast devices
:param tries: Number of retries (default: 2)
:type tries: int
:param retry_wait: Number of seconds between retries (default: 10 seconds)
:type retry_wait: int
:param timeout: Timeout before failing the call (default: 60 seconds)
:type timeout: int
:param blocking: If true, then the function will block until all the Chromecast
devices have been scanned. If false, then the provided callback function will be
invoked when a new device is discovered
:type blocking: bool
:param callback: If blocking is false, then you can provide a callback function that
will be invoked when a new device is discovered
:type callback: func
:param blocking: If true, then the function will block until all the
Chromecast devices have been scanned. If false, then the provided
callback function will be invoked when a new device is discovered
:param callback: If blocking is false, then you can provide a callback
function that will be invoked when a new device is discovered
"""
self.chromecasts.update(
{
self._get_device_property(cast, 'friendly_name'): cast
for cast in self._get_chromecasts(
tries=tries,
retry_wait=retry_wait,
timeout=timeout,
blocking=blocking,
callback=callback,
)
}
)
self.chromecasts = {
self._get_device_property(cast, 'friendly_name'): cast
for cast in self._get_chromecasts(
tries=tries,
retry_wait=retry_wait,
timeout=timeout,
blocking=blocking,
callback=callback,
)
}
for name, cast in self.chromecasts.items():
self._update_listeners(name, cast)
return [
{
'type': cc.cast_type,
'name': cc.name,
'manufacturer': self._get_device_property(cc, 'manufacturer'),
'model_name': cc.model_name,
'uuid': str(cc.uuid),
'address': cc.host if hasattr(cc, 'host') else cc.uri.split(':')[0],
'port': cc.port if hasattr(cc, 'port') else int(cc.uri.split(':')[1]),
'status': (
{
'app': {
'id': cc.app_id,
'name': cc.app_display_name,
},
'media': self.status(cc.name).output,
'is_active_input': cc.status.is_active_input,
'is_stand_by': cc.status.is_stand_by,
'is_idle': cc.is_idle,
'namespaces': cc.status.namespaces,
'volume': round(100 * cc.status.volume_level, 2),
'muted': cc.status.volume_muted,
}
if cc.status
else {}
),
}
for cc in self.chromecasts.values()
]
for cast in self.chromecasts.values():
cast.wait()
def _event_callback(self, _, cast: pychromecast.Chromecast):
with self._refresh_lock:
self.chromecasts[self._get_device_property(cast, 'friendly_name')] = cast
def _update_listeners(self, name, cast):
if name not in self._media_listeners:
cast.start()
self._media_listeners[name] = self.MediaListener(name=name, cast=cast)
self._media_listeners[name] = MediaListener(
name=name, cast=cast, callback=self._event_callback
)
cast.media_controller.register_status_listener(self._media_listeners[name])
def get_chromecast(self, chromecast=None, n_tries=2):
import pychromecast
if isinstance(chromecast, pychromecast.Chromecast):
assert chromecast, 'Invalid Chromecast object'
return chromecast
if not chromecast:
@ -267,7 +162,7 @@ class MediaChromecastPlugin(MediaPlugin):
break
if chromecast not in self.chromecasts:
raise RuntimeError('Device {} not found'.format(chromecast))
raise RuntimeError(f'Device {chromecast} not found')
cast = self.chromecasts[chromecast]
cast.wait()
@ -276,99 +171,72 @@ class MediaChromecastPlugin(MediaPlugin):
@action
def play(
self,
resource,
content_type=None,
chromecast=None,
title=None,
image_url=None,
autoplay=True,
current_time=0,
stream_type=STREAM_TYPE_BUFFERED,
subtitles=None,
subtitles_lang='en-US',
subtitles_mime='text/vtt',
subtitle_id=1,
resource: str,
*_,
content_type: Optional[str] = None,
chromecast: Optional[str] = None,
title: Optional[str] = None,
image_url: Optional[str] = None,
autoplay: bool = True,
current_time: int = 0,
stream_type: str = STREAM_TYPE_BUFFERED,
subtitles: Optional[str] = None,
subtitles_lang: str = 'en-US',
subtitles_mime: str = 'text/vtt',
subtitle_id: int = 1,
**__,
):
"""
Cast media to a visible Chromecast
Cast media to an available Chromecast device.
:param resource: Media to cast
:type resource: str
:param content_type: Content type as a MIME type string
:type content_type: str
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
:param title: Optional title
:type title: str
:param image_url: URL of the image to use for the thumbnail
:type image_url: str
:param autoplay: Set it to false if you don't want the content to start playing immediately (default: true)
:type autoplay: bool
:param autoplay: Set it to false if you don't want the content to start
playing immediately (default: true)
:param current_time: Time to start the playback in seconds (default: 0)
:type current_time: int
:param stream_type: Type of stream to cast. Can be BUFFERED (default), LIVE or UNKNOWN
:type stream_type: str
:param stream_type: Type of stream to cast. Can be BUFFERED (default),
LIVE or UNKNOWN
:param subtitles: URL of the subtitles to be shown
:type subtitles: str
:param subtitles_lang: Subtitles language (default: en-US)
:type subtitles_lang: str
:param subtitles_mime: Subtitles MIME type (default: text/vtt)
:type subtitles_mime: str
:param subtitle_id: ID of the subtitles to be loaded (default: 1)
:type subtitle_id: int
"""
from pychromecast.controllers.youtube import YouTubeController
if not chromecast:
chromecast = self.chromecast
post_event(MediaPlayRequestEvent, resource=resource, device=chromecast)
cast = self.get_chromecast(chromecast)
mc = cast.media_controller
yt = self._get_youtube_url(resource)
if yt:
self.logger.info('Playing YouTube video {} on {}'.format(yt, chromecast))
hndl = YouTubeController()
cast.register_handler(hndl)
hndl.update_screen_id()
return hndl.play_video(yt)
resource = self._get_resource(resource)
if not content_type:
content_type = get_mime_type(resource)
if not content_type:
raise RuntimeError(
'content_type required to process media {}'.format(resource)
)
raise RuntimeError(f'content_type required to process media {resource}')
if not resource.startswith('http://') and not resource.startswith('https://'):
resource = self.start_streaming(resource).output['url']
self.logger.info('HTTP media stream started on {}'.format(resource))
resource = self._start_streaming(resource)['url']
resource = get_remote_base_url() + resource
self.logger.info('HTTP media stream started on %s', resource)
self.logger.info('Playing {} on {}'.format(resource, chromecast))
if self._latest_resource:
if not title:
title = self._latest_resource.title
if not image_url:
image_url = self._latest_resource.image
self.logger.info('Playing %s on %s', resource, chromecast)
mc.play_media(
resource,
content_type,
title=title,
title=self._latest_resource.title if self._latest_resource else title,
thumb=image_url,
current_time=current_time,
autoplay=autoplay,
@ -380,33 +248,26 @@ class MediaChromecastPlugin(MediaPlugin):
)
if subtitles:
mc.register_status_listener(self.SubtitlesAsyncHandler(mc, subtitle_id))
mc.register_status_listener(SubtitlesAsyncHandler(mc, subtitle_id))
mc.block_until_active()
if self.volume:
self.set_volume(volume=self.volume, chromecast=chromecast)
return self.status(chromecast=chromecast)
@classmethod
def _get_youtube_url(cls, url):
m = re.match(r'https?://(www\.)?youtube\.com/watch\?v=([^&]+).*', url)
if m:
return m.group(2)
m = re.match('youtube:video:(.*)', url)
if m:
return m.group(1)
return None
@action
def load(self, *args, **kwargs):
"""
Alias for :meth:`.play`.
"""
return self.play(*args, **kwargs)
@action
def pause(self, chromecast=None):
def pause(self, *_, chromecast: Optional[str] = None, **__):
"""
Pause the current media on the Chromecast.
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
@ -419,7 +280,7 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def stop(self, chromecast=None):
def stop(self, *_, chromecast: Optional[str] = None, **__):
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
cast.media_controller.stop()
@ -428,7 +289,7 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def rewind(self, chromecast=None):
def rewind(self, chromecast: Optional[str] = None):
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
cast.media_controller.rewind()
@ -436,18 +297,18 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def set_position(self, position, chromecast=None):
def set_position(self, position: float, chromecast: Optional[str] = None, **_):
cast = self.get_chromecast(chromecast or self.chromecast)
cast.media_controller.seek(position)
cast.wait()
return self.status(chromecast=chromecast)
@action
def seek(self, position, chromecast=None):
def seek(self, position: float, chromecast: Optional[str] = None, **_):
return self.forward(chromecast=chromecast, offset=position)
@action
def back(self, chromecast=None, offset=30):
def back(self, chromecast: Optional[str] = None, offset: int = 30, **_):
cast = self.get_chromecast(chromecast or self.chromecast)
mc = cast.media_controller
if mc.status.current_time:
@ -457,7 +318,7 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def forward(self, chromecast=None, offset=30):
def forward(self, chromecast: Optional[str] = None, offset: int = 30, **_):
cast = self.get_chromecast(chromecast or self.chromecast)
mc = cast.media_controller
if mc.status.current_time:
@ -467,158 +328,248 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def is_playing(self, chromecast=None):
def is_playing(self, chromecast: Optional[str] = None, **_):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_playing
@action
def is_paused(self, chromecast=None):
def is_paused(self, chromecast: Optional[str] = None, **_):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_paused
@action
def is_idle(self, chromecast=None):
def is_idle(self, chromecast: Optional[str] = None):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_idle
@action
def list_subtitles(self, chromecast=None):
def list_subtitles(self, chromecast: Optional[str] = None):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.subtitle_tracks
@action
def enable_subtitles(self, chromecast=None, track_id=None):
def enable_subtitles(
self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_
):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
if track_id is not None:
return mc.enable_subtitle(track_id)
elif mc.subtitle_tracks:
if mc.subtitle_tracks:
return mc.enable_subtitle(mc.subtitle_tracks[0].get('trackId'))
@action
def disable_subtitles(self, chromecast=None, track_id=None):
def disable_subtitles(
self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_
):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
if track_id:
return mc.disable_subtitle(track_id)
elif mc.current_subtitle_tracks:
if mc.current_subtitle_tracks:
return mc.disable_subtitle(mc.current_subtitle_tracks[0])
@action
def toggle_subtitles(self, chromecast=None):
def toggle_subtitles(self, chromecast: Optional[str] = None, **_):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
all_subs = mc.status.subtitle_tracks
cur_subs = mc.status.status.current_subtitle_tracks
if cur_subs:
return self.disable_subtitles(chromecast, cur_subs[0])
else:
return self.enable_subtitles(chromecast, all_subs[0].get('trackId'))
return self.enable_subtitles(chromecast, all_subs[0].get('trackId'))
@action
def status(self, chromecast=None):
cast = self.get_chromecast(chromecast or self.chromecast)
status = cast.media_controller.status
return convert_status(status)
def status(self, chromecast: Optional[str] = None):
"""
:return: The status of a Chromecast (if ``chromecast`` is specified) or
all the discovered/available Chromecasts. Format:
.. code-block:: python
{
"type": "cast", # Can be "cast" or "audio"
"name": "Living Room TV",
"manufacturer": "Google Inc.",
"model_name": "Chromecast",
"uuid": "f812afac-80ff-11ee-84dc-001500e8f607",
"address": "192.168.1.2",
"port": 8009,
"status": {
"app": {
"id": "CC1AD845",
"name": "Default Media Receiver"
},
"is_active_input": false,
"is_stand_by": true,
"is_idle": true,
"namespaces": [
"urn:x-cast:com.google.cast.cac",
"urn:x-cast:com.google.cast.debugoverlay",
"urn:x-cast:com.google.cast.media"
],
"volume": 100,
"muted": false,
"adjusted_current_time": 14.22972,
"album_artist": null,
"album_name": null,
"artist": null,
"url": "https://some/video.mp4",
"content_type": "video/mp4",
"current_subtitle_tracks": [],
"position": 1.411891,
"duration": 253.376145,
"episode": null,
"idle_reason": null,
"images": [
[
"https://some/image.jpg",
null,
null
]
],
"last_updated": "2023-11-12T02:03:33.888843",
"media_custom_data": {},
"media_is_generic": true,
"media_is_movie": false,
"media_is_musictrack": false,
"media_is_photo": false,
"media_is_tvshow": false,
"media_metadata": {
"title": "Some media",
"thumb": "https://some/image.jpg",
"images": [
{
"url": "https://some/image.jpg"
}
],
"metadataType": 0
},
"media_session_id": 1,
"metadata_type": 0,
"playback_rate": 1,
"player_is_idle": false,
"player_is_paused": false,
"player_is_playing": true,
"state": "play",
"season": null,
"series_title": null,
"stream_type": "BUFFERED",
"stream_type_is_buffered": true,
"stream_type_is_live": false,
"subtitle_tracks": [],
"supported_media_commands": 12303,
"supports_pause": true,
"supports_queue_next": false,
"supports_queue_prev": false,
"seekable": true,
"supports_skip_backward": false,
"supports_skip_forward": false,
"supports_stream_mute": true,
"supports_stream_volume": true,
"title": "Some media",
"track": null
}
}
"""
return self._status(chromecast=chromecast)
def _status(self, chromecast: Optional[str] = None) -> dict:
if chromecast:
assert (
chromecast in self.chromecasts
), f'No such Chromecast device: {chromecast}'
return self._serialize_device(self.chromecasts[chromecast])
return {
name: self._serialize_device(cast)
for name, cast in self.chromecasts.items()
}
@action
def disconnect(self, chromecast=None, timeout=None, blocking=True):
def disconnect(
self,
chromecast: Optional[str] = None,
timeout: Optional[float] = None,
blocking: bool = True,
):
"""
Disconnect a Chromecast and wait for it to terminate
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param timeout: Number of seconds to wait for disconnection (default: None: block until termination)
:type timeout: float
:param blocking: If set (default), then the code will wait until disconnection, otherwise it will return
immediately.
:type blocking: bool
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
:param timeout: Number of seconds to wait for disconnection (default:
None: block until termination).
:param blocking: If set (default), then the code will wait until
disconnection, otherwise it will return immediately.
"""
cast = self.get_chromecast(chromecast)
cast.disconnect(timeout=timeout, blocking=blocking)
@action
def join(self, chromecast=None, timeout=None):
def join(self, chromecast: Optional[str] = None, timeout: Optional[float] = None):
"""
Blocks the thread until the Chromecast connection is terminated.
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param timeout: Number of seconds to wait for disconnection (default: None: block until termination)
:type timeout: float
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
:param timeout: Number of seconds to wait for disconnection (default:
None: block until termination).
"""
cast = self.get_chromecast(chromecast)
cast.join(timeout=timeout)
@action
def quit(self, chromecast=None):
def quit(self, chromecast: Optional[str] = None):
"""
Exits the current app on the Chromecast
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
"""
cast = self.get_chromecast(chromecast)
cast.quit_app()
@action
def reboot(self, chromecast=None):
def reboot(self, chromecast: Optional[str] = None):
"""
Reboots the Chromecast
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
"""
cast = self.get_chromecast(chromecast)
cast.reboot()
@action
def set_volume(self, volume, chromecast=None):
def set_volume(self, volume: float, chromecast: Optional[str] = None):
"""
Set the Chromecast volume
:param volume: Volume to be set, between 0 and 100
:type volume: float
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param volume: Volume to be set, between 0 and 100.
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
cast.set_volume(volume / 100)
cast.wait()
status = self.status(chromecast=chromecast)
status.output['volume'] = volume
return status
return {
**self._status(chromecast=chromecast),
'volume': volume,
}
@action
def volup(self, chromecast=None, step=10):
def volup(self, chromecast: Optional[str] = None, step: float = 10, **_):
"""
Turn up the Chromecast volume by 10% or step.
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param step: Volume increment between 0 and 100 (default: 100%)
:type step: float
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
:param step: Volume increment between 0 and 100 (default: 10%).
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
step /= 100
@ -627,18 +578,14 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def voldown(self, chromecast=None, step=10):
def voldown(self, chromecast: Optional[str] = None, step: float = 10, **_):
"""
Turn down the Chromecast volume by 10% or step.
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param step: Volume decrement between 0 and 100 (default: 100%)
:type step: float
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
:param step: Volume decrement between 0 and 100 (default: 10%).
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
step /= 100
@ -647,26 +594,31 @@ class MediaChromecastPlugin(MediaPlugin):
return self.status(chromecast=chromecast)
@action
def mute(self, chromecast=None):
def mute(self, chromecast: Optional[str] = None):
"""
Toggle the mute status on the Chromecast
:param chromecast: Chromecast to cast to. If none is specified, then the default configured Chromecast
will be used.
:type chromecast: str
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
cast.set_volume_muted(not cast.status.volume_muted)
cast.wait()
return self.status(chromecast=chromecast)
def set_subtitles(self, filename, *args, **kwargs):
def set_subtitles(self, *_, **__):
raise NotImplementedError
def remove_subtitles(self, *args, **kwargs):
def remove_subtitles(self, *_, **__):
raise NotImplementedError
def main(self):
while not self.should_stop():
try:
self._refresh_chromecasts()
finally:
self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,77 @@
import logging
import time
from typing import Optional
from platypush.message.event.media import (
MediaPlayEvent,
MediaStopEvent,
MediaPauseEvent,
NewPlayingMediaEvent,
MediaVolumeChangedEvent,
MediaSeekEvent,
)
from ._utils import MediaCallback, convert_status, post_event
logger = logging.getLogger(__name__)
class MediaListener:
"""
Listens for media status changes and posts events accordingly.
"""
def __init__(self, name: str, cast, callback: Optional[MediaCallback] = None):
self.name = name
self.cast = cast
self.status = convert_status(cast.media_controller.status)
self.last_status_timestamp = time.time()
self.callback = callback
def new_media_status(self, status):
status = convert_status(status)
if status.get('url') and status.get('url') != self.status.get('url'):
self._post_event(
NewPlayingMediaEvent,
title=status.get('title'),
)
state = status.get('state')
if state != self.status.get('state'):
if state == 'play':
self._post_event(MediaPlayEvent)
elif state == 'pause':
self._post_event(MediaPauseEvent)
elif state in ('stop', 'idle'):
self._post_event(MediaStopEvent)
if status.get('volume') != self.status.get('volume'):
self._post_event(MediaVolumeChangedEvent, volume=status.get('volume'))
if (
status.get('position')
and self.status.get('position')
and abs(status['position'] - self.status['position'])
> time.time() - self.last_status_timestamp + 5
):
self._post_event(MediaSeekEvent, position=status.get('position'))
self.status = status
self.last_status_timestamp = time.time()
def load_media_failed(self, item, error_code):
logger.warning('Failed to load media %s: %d', item, error_code)
def _post_event(self, evt_type, **evt):
status = evt.get('status', {})
resource = status.get('url')
args = {
'device': self.name,
'plugin': 'media.chromecast',
**evt,
}
if resource:
args['resource'] = resource
post_event(evt_type, callback=self.callback, chromecast=self.cast, **args)

View File

@ -0,0 +1,16 @@
# pylint: disable=too-few-public-methods
class SubtitlesAsyncHandler:
"""
This class is used to enable subtitles when the media is loaded.
"""
def __init__(self, mc, subtitle_id):
self.mc = mc
self.subtitle_id = subtitle_id
self.initialized = False
def new_media_status(self, *_):
if self.subtitle_id and not self.initialized:
self.mc.update_status()
self.mc.enable_subtitle(self.subtitle_id)
self.initialized = True

View File

@ -0,0 +1,64 @@
import datetime
from typing import Any, Callable, Optional
import pychromecast # type: ignore
from platypush.context import get_bus
from platypush.message.event.media import MediaEvent
MediaCallback = Callable[[MediaEvent, pychromecast.Chromecast], Optional[Any]]
def convert_status(status) -> dict:
"""
Convert a Chromecast status object to a dictionary.
"""
attrs = [
a
for a in dir(status)
if not a.startswith('_') and not callable(getattr(status, a))
]
renamed_attrs = {
'current_time': 'position',
'player_state': 'state',
'supports_seek': 'seekable',
'volume_level': 'volume',
'volume_muted': 'muted',
'content_id': 'url',
}
ret = {}
for attr in attrs:
value = getattr(status, attr)
if attr == 'volume_level':
value = round(100 * value, 2)
if attr == 'player_state':
value = value.lower()
if value == 'paused':
value = 'pause'
if value == 'playing':
value = 'play'
if isinstance(value, datetime.datetime):
value = value.isoformat()
if attr in renamed_attrs:
ret[renamed_attrs[attr]] = value
else:
ret[attr] = value
return ret
def post_event(
evt_type,
callback: Optional[MediaCallback] = None,
chromecast: Optional[pychromecast.Chromecast] = None,
**evt
):
evt['plugin'] = 'media.chromecast'
event = evt_type(player=evt.get('device'), **evt)
get_bus().post(event)
if callback:
callback(event, chromecast)

View File

@ -82,6 +82,7 @@ mock_imports = [
"pvporcupine ",
"pyHS100",
"pyaudio",
"pychromecast",
"pyclip",
"pydbus",
"pyfirmata2",