import json import os import pathlib import requests from datetime import datetime from urllib.parse import urljoin from typing import Iterable, Optional, Union from platypush.config import Config from platypush.context import Variable, get_bus from platypush.message.event.music.tidal import TidalPlaylistUpdatedEvent from platypush.plugins import RunnablePlugin, action from platypush.plugins.music.tidal.workers import get_items from platypush.schemas.tidal import ( TidalAlbumSchema, TidalPlaylistSchema, TidalArtistSchema, TidalSearchResultsSchema, TidalTrackSchema, ) class MusicTidalPlugin(RunnablePlugin): """ Plugin to interact with the user's Tidal account and library. Upon the first login, the application will prompt you with a link to connect to your Tidal account. Once authorized, you should no longer be required to explicitly login. Triggers: * :class:`platypush.message.event.music.TidalPlaylistUpdatedEvent`: when a user playlist is updated. Requires: * **tidalapi** (``pip install 'tidalapi >= 0.7.0'``) """ _base_url = 'https://api.tidalhifi.com/v1/' _default_credentials_file = os.path.join( str(Config.get('workdir')), 'tidal', 'credentials.json' ) def __init__( self, quality: str = 'high', credentials_file: str = _default_credentials_file, **kwargs, ): """ :param quality: Default audio quality. Default: ``high``. Supported: [``loseless``, ``master``, ``high``, ``low``]. :param credentials_file: Path to the file where the OAuth session parameters will be stored (default: ``/tidal/credentials.json``). """ from tidalapi import Quality super().__init__(**kwargs) self._credentials_file = os.path.expanduser(credentials_file) self._user_playlists = {} try: self._quality = getattr(Quality, quality.lower()) except AttributeError: raise AssertionError( f'Invalid quality: {quality}. Supported values: ' f'{[q.name for q in Quality]}' ) self._session = None def _oauth_open_saved_session(self): if not self._session: return try: with open(self._credentials_file, 'r') as f: data = json.load(f) self._session.load_oauth_session( data['token_type'], data['access_token'], data['refresh_token'] ) except Exception as e: self.logger.warning('Could not load %s: %s', self._credentials_file, e) def _oauth_create_new_session(self): if not self._session: return self._session.login_oauth_simple(function=self.logger.warning) # type: ignore if self._session.check_login(): data = { 'token_type': self._session.token_type, 'session_id': self._session.session_id, 'access_token': self._session.access_token, 'refresh_token': self._session.refresh_token, } pathlib.Path(os.path.dirname(self._credentials_file)).mkdir( parents=True, exist_ok=True ) with open(self._credentials_file, 'w') as outfile: json.dump(data, outfile) @property def session(self): from tidalapi import Config, Session if self._session and self._session.check_login(): return self._session # Attempt to reload the existing session from file self._session = Session(config=Config(quality=self._quality)) self._oauth_open_saved_session() if not self._session.check_login(): # Create a new session if we couldn't load an existing one self._oauth_create_new_session() assert ( self._session.user and self._session.check_login() ), 'Could not connect to TIDAL' return self._session @property def user(self): user = self.session.user assert user, 'Not logged in' return user def _api_request(self, url, *args, method='get', **kwargs): method = getattr(requests, method.lower()) url = urljoin(self._base_url, url) kwargs['headers'] = kwargs.get('headers', {}) kwargs['params'] = kwargs.get('params', {}) kwargs['params'].update( { 'sessionId': self.session.session_id, 'countryCode': self.session.country_code, } ) rs = None kwargs['headers']['Authorization'] = '{type} {token}'.format( type=self.session.token_type, token=self.session.access_token ) try: rs = method(url, *args, **kwargs) rs.raise_for_status() return rs except requests.HTTPError as e: if rs: self.logger.error(rs.text) raise e @action def create_playlist(self, name: str, description: Optional[str] = None): """ Create a new playlist. :param name: Playlist name. :param description: Optional playlist description. :return: .. schema:: tidal.TidalPlaylistSchema """ ret = self._api_request( url=f'users/{self.user.id}/playlists', method='post', data={ 'title': name, 'description': description, }, ) return TidalPlaylistSchema().dump(ret.json()) @action def delete_playlist(self, playlist_id: str): """ Delete a playlist by ID. :param playlist_id: ID of the playlist to delete. """ pl = self.user.playlist(playlist_id) pl.delete() @action def edit_playlist(self, playlist_id: str, title=None, description=None): """ Edit a playlist's metadata. :param name: New name. :param description: New description. """ pl = self.user.playlist(playlist_id) pl.edit(title=title, description=description) @action def get_playlists(self): """ Get the user's playlists (track lists are excluded). :return: .. schema:: tidal.TidalPlaylistSchema(many=True) """ ret = self.user.playlists() + self.user.favorites.playlists() return TidalPlaylistSchema().dump(ret, many=True) @action def get_playlist(self, playlist_id: str): """ Get the details of a playlist (including tracks). :param playlist_id: Playlist ID. :return: .. schema:: tidal.TidalPlaylistSchema """ pl = self.session.playlist(playlist_id) pl._tracks = get_items(pl.tracks) return TidalPlaylistSchema().dump(pl) @action def get_artist(self, artist_id: Union[str, int]): """ Get the details of an artist. :param artist_id: Artist ID. :return: .. schema:: tidal.TidalArtistSchema """ ret = self.session.artist(artist_id) ret.albums = get_items(ret.get_albums) return TidalArtistSchema().dump(ret) @action def get_album(self, album_id: Union[str, int]): """ Get the details of an album. :param artist_id: Album ID. :return: .. schema:: tidal.TidalAlbumSchema """ ret = self.session.album(album_id) return TidalAlbumSchema().dump(ret) @action def get_track(self, track_id: Union[str, int]): """ Get the details of an track. :param artist_id: Track ID. :return: .. schema:: tidal.TidalTrackSchema """ ret = self.session.album(track_id) return TidalTrackSchema().dump(ret) @action def search( self, query: str, limit: int = 50, offset: int = 0, type: Optional[str] = None, ): """ Perform a search. :param query: Query string. :param limit: Maximum results that should be returned (default: 50). :param offset: Search offset (default: 0). :param type: Type of results that should be returned. Default: None (return all the results that match the query). Supported: ``artist``, ``album``, ``track`` and ``playlist``. :return: .. schema:: tidal.TidalSearchResultsSchema """ from tidalapi.artist import Artist from tidalapi.album import Album from tidalapi.media import Track from tidalapi.playlist import Playlist models = None if type is not None: if type == 'artist': models = [Artist] elif type == 'album': models = [Album] elif type == 'track': models = [Track] elif type == 'playlist': models = [Playlist] else: raise AssertionError(f'Unsupported search type: {type}') ret = self.session.search(query, models=models, limit=limit, offset=offset) return TidalSearchResultsSchema().dump(ret) @action def get_download_url(self, track_id: str) -> str: """ Get the direct download URL of a track. :param artist_id: Track ID. """ return self.session.track(track_id).get_url() @action def add_to_playlist(self, playlist_id: str, track_ids: Iterable[Union[str, int]]): """ Append one or more tracks to a playlist. :param playlist_id: Target playlist ID. :param track_ids: List of track IDs to append. """ pl = self.user.playlist(playlist_id) pl.add(track_ids) @action def remove_from_playlist( self, playlist_id: str, track_id: Optional[Union[str, int]] = None, index: Optional[int] = None, ): """ Remove a track from a playlist. Specify either the ``track_id`` or the ``index``. :param playlist_id: Target playlist ID. :param track_id: ID of the track to remove. :param index: Index of the track to remove. """ assert not ( track_id is None and index is None ), 'Please specify either track_id or index' pl = self.user.playlist(playlist_id) if index: pl.remove_by_index(index) if track_id: pl.remove_by_id(track_id) @action def add_track(self, track_id: Union[str, int]): """ Add a track to the user's collection. :param track_id: Track ID. """ self.user.favorites.add_track(track_id) @action def add_album(self, album_id: Union[str, int]): """ Add an album to the user's collection. :param album_id: Album ID. """ self.user.favorites.add_album(album_id) @action def add_artist(self, artist_id: Union[str, int]): """ Add an artist to the user's collection. :param artist_id: Artist ID. """ self.user.favorites.add_artist(artist_id) @action def add_playlist(self, playlist_id: str): """ Add a playlist to the user's collection. :param playlist_id: Playlist ID. """ self.user.favorites.add_playlist(playlist_id) @action def remove_track(self, track_id: Union[str, int]): """ Remove a track from the user's collection. :param track_id: Track ID. """ self.user.favorites.remove_track(track_id) @action def remove_album(self, album_id: Union[str, int]): """ Remove an album from the user's collection. :param album_id: Album ID. """ self.user.favorites.remove_album(album_id) @action def remove_artist(self, artist_id: Union[str, int]): """ Remove an artist from the user's collection. :param artist_id: Artist ID. """ self.user.favorites.remove_artist(artist_id) @action def remove_playlist(self, playlist_id: str): """ Remove a playlist from the user's collection. :param playlist_id: Playlist ID. """ self.user.favorites.remove_playlist(playlist_id) def main(self): while not self.should_stop(): playlists = self.session.user.playlists() # type: ignore for pl in playlists: last_updated_var = Variable(f'TIDAL_PLAYLIST_LAST_UPDATE[{pl.id}]') prev_last_updated = last_updated_var.get() if prev_last_updated: prev_last_updated = datetime.fromisoformat(prev_last_updated) if pl.last_updated > prev_last_updated: get_bus().post(TidalPlaylistUpdatedEvent(playlist_id=pl.id)) if not prev_last_updated or pl.last_updated > prev_last_updated: last_updated_var.set(pl.last_updated.isoformat()) self.wait_stop(self.poll_interval)