2022-09-16 21:48:09 +02:00
|
|
|
import json
|
|
|
|
import os
|
|
|
|
import pathlib
|
|
|
|
import requests
|
|
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
from urllib.parse import urljoin
|
|
|
|
from typing import Iterable, Optional, Union
|
|
|
|
|
|
|
|
from platypush.config import Config
|
|
|
|
from platypush.context import Variable, get_bus
|
|
|
|
from platypush.message.event.music.tidal import TidalPlaylistUpdatedEvent
|
|
|
|
from platypush.plugins import RunnablePlugin, action
|
|
|
|
from platypush.plugins.music.tidal.workers import get_items
|
|
|
|
from platypush.schemas.tidal import (
|
|
|
|
TidalAlbumSchema,
|
|
|
|
TidalPlaylistSchema,
|
|
|
|
TidalArtistSchema,
|
|
|
|
TidalSearchResultsSchema,
|
|
|
|
TidalTrackSchema,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class MusicTidalPlugin(RunnablePlugin):
    """
    Plugin to interact with the user's Tidal account and library.

    Upon the first login, the application will prompt you with a link to
    connect to your Tidal account. Once authorized, you should no longer be
    required to explicitly login.

    Triggers:

        * :class:`platypush.message.event.music.tidal.TidalPlaylistUpdatedEvent`:
          when a user playlist is updated.

    Requires:

        * **tidalapi** (``pip install tidalapi``)

    """

    # Root of the HTTP API; relative URLs passed to ``_api_request`` are
    # resolved against it.
    _base_url = 'https://api.tidalhifi.com/v1/'

    # Default location of the persisted OAuth session parameters.
    _default_credentials_file = os.path.join(
        str(Config.get('workdir')), 'tidal', 'credentials.json'
    )

    def __init__(
        self,
        quality: str = 'high',
        credentials_file: str = _default_credentials_file,
        **kwargs,
    ):
        """
        :param quality: Default audio quality. Default: ``high``.
            Supported: [``lossless``, ``master``, ``high``, ``low``].
        :param credentials_file: Path to the file where the OAuth session
            parameters will be stored (default:
            ``<WORKDIR>/tidal/credentials.json``).
        :raises AssertionError: If ``quality`` does not match any attribute
            of ``tidalapi.Quality``.
        """
        from tidalapi import Quality

        super().__init__(**kwargs)
        self._credentials_file = credentials_file
        self._user_playlists = {}

        try:
            # The lookup is case-insensitive on the user-provided value
            self._quality = getattr(Quality, quality.lower())
        except AttributeError as e:
            # Same exception type as before, now chained to the original cause
            raise AssertionError(
                f'Invalid quality: {quality}. Supported values: '
                f'{[q.name for q in Quality]}'
            ) from e

        # Created lazily by the ``session`` property
        self._session = None

    def _oauth_open_saved_session(self):
        """
        Load the OAuth parameters stored in the credentials file into the
        current session object.

        This is a best-effort operation: it does nothing if no session object
        exists yet, and any failure (missing file, invalid JSON, missing keys,
        rejected tokens) is only logged as a warning.
        """
        if not self._session:
            return

        try:
            with open(self._credentials_file, 'r') as f:
                data = json.load(f)

            self._session.load_oauth_session(
                data['token_type'], data['access_token'], data['refresh_token']
            )
        except Exception as e:
            self.logger.warning('Could not load %s: %s', self._credentials_file, e)

    def _oauth_create_new_session(self):
        """
        Run the interactive OAuth login and, if it succeeds, persist the
        session parameters to the credentials file.

        It does nothing if no session object exists yet.
        """
        if not self._session:
            return

        # The logger's ``warning`` method is handed to tidalapi as its output
        # function - presumably that's how the login link is shown to the user.
        self._session.login_oauth_simple(function=self.logger.warning)  # type: ignore
        if not self._session.check_login():
            return

        data = {
            'token_type': self._session.token_type,
            'session_id': self._session.session_id,
            'access_token': self._session.access_token,
            'refresh_token': self._session.refresh_token,
        }

        # Make sure that the parent directory exists before writing the file
        pathlib.Path(os.path.dirname(self._credentials_file)).mkdir(
            parents=True, exist_ok=True
        )

        with open(self._credentials_file, 'w') as outfile:
            json.dump(data, outfile)

    @property
    def session(self):
        """
        The logged-in ``tidalapi`` session.

        The cached session is returned if it is still logged in. Otherwise a
        new session is created with the configured quality, the saved
        credentials are loaded into it and, if that is not enough to log in,
        a new OAuth login is started.

        :raises AssertionError: If no logged-in session could be obtained.
        """
        # Aliased so that it doesn't shadow the platypush ``Config`` class
        from tidalapi import Config as TidalConfig, Session

        if self._session and self._session.check_login():
            return self._session

        # Attempt to reload the existing session from file
        self._session = Session(config=TidalConfig(quality=self._quality))
        self._oauth_open_saved_session()
        if not self._session.check_login():
            # Create a new session if we couldn't load an existing one
            self._oauth_create_new_session()

        assert (
            self._session.user and self._session.check_login()
        ), 'Could not connect to TIDAL'

        return self._session

    @property
    def user(self):
        """
        The user object attached to the current session.

        :raises AssertionError: If the session has no user.
        """
        user = self.session.user
        assert user, 'Not logged in'
        return user

    def _api_request(self, url, *args, method='get', **kwargs):
        """
        Perform an authenticated HTTP request against the API.

        :param url: URL of the resource, resolved against ``_base_url``.
        :param args: Extra positional arguments passed to the ``requests``
            function.
        :param method: Name of the ``requests`` function to call (default:
            ``get``), case-insensitive.
        :param kwargs: Extra keyword arguments passed to the ``requests``
            function. ``headers`` and ``params`` are extended with the
            authorization header and the session parameters.
        :return: The response object.
        :raises requests.HTTPError: If the response has an error status. The
            response body is logged before the exception is re-raised.
        """
        send = getattr(requests, method.lower())
        url = urljoin(self._base_url, url)

        # Resolve the session only once instead of on every attribute access
        session = self.session

        # Work on copies, so the dictionaries passed by the caller are not
        # modified (and an explicit ``None`` is tolerated).
        headers = dict(kwargs.get('headers') or {})
        params = dict(kwargs.get('params') or {})
        params.update(
            {
                'sessionId': session.session_id,
                'countryCode': session.country_code,
            }
        )

        headers['Authorization'] = '{type} {token}'.format(
            type=session.token_type, token=session.access_token
        )

        kwargs['headers'] = headers
        kwargs['params'] = params

        rs = None
        try:
            rs = send(url, *args, **kwargs)
            rs.raise_for_status()
            return rs
        except requests.HTTPError:
            # A ``requests`` response evaluates to False when its status is an
            # error, so it must be compared against None or the error body
            # would never be logged.
            if rs is not None:
                self.logger.error(rs.text)
            raise

    @action
    def create_playlist(self, name: str, description: Optional[str] = None):
        """
        Create a new playlist.

        :param name: Playlist name.
        :param description: Optional playlist description.
        :return: .. schema:: tidal.TidalPlaylistSchema
        """
        ret = self._api_request(
            url=f'users/{self.user.id}/playlists',
            method='post',
            data={
                'title': name,
                'description': description,
            },
        )

        return TidalPlaylistSchema().dump(ret.json())

    @action
    def delete_playlist(self, playlist_id: str):
        """
        Delete a playlist by ID.

        :param playlist_id: ID of the playlist to delete.
        """
        self._api_request(url=f'playlists/{playlist_id}', method='delete')

    @action
    def edit_playlist(self, playlist_id: str, title=None, description=None):
        """
        Edit a playlist's metadata.

        :param playlist_id: ID of the playlist to edit.
        :param title: New title.
        :param description: New description.
        """
        pl = self.user.playlist(playlist_id)
        pl.edit(title=title, description=description)

    @action
    def get_playlists(self):
        """
        Get the user's playlists (track lists are excluded).

        Both the playlists owned by the user and the playlists in the user's
        favorites are returned.

        :return: .. schema:: tidal.TidalPlaylistSchema(many=True)
        """
        user = self.user
        ret = user.playlists() + user.favorites.playlists()
        return TidalPlaylistSchema().dump(ret, many=True)

    @action
    def get_playlist(self, playlist_id: str):
        """
        Get the details of a playlist (including tracks).

        :param playlist_id: Playlist ID.
        :return: .. schema:: tidal.TidalPlaylistSchema
        """
        pl = self.session.playlist(playlist_id)
        # Attach the fetched tracks to the object, so they get serialized
        pl._tracks = get_items(pl.tracks)
        return TidalPlaylistSchema().dump(pl)

    @action
    def get_artist(self, artist_id: Union[str, int]):
        """
        Get the details of an artist.

        :param artist_id: Artist ID.
        :return: .. schema:: tidal.TidalArtistSchema
        """
        ret = self.session.artist(artist_id)
        # Attach the fetched albums to the object, so they get serialized
        ret.albums = get_items(ret.get_albums)
        return TidalArtistSchema().dump(ret)

    @action
    def get_album(self, album_id: Union[str, int]):
        """
        Get the details of an album.

        :param album_id: Album ID.
        :return: .. schema:: tidal.TidalAlbumSchema
        """
        ret = self.session.album(album_id)
        return TidalAlbumSchema().dump(ret)

    @action
    def get_track(self, track_id: Union[str, int]):
        """
        Get the details of a track.

        :param track_id: Track ID.
        :return: .. schema:: tidal.TidalTrackSchema
        """
        # Look up a track, not an album, for the given ID
        ret = self.session.track(track_id)
        return TidalTrackSchema().dump(ret)

    @action
    def search(
        self,
        query: str,
        limit: int = 50,
        offset: int = 0,
        type: Optional[str] = None,
    ):
        """
        Perform a search.

        :param query: Query string.
        :param limit: Maximum results that should be returned (default: 50).
        :param offset: Search offset (default: 0).
        :param type: Type of results that should be returned. Default: None
            (return all the results that match the query). Supported:
            ``artist``, ``album``, ``track`` and ``playlist``.
        :return: .. schema:: tidal.TidalSearchResultsSchema
        """
        from tidalapi.artist import Artist
        from tidalapi.album import Album
        from tidalapi.media import Track
        from tidalapi.playlist import Playlist

        supported_models = {
            'artist': Artist,
            'album': Album,
            'track': Track,
            'playlist': Playlist,
        }

        # ``None`` means no filter on the type of the results
        models = None
        if type is not None:
            model = supported_models.get(type) if isinstance(type, str) else None
            if model is None:
                raise AssertionError(f'Unsupported search type: {type}')

            models = [model]

        ret = self.session.search(query, models=models, limit=limit, offset=offset)
        return TidalSearchResultsSchema().dump(ret)

    @action
    def get_download_url(self, track_id: str) -> str:
        """
        Get the direct download URL of a track.

        :param track_id: Track ID.
        """
        return self.session.track(track_id).get_url()

    @action
    def add_to_playlist(
        self,
        playlist_id: str,
        track_ids: Union[str, int, Iterable[Union[str, int]]],
    ):
        """
        Append one or more tracks to a playlist.

        :param playlist_id: Target playlist ID.
        :param track_ids: List of track IDs to append. A single track ID is
            also accepted.
        """
        # A bare string would otherwise be split into its characters
        if isinstance(track_ids, (str, int)):
            track_ids = [track_ids]

        return self._api_request(
            url=f'playlists/{playlist_id}/items',
            method='post',
            headers={
                'If-None-Match': None,
            },
            data={
                'onArtifactNotFound': 'SKIP',
                'onDupes': 'SKIP',
                'trackIds': ','.join(map(str, track_ids)),
            },
        )

    @action
    def add_track(self, track_id: Union[str, int]):
        """
        Add a track to the user's collection.

        :param track_id: Track ID.
        """
        self.user.favorites.add_track(track_id)

    @action
    def add_album(self, album_id: Union[str, int]):
        """
        Add an album to the user's collection.

        :param album_id: Album ID.
        """
        self.user.favorites.add_album(album_id)

    @action
    def add_artist(self, artist_id: Union[str, int]):
        """
        Add an artist to the user's collection.

        :param artist_id: Artist ID.
        """
        self.user.favorites.add_artist(artist_id)

    @action
    def add_playlist(self, playlist_id: str):
        """
        Add a playlist to the user's collection.

        :param playlist_id: Playlist ID.
        """
        self.user.favorites.add_playlist(playlist_id)

    @action
    def remove_track(self, track_id: Union[str, int]):
        """
        Remove a track from the user's collection.

        :param track_id: Track ID.
        """
        self.user.favorites.remove_track(track_id)

    @action
    def remove_album(self, album_id: Union[str, int]):
        """
        Remove an album from the user's collection.

        :param album_id: Album ID.
        """
        self.user.favorites.remove_album(album_id)

    @action
    def remove_artist(self, artist_id: Union[str, int]):
        """
        Remove an artist from the user's collection.

        :param artist_id: Artist ID.
        """
        self.user.favorites.remove_artist(artist_id)

    @action
    def remove_playlist(self, playlist_id: str):
        """
        Remove a playlist from the user's collection.

        :param playlist_id: Playlist ID.
        """
        self.user.favorites.remove_playlist(playlist_id)

    def main(self):
        """
        Poll the user's playlists until the plugin is stopped.

        The last update time of each playlist is compared with the value
        stored in the ``TIDAL_PLAYLIST_LAST_UPDATE[<playlist_id>]`` variable.
        A :class:`TidalPlaylistUpdatedEvent` is posted when the playlist is
        newer than the stored value. A playlist with no stored value only
        gets its timestamp recorded, without any event.
        """
        while not self.should_stop():
            for pl in self.user.playlists():
                last_updated_var = Variable(f'TIDAL_PLAYLIST_LAST_UPDATE[{pl.id}]')
                stored = last_updated_var.get()
                previous = datetime.fromisoformat(stored) if stored else None

                is_new = previous is None
                is_updated = previous is not None and pl.last_updated > previous

                if is_updated:
                    get_bus().post(TidalPlaylistUpdatedEvent(playlist_id=pl.id))

                if is_new or is_updated:
                    last_updated_var.set(pl.last_updated.isoformat())

            self.wait_stop(self.poll_interval)