From 0bcd3210861547334bd415ce13d2230d070acb9f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 13 Sep 2022 17:55:15 +0200 Subject: [PATCH] wip --- platypush/plugins/music/tidal/__init__.py | 991 ++-------------------- setup.py | 2 +- 2 files changed, 80 insertions(+), 913 deletions(-) diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py index 20d889a0..e428470a 100644 --- a/platypush/plugins/music/tidal/__init__.py +++ b/platypush/plugins/music/tidal/__init__.py @@ -1,19 +1,14 @@ +import json import os - -from datetime import datetime -from typing import List, Optional, Union, Iterable +import requests from platypush.config import Config from platypush.message.response import Response -from platypush.plugins import action +from platypush.plugins import RunnablePlugin, action from platypush.plugins.media import PlayerState -from platypush.plugins.music import MusicPlugin -from platypush.schemas.spotify import SpotifyDeviceSchema, SpotifyStatusSchema, SpotifyTrackSchema, \ - SpotifyHistoryItemSchema, SpotifyPlaylistSchema, SpotifyAlbumSchema, SpotifyEpisodeSchema, SpotifyShowSchema, \ - SpotifyArtistSchema -class MusicTidalPlugin(MusicPlugin): +class MusicTidalPlugin(RunnablePlugin): """ Plugin to interact with the user's Tidal account and library. @@ -24,19 +19,29 @@ class MusicTidalPlugin(MusicPlugin): """ _base_url = 'https://api.tidalhifi.com/v1/' - _oauth_file = os.path.join( + _default_credentials_file = os.path.join( str(Config.get('workdir')), 'tidal', 'credentials.json' ) - def __init__(self, quality: str = 'high', **kwargs): + def __init__( + self, + quality: str = 'high', + credentials_file: str = _default_credentials_file, + **kwargs + ): """ :param quality: Default audio quality. Default: ``high``. Supported: [``loseless``, ``master``, ``high``, ``low``]. + :param credentials_file: Path to the file where the OAuth session + parameters will be stored (default: + ``/tidal/credentials.json``). """ from tidalapi import Quality super().__init__(self, **kwargs) + self._credentials_file = credentials_file + try: self._quality = getattr(Quality, quality.lower()) except AttributeError: @@ -45,910 +50,72 @@ class MusicTidalPlugin(MusicPlugin): f'{[q.name for q in Quality]}' ) - @staticmethod - def _parse_datetime(dt: Optional[Union[str, datetime, int, float]]) -> Optional[datetime]: - if isinstance(dt, str): - try: - dt = float(dt) - except (ValueError, TypeError): - return datetime.fromisoformat(dt) - - if isinstance(dt, int) or isinstance(dt, float): - return datetime.fromtimestamp(dt) - - return dt - - @action - def get_devices(self) -> List[dict]: - """ - Get the list of players associated to the Spotify account. - - :return: .. schema:: spotify.SpotifyDeviceSchema(many=True) - """ - devices = self.spotify_user_call('/v1/me/player/devices').get('devices', []) - self._players_by_id = { - **self._players_by_id, - **{ - dev['id']: dev - for dev in devices - } - } - - self._players_by_name = { - **self._players_by_name, - **{ - dev['name']: dev - for dev in devices - } - } - - return SpotifyDeviceSchema().dump(devices, many=True) - - @action - def set_volume(self, volume: int, device: Optional[str] = None): - """ - Set the playback volume on a device. - - :param volume: Target volume as a percentage between 0 and 100. - :param device: Device ID or name. If none is specified then the currently active device will be used. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call( - '/v1/me/player/volume', - method='put', - params={ - 'volume_percent': volume, - **({'device_id': device} if device else {}), - } - ) - - def _get_volume(self, device: Optional[str] = None) -> Optional[int]: - if device: - return self._get_device(device).get('volume') - - return self.status.output.get('volume') - - @action - def volup(self, delta: int = 5, device: Optional[str] = None): - """ - Set the volume up by a certain delta. - - :param delta: Increase the volume by this percentage amount (between 0 and 100). - :param device: Device ID or name. If none is specified then the currently active device will be used. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call('/v1/me/player/volume', params={ - 'volume_percent': min(100, (self._get_volume() or 0) + delta), - **({'device_id': device} if device else {}), - }) - - @action - def voldown(self, delta: int = 5, device: Optional[str] = None): - """ - Set the volume down by a certain delta. - - :param delta: Decrease the volume by this percentage amount (between 0 and 100). - :param device: Device ID or name. If none is specified then the currently active device will be used. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call('/v1/me/player/volume', params={ - 'volume_percent': max(0, (self._get_volume() or 0) - delta), - **({'device_id': device} if device else {}), - }) - - @action - def play(self, resource: Optional[str] = None, device: Optional[str] = None): - """ - Change the playback state of a device to ``PLAY`` or start playing a specific resource. - - :param resource: Resource to play, in Spotify URI format (e.g. ``spotify:track:xxxxxxxxxxxxxxxxxxxxxx``). - If none is specified then the method will change the playback state to ``PLAY``. - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call( - '/v1/me/player/play', - method='put', - json={'uris': [resource]} if resource else {}, - params={ - **({'device_id': device} if device else {}), - }, - ) - - @action - def pause(self, device: Optional[str] = None): - """ - Toggle paused state. - - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - # noinspection PyUnresolvedReferences - status = self.status().output - state = 'play' \ - if status.get('device_id') != device or status.get('state') != PlayerState.PLAY.value else 'pause' - - self.spotify_user_call( - f'/v1/me/player/{state}', - method='put', - params={ - **({'device_id': device} if device else {}), - }, - ) - - @action - def pause_if_playing(self): - """ - Pause playback only if it's playing - """ - # noinspection PyUnresolvedReferences - status = self.status().output - if status.get('state') == PlayerState.PLAY.value: - self.spotify_user_call( - f'/v1/me/player/pause', - method='put', - ) - - @action - def play_if_paused(self, device: Optional[str] = None): - """ - Play only if it's paused (resume) - - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - # noinspection PyUnresolvedReferences - status = self.status().output - if status.get('state') != PlayerState.PLAY.value: - self.spotify_user_call( - f'/v1/me/player/play', - method='put', - params={ - **({'device_id': device} if device else {}), - }, - ) - - @action - def play_if_paused_or_stopped(self): - """ - Alias for :meth:`.play_if_paused`. - """ - return self.play_if_paused() - - @action - def stop(self, **kwargs): - """ - This method is actually just an alias to :meth:`.stop`, since Spotify manages clearing playback sessions - automatically after a while for paused devices. - """ - return self.pause(**kwargs) - - @action - def start_or_transfer_playback(self, device: str): - """ - Start or transfer playback to the device specified. - - :param device: Device ID or name. - """ - device = self._get_device(device)['id'] - self.spotify_user_call( - f'/v1/me/player', - method='put', - json={ - 'device_ids': [device], - }, - ) - - @action - def next(self, device: Optional[str] = None, **kwargs): - """ - Skip to the next track. - - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call( - f'/v1/me/player/next', - method='post', - params={ - **({'device_id': device} if device else {}), - }, - ) - - @action - def previous(self, device: Optional[str] = None, **kwargs): - """ - Skip to the next track. - - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call( - f'/v1/me/player/previous', - method='post', - params={ - **({'device_id': device} if device else {}), - }, - ) - - @action - def seek(self, position: float, device: Optional[str] = None, **kwargs): - """ - Set the cursor to the specified position in the track. - - :param position: Position in seconds. - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call( - f'/v1/me/player/seek', - method='put', - params={ - 'position_ms': int(position * 1000), - **({'device_id': device} if device else {}), - }, - ) - - @action - def repeat(self, value: Optional[bool] = None, device: Optional[str] = None): - """ - Set or toggle repeat mode. - - :param value: If set, set the repeat state this value (true/false). Default: None (toggle current state). - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - if value is None: - # noinspection PyUnresolvedReferences - status = self.status().output - state = 'context' \ - if status.get('device_id') != device or not status.get('repeat') else 'off' - else: - state = value is True - - self.spotify_user_call( - f'/v1/me/player/repeat', - method='put', - params={ - 'state': 'context' if state else 'off', - **({'device_id': device} if device else {}), - }, - ) - - @action - def random(self, value: Optional[bool] = None, device: Optional[str] = None): - """ - Set or toggle random/shuffle mode. - - :param value: If set, set the shuffle state this value (true/false). Default: None (toggle current state). - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - if value is None: - # noinspection PyUnresolvedReferences - status = self.status().output - state = True if status.get('device_id') != device or not status.get('random') else False - else: - state = value is True - - self.spotify_user_call( - f'/v1/me/player/shuffle', - method='put', - params={ - 'state': state, - **({'device_id': device} if device else {}), - }, - ) - - @action - def history(self, limit: int = 20, before: Optional[Union[datetime, str, int]] = None, - after: Optional[Union[datetime, str, int]] = None): - """ - Get a list of recently played track on the account. - - :param limit: Maximum number of tracks to be retrieved (default: 20, max: 50). - :param before: Retrieve only the tracks played before this timestamp, specified as a UNIX timestamp, a datetime - object or an ISO datetime string. If ``before`` is set then ``after`` cannot be set. - :param after: Retrieve only the tracks played after this timestamp, specified as a UNIX timestamp, a datetime - object or an ISO datetime string. If ``after`` is set then ``before`` cannot be set. - :return: - """ - before = self._parse_datetime(before) - after = self._parse_datetime(after) - assert not (before and after), 'before and after cannot both be set' - - results = self._spotify_paginate_results('/v1/me/player/recently-played', - limit=limit, - params={ - 'limit': min(limit, 50), - **({'before': before} if before else {}), - **({'after': after} if after else {}), - }) - - return SpotifyHistoryItemSchema().dump([ - { - **item.pop('track'), - **item, - } - for item in results - ], many=True) - - @action - def add(self, resource: str, device: Optional[str] = None, **kwargs): - """ - Add a Spotify resource (track, or episode) to the playing queue. - - :param resource: Spotify resource URI. - :param device: Device ID or name. If none is specified then the action will target the currently active device. - """ - if device: - device = self._get_device(device)['id'] - - self.spotify_user_call( - f'/v1/me/player/queue', - method='post', - params={ - 'uri': resource, - **({'device_id': device} if device else {}), - }, - ) - - @action - def clear(self, **kwargs): - pass - - @action - def status(self, **kwargs) -> dict: - """ - Get the status of the currently active player. - - :return: .. schema:: spotify.SpotifyStatusSchema - """ - status = self.spotify_user_call('/v1/me/player') - if not status: - return { - 'state': PlayerState.STOP.value, - } - - return SpotifyStatusSchema().dump(status) - - @action - def current_track(self, **kwargs) -> dict: - """ - Get the track currently playing. - - :return: .. schema:: spotify.SpotifyTrackSchema - """ - status = self.spotify_user_call('/v1/me/player') - empty_response = Response(output={}) - if not status: - # noinspection PyTypeChecker - return empty_response - - track = status.get('item', {}) - if not track: - # noinspection PyTypeChecker - return empty_response - - return SpotifyTrackSchema().dump(track) - - @action - def get_playlists(self, limit: int = 1000, offset: int = 0, user: Optional[str] = None): - """ - Get the user's playlists. - - :param limit: Maximum number of results (default: 1000). - :param offset: Return results starting from this index (default: 0). - :param user: Return the playlist owned by a specific user ID (default: currently logged in user). - :return: .. schema:: spotify.SpotifyPlaylistSchema - """ - playlists = self._spotify_paginate_results( - f'/v1/{"users/" + user if user else "me"}/playlists', - limit=limit, offset=offset - ) - - return SpotifyPlaylistSchema().dump(playlists, many=True) - - def _get_playlist(self, playlist: str) -> dict: - playlists = self.get_playlists().output - playlists = [ - pl for pl in playlists if ( - pl['id'] == playlist or - pl['uri'] == playlist or - pl['name'] == playlist - ) - ] - - assert playlists, f'No such playlist ID, URI or name: {playlist}' - return playlists[0] - - def _get_playlist_tracks_from_cache(self, id: str, snapshot_id: str, limit: Optional[int] = None, - offset: int = 0) -> Optional[Iterable]: - snapshot = self._playlist_snapshots.get(id) - if ( - not snapshot or - snapshot['snapshot_id'] != snapshot_id or - (limit is None and snapshot['limit'] is not None) - ): - return - - if limit is not None and snapshot['limit'] is not None: - stored_range = (snapshot['limit'], snapshot['limit'] + snapshot['offset']) - requested_range = (limit, limit + offset) - if requested_range[0] < stored_range[0] or requested_range[1] > stored_range[1]: - return - - return snapshot['tracks'] - - def _cache_playlist_data(self, id: str, snapshot_id: str, tracks: Iterable[dict], limit: Optional[int] = None, - offset: int = 0, **_): - self._playlist_snapshots[id] = { - 'id': id, - 'tracks': tracks, - 'snapshot_id': snapshot_id, - 'limit': limit, - 'offset': offset, - } - - @action - def get_playlist(self, playlist: str, with_tracks: bool = True, limit: Optional[int] = None, offset: int = 0): - """ - Get a playlist content. - - :param playlist: Playlist name, ID or URI. - :param with_tracks: Return also the playlist tracks (default: false, return only the metadata). - :param limit: If ``with_tracks`` is True, retrieve this maximum amount of tracks - (default: None, get all tracks). - :param offset: If ``with_tracks`` is True, retrieve tracks starting from this index (default: 0). - :return: .. schema:: spotify.SpotifyPlaylistSchema - """ - playlist = self._get_playlist(playlist) - if with_tracks: - playlist['tracks'] = self._get_playlist_tracks_from_cache( - playlist['id'], snapshot_id=playlist['snapshot_id'], - limit=limit, offset=offset - ) - - if playlist['tracks'] is None: - playlist['tracks'] = [ - { - **track, - 'track': { - **track['track'], - 'position': offset+i+1, - } - } - for i, track in enumerate(self._spotify_paginate_results( - f'/v1/playlists/{playlist["id"]}/tracks', - limit=limit, offset=offset - )) - ] - - self._cache_playlist_data(**playlist, limit=limit, offset=offset) - - return SpotifyPlaylistSchema().dump(playlist) - - @action - def add_to_playlist(self, playlist: str, resources: Union[str, Iterable[str]], position: Optional[int] = None): - """ - Add one or more items to a playlist. - - :param playlist: Playlist name, ID or URI. - :param resources: URI(s) of the resource(s) to be added. - :param position: At what (1-based) position the tracks should be inserted (default: append to the end). - """ - playlist = self._get_playlist(playlist) - response = self.spotify_user_call( - f'/v1/playlists/{playlist["id"]}/tracks', - method='post', - params={ - **({'position': position} if position is not None else {}), - }, - json={ - 'uris': [ - uri.strip() for uri in ( - resources.split(',') if isinstance(resources, str) else resources - ) - ] - } - ) - - snapshot_id = response.get('snapshot_id') - assert snapshot_id is not None, 'Could not save playlist' - - @action - def remove_from_playlist(self, playlist: str, resources: Union[str, Iterable[str]]): - """ - Remove one or more items from a playlist. - - :param playlist: Playlist name, ID or URI. - :param resources: URI(s) of the resource(s) to be removed. A maximum of 100 tracks can be provided at once. - """ - playlist = self._get_playlist(playlist) - response = self.spotify_user_call( - f'/v1/playlists/{playlist["id"]}/tracks', - method='delete', - json={ - 'tracks': [ - {'uri': uri.strip()} - for uri in ( - resources.split(',') if isinstance(resources, str) else resources - ) - ] - } - ) - - snapshot_id = response.get('snapshot_id') - assert snapshot_id is not None, 'Could not save playlist' - - @action - def playlist_move(self, playlist: str, from_pos: int, to_pos: int, range_length: int = 1, - resources: Optional[Union[str, Iterable[str]]] = None, **_): - """ - Move or replace elements in a playlist. - - :param playlist: Playlist name, ID or URI. - :param from_pos: Move tracks starting from this position (the first element has index 1). - :param to_pos: Move tracks to this position (1-based index). - :param range_length: Number of tracks to move (default: 1). - :param resources: If specified, then replace the items from `from_pos` to `from_pos+range_length` with the - specified set of Spotify URIs (it must be a collection with the same length as the range). - """ - playlist = self._get_playlist(playlist) - response = self.spotify_user_call( - f'/v1/playlists/{playlist["id"]}/tracks', - method='put', - json={ - 'range_start': int(from_pos) + 1, - 'range_length': int(range_length), - 'insert_before': int(to_pos) + 1, - **({'uris': [ - uri.strip() for uri in ( - resources.split(',') if isinstance(resources, str) else resources - ) - ]} if resources else {}) - } - ) - - snapshot_id = response.get('snapshot_id') - assert snapshot_id is not None, 'Could not save playlist' - - # noinspection PyShadowingBuiltins - @staticmethod - def _make_filter(query: Union[str, dict], **filter) -> str: - if filter: - query = { - **({'any': query} if isinstance(query, str) else {}), - **filter, - } - - if isinstance(query, str): - return query - - q = query['any'] if 'any' in query else '' - for attr in ['artist', 'track', 'album', 'year']: - if attr in query: - q += f' {attr}:{query[attr]}' - - return q.strip() - - # noinspection PyShadowingBuiltins - @action - def search(self, query: Optional[Union[str, dict]] = None, limit: int = 50, offset: int = 0, type: str = 'track', - **filter) -> Iterable[dict]: - """ - Search for tracks matching a certain criteria. - - :param query: Search filter. It can either be a free-text or a structured query. In the latter case the - following fields are supported: - - - ``any``: Search for anything that matches this text. - - ``uri``: Search the following Spotify ID/URI or list of IDs/URIs. - - ``artist``: Filter by artist. - - ``track``: Filter by track name. - - ``album``: Filter by album name. - - ``year``: Filter by year (dash-separated ranges are supported). - - :param limit: Maximum number of results (default: 50). - :param offset: Return results starting from this index (default: 0). - :param type: Type of results to be returned. Supported: ``album``, ``artist``, ``playlist``, ``track``, ``show`` - and ``episode`` (default: ``track``). - :param filter: Alternative key-value way of representing a structured query. - :return: - If ``type=track``: - .. schema:: spotify.SpotifyTrackSchema(many=True) - If ``type=album``: - .. schema:: spotify.SpotifyAlbumSchema(many=True) - If ``type=artist``: - .. schema:: spotify.SpotifyArtistSchema(many=True) - If ``type=playlist``: - .. schema:: spotify.SpotifyPlaylistSchema(many=True) - If ``type=episode``: - .. schema:: spotify.SpotifyEpisodeSchema(many=True) - If ``type=show``: - .. schema:: spotify.SpotifyShowSchema(many=True) - - """ - uri = { - **(query if isinstance(query, dict) else {}), - **filter, - }.get('uri', []) - - uris = uri.split(',') if isinstance(uri, str) else uri - params = { - 'ids': ','.join([uri.split(':')[-1].strip() for uri in uris]), - } if uris else { - 'q': self._make_filter(query, **filter), - 'type': type, - } - - response = self._spotify_paginate_results( - f'/v1/{type + "s" if uris else "search"}', - limit=limit, - offset=offset, - type=type, - params=params, - ) - - if type == 'track': - return sorted( - SpotifyTrackSchema(many=True).dump(response), - key=lambda track: ( - track.get('artist'), - track.get('date'), - track.get('album'), - track.get('track'), - track.get('title'), - track.get('popularity'), + self._session = None + + def _oauth_open_saved_session(self): + try: + with open(self._credentials_file, 'r') as f: + data = json.load(f) + self._session.load_oauth_session( + data['token_type'], + data['access_token'], + data['refresh_token'] ) - ) + except Exception as e: + self.logger.warning('Could not load %s: %s', oauth_file, e) - schema_class = None - if type == 'playlist': - schema_class = SpotifyPlaylistSchema - if type == 'album': - schema_class = SpotifyAlbumSchema - if type == 'artist': - schema_class = SpotifyArtistSchema - if type == 'episode': - schema_class = SpotifyEpisodeSchema - if type == 'show': - schema_class = SpotifyShowSchema - - if schema_class: - return schema_class(many=True).dump(response) - - return response - - @action - def follow_playlist(self, playlist: str, public: bool = True): - """ - Follow a playlist. - - :param playlist: Playlist name, ID or URI. - :param public: If True (default) then the playlist will appear in the user's list of public playlists, otherwise - it won't. - """ - playlist = self._get_playlist(playlist) - self.spotify_user_call( - f'/v1/playlists/{playlist["id"]}/followers', - method='put', - json={ - 'public': public, + def _oauth_create_new_session(self): + self._session.login_oauth_simple(function=self.logger.warning) + if self._session.check_login(): + data = { + 'token_type': self._session.token_type, + 'session_id': self._session.session_id, + 'access_token': self._session.access_token, + 'refresh_token': self._session.refresh_token, } + + with open(oauth_file, 'w') as outfile: + json.dump(data, outfile) + + @property + def session(self): + from tidalapi import Config, Session + if self._session and self._session.check_login(): + return self._session + + # Attempt to reload the existing session from file + # TODO populate Config object + self._session = Session(config=Config()) + self._oauth_open_saved_session() + if not self._session.check_login(): + # Create a new session if we couldn't load an existing one + self._oauth_create_new_session() + + assert self._session.check_login(), 'Could not connect to TIDAL' + return self._session + + def _api_request(self, url, *args, method='get', **kwargs): + method = getattr(requests, method.lower()) + url = urljoin(base_url, url) + kwargs['headers'] = kwargs.get('headers', {}) + kwargs['params'] = kwargs.get('params', {}) + kwargs['params'].update({ + 'sessionId': self.session.session_id, + 'countryCode': self.session.country_code, + }) + + rs = None + kwargs['headers']['Authorization'] = '{type} {token}'.format( + type=self.session.token_type, + token=self.session.access_token ) - @action - def unfollow_playlist(self, playlist: str): - """ - Unfollow a playlist. + try: + rs = method(url, *args, **kwargs) + rs.raise_for_status() + return rs + except requests.HTTPError as e: + if rs: + self.logger.error(rs.text) + raise e - :param playlist: Playlist name, ID or URI. - """ - playlist = self._get_playlist(playlist) - self.spotify_user_call( - f'/v1/playlists/{playlist["id"]}/followers', - method='delete', - ) - - @staticmethod - def _uris_to_id(*uris: str) -> Iterable[str]: - return [ - uri.split(':')[-1] - for uri in uris - ] - - @action - def get_albums(self, limit: int = 50, offset: int = 0) -> List[dict]: - """ - Get the list of albums saved by the user. - - :param limit: Maximum number of results (default: 50). - :param offset: Return results starting from this index (default: 0). - :return: .. schema:: spotify.SpotifyAlbumSchema(many=True) - """ - return SpotifyAlbumSchema().dump( - self._spotify_paginate_results( - '/v1/me/albums', - limit=limit, - offset=offset, - ), many=True - ) - - @action - def save_albums(self, resources: Iterable[str]): - """ - Save a list of albums to the user's collection. - - :param resources: Spotify IDs or URIs of the albums to save. - """ - self.spotify_user_call( - '/v1/me/albums', - method='put', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def remove_albums(self, resources: Iterable[str]): - """ - Remove a list of albums from the user's collection. - - :param resources: Spotify IDs or URIs of the albums to remove. - """ - self.spotify_user_call( - '/v1/me/albums', - method='delete', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def get_tracks(self, limit: int = 100, offset: int = 0) -> List[dict]: - """ - Get the list of tracks saved by the user. - - :param limit: Maximum number of results (default: 100). - :param offset: Return results starting from this index (default: 0). - :return: .. schema:: spotify.SpotifyTrackSchema(many=True) - """ - return [ - SpotifyTrackSchema().dump(item['track']) - for item in self._spotify_paginate_results( - '/v1/me/tracks', - limit=limit, - offset=offset - ) - ] - - @action - def save_tracks(self, resources: Iterable[str]): - """ - Save a list of tracks to the user's collection. - - :param resources: Spotify IDs or URIs of the tracks to save. - """ - self.spotify_user_call( - '/v1/me/tracks', - method='put', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def remove_tracks(self, resources: Iterable[str]): - """ - Remove a list of tracks from the user's collection. - - :param resources: Spotify IDs or URIs of the tracks to remove. - """ - self.spotify_user_call( - '/v1/me/tracks', - method='delete', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def get_episodes(self, limit: int = 50, offset: int = 0) -> List[dict]: - """ - Get the list of episodes saved by the user. - - :param limit: Maximum number of results (default: 50). - :param offset: Return results starting from this index (default: 0). - :return: .. schema:: spotify.SpotifyEpisodeSchema(many=True) - """ - return SpotifyEpisodeSchema().dump( - self._spotify_paginate_results( - '/v1/me/episodes', - limit=limit, - offset=offset, - ), many=True - ) - - @action - def save_episodes(self, resources: Iterable[str]): - """ - Save a list of episodes to the user's collection. - - :param resources: Spotify IDs or URIs of the episodes to save. - """ - self.spotify_user_call( - '/v1/me/episodes', - method='put', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def remove_episodes(self, resources: Iterable[str]): - """ - Remove a list of episodes from the user's collection. - - :param resources: Spotify IDs or URIs of the episodes to remove. - """ - self.spotify_user_call( - '/v1/me/episodes', - method='delete', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def get_shows(self, limit: int = 50, offset: int = 0) -> List[dict]: - """ - Get the list of shows saved by the user. - - :param limit: Maximum number of results (default: 50). - :param offset: Return results starting from this index (default: 0). - :return: .. schema:: spotify.SpotifyShowSchema(many=True) - """ - return SpotifyShowSchema().dump( - self._spotify_paginate_results( - '/v1/me/shows', - limit=limit, - offset=offset, - ), many=True - ) - - @action - def save_shows(self, resources: Iterable[str]): - """ - Save a list of shows to the user's collection. - - :param resources: Spotify IDs or URIs of the shows to save. - """ - self.spotify_user_call( - '/v1/me/shows', - method='put', - json={'ids': self._uris_to_id(*resources)}, - ) - - @action - def remove_shows(self, resources: Iterable[str]): - """ - Remove a list of shows from the user's collection. - - :param resources: Spotify IDs or URIs of the shows to remove. - """ - self.spotify_user_call( - '/v1/me/shows', - method='delete', - json={'ids': self._uris_to_id(*resources)}, - ) diff --git a/setup.py b/setup.py index c338b09c..5db5d588 100755 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ setup( 'zeroconf>=0.27.0', 'tz', 'python-dateutil', - 'cryptography', + # 'cryptography', 'pyjwt', 'marshmallow', 'frozendict',