2022-09-13 17:01:34 +02:00
|
|
|
import os
|
|
|
|
|
2022-09-12 16:43:35 +02:00
|
|
|
from datetime import datetime
|
|
|
|
from typing import List, Optional, Union, Iterable
|
|
|
|
|
2022-09-13 17:01:34 +02:00
|
|
|
from platypush.config import Config
|
2022-09-12 16:43:35 +02:00
|
|
|
from platypush.message.response import Response
|
|
|
|
from platypush.plugins import action
|
|
|
|
from platypush.plugins.media import PlayerState
|
|
|
|
from platypush.plugins.music import MusicPlugin
|
|
|
|
from platypush.schemas.spotify import SpotifyDeviceSchema, SpotifyStatusSchema, SpotifyTrackSchema, \
|
|
|
|
SpotifyHistoryItemSchema, SpotifyPlaylistSchema, SpotifyAlbumSchema, SpotifyEpisodeSchema, SpotifyShowSchema, \
|
|
|
|
SpotifyArtistSchema
|
|
|
|
|
|
|
|
|
|
|
|
class MusicTidalPlugin(MusicPlugin):
    """
    Plugin that gives access to the Tidal account and music library of the user.

    Requires:

        * **tidalapi** (``pip install tidalapi``)

    """

    # Base URL stored for the Tidal API calls.
    _base_url = 'https://api.tidalhifi.com/v1/'

    # ``<workdir>/tidal/credentials.json``, where ``workdir`` comes from the platypush configuration.
    _oauth_file = os.path.join(str(Config.get('workdir')), 'tidal', 'credentials.json')
|
2022-09-12 16:43:35 +02:00
|
|
|
|
2022-09-12 20:56:05 +02:00
|
|
|
def __init__(self, quality: str = 'high', **kwargs):
    """
    :param quality: Default audio quality. Default: ``high``.
        Supported: [``lossless``, ``master``, ``high``, ``low``].
    :param kwargs: Extra arguments forwarded to the parent plugin constructor.
    :raises AssertionError: If ``quality`` is not a member of ``tidalapi.Quality``.
    """
    # tidalapi is imported when the plugin is instantiated rather than at module load.
    from tidalapi import Quality

    # super() already binds ``self``: passing it explicitly would add a spurious positional argument.
    super().__init__(**kwargs)

    # State read and updated by the device and playlist methods of this class.
    self._players_by_id = {}
    self._players_by_name = {}
    # Playlist ID -> cached snapshot (snapshot ID, tracks, limit, offset)
    self._playlist_snapshots = {}

    try:
        self._quality = getattr(Quality, quality.lower())
    except AttributeError as e:
        # Same exception type as before, but the original cause is preserved.
        raise AssertionError(
            f'Invalid quality: {quality}. Supported values: '
            f'{[q.name for q in Quality]}'
        ) from e
|
2022-09-12 16:43:35 +02:00
|
|
|
|
|
|
|
@staticmethod
def _parse_datetime(dt: Optional[Union[str, datetime, int, float]]) -> Optional[datetime]:
    """
    Normalize a timestamp-like value to a ``datetime`` object.

    :param dt: UNIX timestamp (number or numeric string), ISO-formatted string, ``datetime`` or ``None``.
    :return: The parsed ``datetime``. Values that are neither strings nor numbers are returned as they are.
    """
    value = dt
    if isinstance(value, str):
        try:
            value = float(value)
        except (ValueError, TypeError):
            # Not a numeric string: parse it as an ISO-formatted datetime.
            return datetime.fromisoformat(dt)

    return datetime.fromtimestamp(value) if isinstance(value, (int, float)) else value
|
|
|
|
|
|
|
|
@action
def get_devices(self) -> List[dict]:
    """
    Retrieve the players associated to the Spotify account and refresh the
    local device indexes (by ID and by name).

    :return: .. schema:: spotify.SpotifyDeviceSchema(many=True)
    """
    payload = self.spotify_user_call('/v1/me/player/devices')
    found = payload.get('devices', [])

    # Previously seen devices are kept; fresh entries replace those with the same key.
    id_index = dict(self._players_by_id)
    id_index.update({player['id']: player for player in found})
    self._players_by_id = id_index

    name_index = dict(self._players_by_name)
    name_index.update({player['name']: player for player in found})
    self._players_by_name = name_index

    return SpotifyDeviceSchema().dump(found, many=True)
|
|
|
|
|
|
|
|
@action
def set_volume(self, volume: int, device: Optional[str] = None):
    """
    Change the playback volume of a device.

    :param volume: Target volume, as a percentage between 0 and 100.
    :param device: Device ID or name. The currently active device is used if none is specified.
    """
    target = self._get_device(device)['id'] if device else device

    query = {'volume_percent': volume}
    if target:
        query['device_id'] = target

    self.spotify_user_call('/v1/me/player/volume', method='put', params=query)
|
|
|
|
|
|
|
|
def _get_volume(self, device: Optional[str] = None) -> Optional[int]:
    """
    Get the current volume of a player.

    :param device: Device ID or name. If none is specified then the volume is read from the
        status of the currently active player.
    :return: The ``volume`` field of the device or of the player status, ``None`` if it is missing.
    """
    if device:
        return self._get_device(device).get('volume')

    # ``status`` is a method: it has to be called before its ``output`` can be read
    # (as done by the other methods of this class).
    return self.status().output.get('volume')
|
|
|
|
|
|
|
|
@action
def volup(self, delta: int = 5, device: Optional[str] = None):
    """
    Set the volume up by a certain delta.

    :param delta: Increase the volume by this percentage amount (between 0 and 100).
    :param device: Device ID or name. If none is specified then the currently active device will be used.
    """
    # Read the volume of the targeted device rather than always the active one.
    current = self._get_volume(device) or 0
    if device:
        device = self._get_device(device)['id']

    self.spotify_user_call(
        '/v1/me/player/volume',
        # Same HTTP method used by set_volume on this endpoint.
        method='put',
        params={
            # Clamp to the valid percentage range.
            'volume_percent': max(0, min(100, current + delta)),
            **({'device_id': device} if device else {}),
        },
    )
|
|
|
|
|
|
|
|
@action
def voldown(self, delta: int = 5, device: Optional[str] = None):
    """
    Set the volume down by a certain delta.

    :param delta: Decrease the volume by this percentage amount (between 0 and 100).
    :param device: Device ID or name. If none is specified then the currently active device will be used.
    """
    # Read the volume of the targeted device rather than always the active one.
    current = self._get_volume(device) or 0
    if device:
        device = self._get_device(device)['id']

    self.spotify_user_call(
        '/v1/me/player/volume',
        # Same HTTP method used by set_volume on this endpoint.
        method='put',
        params={
            # Clamp to the valid percentage range.
            'volume_percent': min(100, max(0, current - delta)),
            **({'device_id': device} if device else {}),
        },
    )
|
|
|
|
|
|
|
|
@action
def play(self, resource: Optional[str] = None, device: Optional[str] = None):
    """
    Start playing a resource, or switch the playback state of a device to ``PLAY``.

    :param resource: Resource to play, in Spotify URI format (e.g. ``spotify:track:xxxxxxxxxxxxxxxxxxxxxx``).
        If none is specified then the method will change the playback state to ``PLAY``.
    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    target = self._get_device(device)['id'] if device else device
    body = {'uris': [resource]} if resource else {}
    query = {'device_id': target} if target else {}

    self.spotify_user_call('/v1/me/player/play', method='put', json=body, params=query)
|
|
|
|
|
|
|
|
@action
def pause(self, device: Optional[str] = None):
    """
    Toggle paused state.

    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    if device:
        device = self._get_device(device)['id']

    # noinspection PyUnresolvedReferences
    status = self.status().output

    # The device comparison only makes sense when a device was explicitly requested:
    # comparing the active device ID against ``None`` would always select ``play``.
    on_other_device = bool(device) and status.get('device_id') != device
    is_playing = status.get('state') == PlayerState.PLAY.value
    state = 'pause' if is_playing and not on_other_device else 'play'

    self.spotify_user_call(
        f'/v1/me/player/{state}',
        method='put',
        params={
            **({'device_id': device} if device else {}),
        },
    )
|
|
|
|
|
|
|
|
@action
def pause_if_playing(self):
    """
    Pause the playback, but only when the player is currently playing.
    """
    # noinspection PyUnresolvedReferences
    playback = self.status().output
    if playback.get('state') != PlayerState.PLAY.value:
        return

    self.spotify_user_call('/v1/me/player/pause', method='put')
|
|
|
|
|
|
|
|
@action
def play_if_paused(self, device: Optional[str] = None):
    """
    Resume the playback, but only when the player is not already playing.

    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    target = self._get_device(device)['id'] if device else device

    # noinspection PyUnresolvedReferences
    playback = self.status().output
    if playback.get('state') == PlayerState.PLAY.value:
        return

    query = {'device_id': target} if target else {}
    self.spotify_user_call('/v1/me/player/play', method='put', params=query)
|
|
|
|
|
|
|
|
@action
def play_if_paused_or_stopped(self):
    """
    Same as :meth:`.play_if_paused`, called without a target device.
    """
    outcome = self.play_if_paused()
    return outcome
|
|
|
|
|
|
|
|
@action
def stop(self, **kwargs):
    """
    This method simply forwards its arguments to :meth:`.pause`, since Spotify manages clearing
    playback sessions automatically after a while for paused devices.

    :param kwargs: Arguments passed to :meth:`.pause`.
    """
    outcome = self.pause(**kwargs)
    return outcome
|
|
|
|
|
|
|
|
@action
def start_or_transfer_playback(self, device: str):
    """
    Start the playback on the specified device, or move it there.

    :param device: Device ID or name.
    """
    target = self._get_device(device)['id']
    body = {'device_ids': [target]}
    self.spotify_user_call('/v1/me/player', method='put', json=body)
|
|
|
|
|
|
|
|
@action
def next(self, device: Optional[str] = None, **kwargs):
    """
    Move to the next track.

    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    target = self._get_device(device)['id'] if device else device
    query = {'device_id': target} if target else {}
    self.spotify_user_call('/v1/me/player/next', method='post', params=query)
|
|
|
|
|
|
|
|
@action
def previous(self, device: Optional[str] = None, **kwargs):
    """
    Move back to the previous track.

    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    target = self._get_device(device)['id'] if device else device
    query = {'device_id': target} if target else {}
    self.spotify_user_call('/v1/me/player/previous', method='post', params=query)
|
|
|
|
|
|
|
|
@action
def seek(self, position: float, device: Optional[str] = None, **kwargs):
    """
    Move the playback cursor to a position in the current track.

    :param position: Position in seconds.
    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    target = self._get_device(device)['id'] if device else device

    # The position is sent in milliseconds.
    query = {'position_ms': int(position * 1000)}
    if target:
        query['device_id'] = target

    self.spotify_user_call('/v1/me/player/seek', method='put', params=query)
|
|
|
|
|
|
|
|
@action
def repeat(self, value: Optional[bool] = None, device: Optional[str] = None):
    """
    Set or toggle repeat mode.

    :param value: If set, set the repeat state this value (true/false). Default: None (toggle current state).
    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    if device:
        device = self._get_device(device)['id']

    if value is None:
        # noinspection PyUnresolvedReferences
        status = self.status().output
        # Only compare devices when one was requested: the active device ID never equals ``None``.
        on_other_device = bool(device) and status.get('device_id') != device
        # Keep the toggle as a boolean. The previous string values ('context'/'off')
        # were both truthy, so the toggle could never switch repeat off.
        enabled = on_other_device or not status.get('repeat')
    else:
        enabled = value is True

    self.spotify_user_call(
        '/v1/me/player/repeat',
        method='put',
        params={
            'state': 'context' if enabled else 'off',
            **({'device_id': device} if device else {}),
        },
    )
|
|
|
|
|
|
|
|
@action
def random(self, value: Optional[bool] = None, device: Optional[str] = None):
    """
    Set or toggle random/shuffle mode.

    :param value: If set, set the shuffle state this value (true/false). Default: None (toggle current state).
    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    if device:
        device = self._get_device(device)['id']

    if value is None:
        # noinspection PyUnresolvedReferences
        status = self.status().output
        # Only compare devices when one was requested: otherwise the active device ID
        # is compared against ``None`` and shuffle could never be toggled off.
        on_other_device = bool(device) and status.get('device_id') != device
        state = on_other_device or not status.get('random')
    else:
        state = value is True

    self.spotify_user_call(
        '/v1/me/player/shuffle',
        method='put',
        params={
            'state': state,
            **({'device_id': device} if device else {}),
        },
    )
|
|
|
|
|
|
|
|
@action
def history(self, limit: int = 20, before: Optional[Union[datetime, str, int]] = None,
            after: Optional[Union[datetime, str, int]] = None):
    """
    Get a list of recently played track on the account.

    :param limit: Maximum number of tracks to be retrieved (default: 20, max: 50).
    :param before: Retrieve only the tracks played before this timestamp, specified as a UNIX timestamp, a datetime
        object or an ISO datetime string. If ``before`` is set then ``after`` cannot be set.
    :param after: Retrieve only the tracks played after this timestamp, specified as a UNIX timestamp, a datetime
        object or an ISO datetime string. If ``after`` is set then ``before`` cannot be set.
    :return: The played items, serialized through ``SpotifyHistoryItemSchema``.
    """
    before = self._parse_datetime(before)
    after = self._parse_datetime(after)
    assert not (before and after), 'before and after cannot both be set'

    params = {'limit': min(limit, 50)}
    # The recently-played endpoint takes its cursors as UNIX timestamps in
    # milliseconds, not as datetime objects.
    if before:
        params['before'] = int(before.timestamp() * 1000)
    if after:
        params['after'] = int(after.timestamp() * 1000)

    results = self._spotify_paginate_results(
        '/v1/me/player/recently-played',
        limit=limit,
        params=params,
    )

    return SpotifyHistoryItemSchema().dump([
        {
            # Flatten the nested track attributes into the history item itself.
            **item.pop('track'),
            **item,
        }
        for item in results
    ], many=True)
|
|
|
|
|
|
|
|
@action
def add(self, resource: str, device: Optional[str] = None, **kwargs):
    """
    Append a Spotify resource (track or episode) to the playing queue.

    :param resource: Spotify resource URI.
    :param device: Device ID or name. If none is specified then the action will target the currently active device.
    """
    target = self._get_device(device)['id'] if device else device

    query = {'uri': resource}
    if target:
        query['device_id'] = target

    self.spotify_user_call('/v1/me/player/queue', method='post', params=query)
|
|
|
|
|
|
|
|
@action
def clear(self, **kwargs):
    """
    No-op: the arguments are accepted and ignored.
    """
    return None
|
|
|
|
|
|
|
|
@action
def status(self, **kwargs) -> dict:
    """
    Retrieve the status of the player that is currently active.

    :return: .. schema:: spotify.SpotifyStatusSchema
    """
    player = self.spotify_user_call('/v1/me/player')
    if player:
        return SpotifyStatusSchema().dump(player)

    # An empty response is reported as a stopped player.
    return {'state': PlayerState.STOP.value}
|
|
|
|
|
|
|
|
@action
def current_track(self, **kwargs) -> dict:
    """
    Retrieve the track that is currently playing.

    :return: .. schema:: spotify.SpotifyTrackSchema
    """
    player = self.spotify_user_call('/v1/me/player')
    nothing_playing = Response(output={})

    # Either no player status or no ``item`` in it: return an empty response.
    item = player.get('item', {}) if player else None
    if not item:
        # noinspection PyTypeChecker
        return nothing_playing

    return SpotifyTrackSchema().dump(item)
|
|
|
|
|
|
|
|
@action
def get_playlists(self, limit: int = 1000, offset: int = 0, user: Optional[str] = None):
    """
    Retrieve the playlists of a user.

    :param limit: Maximum number of results (default: 1000).
    :param offset: Return results starting from this index (default: 0).
    :param user: Return the playlist owned by a specific user ID (default: currently logged in user).
    :return: .. schema:: spotify.SpotifyPlaylistSchema(many=True)
    """
    owner = 'users/' + user if user else 'me'
    found = self._spotify_paginate_results(f'/v1/{owner}/playlists', limit=limit, offset=offset)
    return SpotifyPlaylistSchema().dump(found, many=True)
|
|
|
|
|
|
|
|
def _get_playlist(self, playlist: str) -> dict:
    """
    Look up one of the user's playlists.

    :param playlist: Playlist ID, URI or name.
    :return: The first playlist whose ``id``, ``uri`` or ``name`` equals ``playlist``.
    """
    def is_match(candidate: dict) -> bool:
        return (
            candidate['id'] == playlist
            or candidate['uri'] == playlist
            or candidate['name'] == playlist
        )

    matches = list(filter(is_match, self.get_playlists().output))
    assert matches, f'No such playlist ID, URI or name: {playlist}'
    return matches[0]
|
|
|
|
|
|
|
|
def _get_playlist_tracks_from_cache(self, id: str, snapshot_id: str, limit: Optional[int] = None,
                                    offset: int = 0) -> Optional[Iterable]:
    """
    Get the tracks of a playlist from the local cache, if the cached data covers the request.

    :param id: Playlist ID.
    :param snapshot_id: Expected snapshot ID. A cached entry with a different snapshot ID is ignored.
    :param limit: Number of requested tracks (``None``: all the tracks from ``offset`` onwards).
    :param offset: Index of the first requested track.
    :return: The requested window of cached tracks, or ``None`` on cache miss.
    """
    snapshot = self._playlist_snapshots.get(id)
    if not snapshot or snapshot['snapshot_id'] != snapshot_id:
        return None

    cached_start = snapshot['offset']
    cached_limit = snapshot['limit']
    # A cached limit of ``None`` means that the entry was stored without an upper bound.
    cached_end = None if cached_limit is None else cached_start + cached_limit

    # The cached window is [cached_start, cached_end): the requested window
    # [offset, offset + limit) must be fully contained in it.
    if offset < cached_start:
        return None
    if cached_end is not None and (limit is None or offset + limit > cached_end):
        return None

    # Return only the requested window, not the whole cached list.
    tracks = list(snapshot['tracks'])
    first = offset - cached_start
    return tracks[first:] if limit is None else tracks[first:first + limit]
|
|
|
|
|
|
|
|
def _cache_playlist_data(self, id: str, snapshot_id: str, tracks: Iterable[dict], limit: Optional[int] = None,
                         offset: int = 0, **_):
    """
    Store the tracks of a playlist in the local snapshots cache, replacing any previous entry.

    :param id: Playlist ID, used as the cache key.
    :param snapshot_id: Snapshot ID that the tracks belong to.
    :param tracks: Tracks to be cached.
    :param limit: Limit that was used to retrieve the tracks.
    :param offset: Offset that was used to retrieve the tracks.
    :param _: Any other keyword argument is ignored.
    """
    entry = dict(id=id, tracks=tracks, snapshot_id=snapshot_id, limit=limit, offset=offset)
    self._playlist_snapshots[id] = entry
|
|
|
|
|
|
|
|
@action
def get_playlist(self, playlist: str, with_tracks: bool = True, limit: Optional[int] = None, offset: int = 0):
    """
    Retrieve the content of a playlist.

    :param playlist: Playlist name, ID or URI.
    :param with_tracks: Return also the playlist tracks (default: true). If false, only the metadata is returned.
    :param limit: If ``with_tracks`` is True, retrieve this maximum amount of tracks
        (default: None, get all tracks).
    :param offset: If ``with_tracks`` is True, retrieve tracks starting from this index (default: 0).
    :return: .. schema:: spotify.SpotifyPlaylistSchema
    """
    playlist = self._get_playlist(playlist)
    if not with_tracks:
        return SpotifyPlaylistSchema().dump(playlist)

    # Try the local cache first.
    playlist['tracks'] = self._get_playlist_tracks_from_cache(
        playlist['id'], snapshot_id=playlist['snapshot_id'], limit=limit, offset=offset
    )

    if playlist['tracks'] is None:
        # Cache miss: fetch the tracks and annotate each one with its 1-based position.
        fetched = self._spotify_paginate_results(
            f'/v1/playlists/{playlist["id"]}/tracks', limit=limit, offset=offset
        )

        playlist['tracks'] = [
            {**entry, 'track': {**entry['track'], 'position': offset+idx+1}}
            for idx, entry in enumerate(fetched)
        ]

        self._cache_playlist_data(**playlist, limit=limit, offset=offset)

    return SpotifyPlaylistSchema().dump(playlist)
|
|
|
|
|
|
|
|
@action
def add_to_playlist(self, playlist: str, resources: Union[str, Iterable[str]], position: Optional[int] = None):
    """
    Add items to a playlist.

    :param playlist: Playlist name, ID or URI.
    :param resources: URI(s) of the resource(s) to be added, either as a collection or as a
        comma-separated string.
    :param position: At what (1-based) position the tracks should be inserted (default: append to the end).
    """
    playlist = self._get_playlist(playlist)
    endpoint = f'/v1/playlists/{playlist["id"]}/tracks'
    query = {'position': position} if position is not None else {}
    uris = resources.split(',') if isinstance(resources, str) else resources

    response = self.spotify_user_call(
        endpoint,
        method='post',
        params=query,
        json={'uris': [uri.strip() for uri in uris]},
    )

    # A successful change is expected to report the new snapshot ID.
    snapshot_id = response.get('snapshot_id')
    assert snapshot_id is not None, 'Could not save playlist'
|
|
|
|
|
|
|
|
@action
def remove_from_playlist(self, playlist: str, resources: Union[str, Iterable[str]]):
    """
    Remove items from a playlist.

    :param playlist: Playlist name, ID or URI.
    :param resources: URI(s) of the resource(s) to be removed, either as a collection or as a
        comma-separated string. A maximum of 100 tracks can be provided at once.
    """
    playlist = self._get_playlist(playlist)
    endpoint = f'/v1/playlists/{playlist["id"]}/tracks'
    uris = resources.split(',') if isinstance(resources, str) else resources

    response = self.spotify_user_call(
        endpoint,
        method='delete',
        json={'tracks': [{'uri': uri.strip()} for uri in uris]},
    )

    # A successful change is expected to report the new snapshot ID.
    snapshot_id = response.get('snapshot_id')
    assert snapshot_id is not None, 'Could not save playlist'
|
|
|
|
|
|
|
|
@action
def playlist_move(self, playlist: str, from_pos: int, to_pos: int, range_length: int = 1,
                  resources: Optional[Union[str, Iterable[str]]] = None, **_):
    """
    Move or replace items in a playlist.

    :param playlist: Playlist name, ID or URI.
    :param from_pos: Move tracks starting from this position (the first element has index 1).
    :param to_pos: Move tracks to this position (1-based index).
    :param range_length: Number of tracks to move (default: 1).
    :param resources: If specified, then replace the items from `from_pos` to `from_pos+range_length` with the
        specified set of Spotify URIs (it must be a collection with the same length as the range).
    """
    playlist = self._get_playlist(playlist)
    endpoint = f'/v1/playlists/{playlist["id"]}/tracks'

    # NOTE(review): both positions are sent incremented by one, while they are documented
    # as 1-based - confirm the indexing expected by the remote endpoint.
    body = {
        'range_start': int(from_pos) + 1,
        'range_length': int(range_length),
        'insert_before': int(to_pos) + 1,
    }

    if resources:
        uris = resources.split(',') if isinstance(resources, str) else resources
        body['uris'] = [uri.strip() for uri in uris]

    response = self.spotify_user_call(endpoint, method='put', json=body)

    # A successful change is expected to report the new snapshot ID.
    snapshot_id = response.get('snapshot_id')
    assert snapshot_id is not None, 'Could not save playlist'
|
|
|
|
|
|
|
|
# noinspection PyShadowingBuiltins
@staticmethod
def _make_filter(query: Union[str, dict], **filter) -> str:
    """
    Build a search query string out of a free-text or structured query.

    :param query: Free-text query, structured query (``any``, ``artist``, ``track``, ``album``
        and ``year`` are the supported fields), or ``None``.
    :param filter: Extra structured fields. They are merged into ``query`` and take precedence over it.
    :return: The query string, in the form ``<any> artist:<...> track:<...> album:<...> year:<...>``
        (only the fields that are set are included).
    """
    # A plain text query with no extra filters is passed through untouched.
    if isinstance(query, str) and not filter:
        return query

    if query is None:
        # ``None`` is the default query of ``search``: treat it as "no free text".
        criteria = {}
    elif isinstance(query, dict):
        # Keep the fields of a structured query even when extra filters are passed.
        criteria = dict(query)
    else:
        criteria = {'any': query}

    criteria.update(filter)

    terms = [str(criteria.get('any') or '')]
    terms.extend(
        f'{attr}:{criteria[attr]}'
        for attr in ('artist', 'track', 'album', 'year')
        if attr in criteria
    )

    return ' '.join(terms).strip()
|
|
|
|
|
|
|
|
# noinspection PyShadowingBuiltins
@action
def search(self, query: Optional[Union[str, dict]] = None, limit: int = 50, offset: int = 0, type: str = 'track',
           **filter) -> Iterable[dict]:
    """
    Search for items that match some criteria.

    :param query: Search filter. It can either be a free-text or a structured query. In the latter case the
        following fields are supported:

            - ``any``: Search for anything that matches this text.
            - ``uri``: Search the following Spotify ID/URI or list of IDs/URIs.
            - ``artist``: Filter by artist.
            - ``track``: Filter by track name.
            - ``album``: Filter by album name.
            - ``year``: Filter by year (dash-separated ranges are supported).

    :param limit: Maximum number of results (default: 50).
    :param offset: Return results starting from this index (default: 0).
    :param type: Type of results to be returned. Supported: ``album``, ``artist``, ``playlist``, ``track``, ``show``
        and ``episode`` (default: ``track``).
    :param filter: Alternative key-value way of representing a structured query.
    :return:
        If ``type=track``:
            .. schema:: spotify.SpotifyTrackSchema(many=True)
        If ``type=album``:
            .. schema:: spotify.SpotifyAlbumSchema(many=True)
        If ``type=artist``:
            .. schema:: spotify.SpotifyArtistSchema(many=True)
        If ``type=playlist``:
            .. schema:: spotify.SpotifyPlaylistSchema(many=True)
        If ``type=episode``:
            .. schema:: spotify.SpotifyEpisodeSchema(many=True)
        If ``type=show``:
            .. schema:: spotify.SpotifyShowSchema(many=True)

    """
    # The ``uri`` field may come from the structured query or from the keyword filters.
    structured = dict(query) if isinstance(query, dict) else {}
    structured.update(filter)
    uri = structured.get('uri', [])
    uris = uri.split(',') if isinstance(uri, str) else uri

    if uris:
        # Lookup by ID: only the last colon-separated token of each URI is sent.
        endpoint = '/v1/' + type + 's'
        params = {'ids': ','.join(item.split(':')[-1].strip() for item in uris)}
    else:
        endpoint = '/v1/search'
        params = {'q': self._make_filter(query, **filter), 'type': type}

    response = self._spotify_paginate_results(
        endpoint, limit=limit, offset=offset, type=type, params=params
    )

    if type == 'track':
        def sort_key(track: dict) -> tuple:
            return (
                track.get('artist'),
                track.get('date'),
                track.get('album'),
                track.get('track'),
                track.get('title'),
                track.get('popularity'),
            )

        tracks = SpotifyTrackSchema(many=True).dump(response)
        return sorted(tracks, key=sort_key)

    schemas = {
        'playlist': SpotifyPlaylistSchema,
        'album': SpotifyAlbumSchema,
        'artist': SpotifyArtistSchema,
        'episode': SpotifyEpisodeSchema,
        'show': SpotifyShowSchema,
    }

    schema_class = schemas.get(type)
    # Unknown types are returned as they were received.
    return schema_class(many=True).dump(response) if schema_class else response
|
|
|
|
|
|
|
|
@action
def follow_playlist(self, playlist: str, public: bool = True):
    """
    Start following a playlist.

    :param playlist: Playlist name, ID or URI.
    :param public: If True (default) then the playlist will appear in the user's list of public playlists, otherwise
        it won't.
    """
    playlist = self._get_playlist(playlist)
    endpoint = f'/v1/playlists/{playlist["id"]}/followers'
    self.spotify_user_call(endpoint, method='put', json={'public': public})
|
|
|
|
|
|
|
|
@action
def unfollow_playlist(self, playlist: str):
    """
    Stop following a playlist.

    :param playlist: Playlist name, ID or URI.
    """
    playlist = self._get_playlist(playlist)
    endpoint = f'/v1/playlists/{playlist["id"]}/followers'
    self.spotify_user_call(endpoint, method='delete')
|
|
|
|
|
|
|
|
@staticmethod
def _uris_to_id(*uris: str) -> Iterable[str]:
    """
    Convert URIs to IDs by keeping only the text after the last ``:`` of each one.

    :param uris: URIs or plain IDs (a value with no ``:`` is returned unchanged).
    :return: The list of IDs, in the same order as the input.
    """
    ids = []
    for uri in uris:
        # rpartition yields the whole string as its tail when there's no separator.
        ids.append(uri.rpartition(':')[2])
    return ids
|
|
|
|
|
|
|
|
@action
def get_albums(self, limit: int = 50, offset: int = 0) -> List[dict]:
    """
    Retrieve the albums saved by the user.

    :param limit: Maximum number of results (default: 50).
    :param offset: Return results starting from this index (default: 0).
    :return: .. schema:: spotify.SpotifyAlbumSchema(many=True)
    """
    schema = SpotifyAlbumSchema()
    albums = self._spotify_paginate_results('/v1/me/albums', limit=limit, offset=offset)
    return schema.dump(albums, many=True)
|
|
|
|
|
|
|
|
@action
def save_albums(self, resources: Iterable[str]):
    """
    Add albums to the user's collection.

    :param resources: Spotify IDs or URIs of the albums to save.
    """
    album_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/albums', method='put', json={'ids': album_ids})
|
|
|
|
|
|
|
|
@action
def remove_albums(self, resources: Iterable[str]):
    """
    Delete albums from the user's collection.

    :param resources: Spotify IDs or URIs of the albums to remove.
    """
    album_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/albums', method='delete', json={'ids': album_ids})
|
|
|
|
|
|
|
|
@action
def get_tracks(self, limit: int = 100, offset: int = 0) -> List[dict]:
    """
    Retrieve the tracks saved by the user.

    :param limit: Maximum number of results (default: 100).
    :param offset: Return results starting from this index (default: 0).
    :return: .. schema:: spotify.SpotifyTrackSchema(many=True)
    """
    saved = self._spotify_paginate_results('/v1/me/tracks', limit=limit, offset=offset)

    tracks = []
    for entry in saved:
        # Each saved item wraps the actual track under its ``track`` key.
        tracks.append(SpotifyTrackSchema().dump(entry['track']))

    return tracks
|
|
|
|
|
|
|
|
@action
def save_tracks(self, resources: Iterable[str]):
    """
    Add tracks to the user's collection.

    :param resources: Spotify IDs or URIs of the tracks to save.
    """
    track_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/tracks', method='put', json={'ids': track_ids})
|
|
|
|
|
|
|
|
@action
def remove_tracks(self, resources: Iterable[str]):
    """
    Delete tracks from the user's collection.

    :param resources: Spotify IDs or URIs of the tracks to remove.
    """
    track_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/tracks', method='delete', json={'ids': track_ids})
|
|
|
|
|
|
|
|
@action
def get_episodes(self, limit: int = 50, offset: int = 0) -> List[dict]:
    """
    Retrieve the episodes saved by the user.

    :param limit: Maximum number of results (default: 50).
    :param offset: Return results starting from this index (default: 0).
    :return: .. schema:: spotify.SpotifyEpisodeSchema(many=True)
    """
    schema = SpotifyEpisodeSchema()
    episodes = self._spotify_paginate_results('/v1/me/episodes', limit=limit, offset=offset)
    return schema.dump(episodes, many=True)
|
|
|
|
|
|
|
|
@action
def save_episodes(self, resources: Iterable[str]):
    """
    Add episodes to the user's collection.

    :param resources: Spotify IDs or URIs of the episodes to save.
    """
    episode_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/episodes', method='put', json={'ids': episode_ids})
|
|
|
|
|
|
|
|
@action
def remove_episodes(self, resources: Iterable[str]):
    """
    Delete episodes from the user's collection.

    :param resources: Spotify IDs or URIs of the episodes to remove.
    """
    episode_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/episodes', method='delete', json={'ids': episode_ids})
|
|
|
|
|
|
|
|
@action
def get_shows(self, limit: int = 50, offset: int = 0) -> List[dict]:
    """
    Retrieve the shows saved by the user.

    :param limit: Maximum number of results (default: 50).
    :param offset: Return results starting from this index (default: 0).
    :return: .. schema:: spotify.SpotifyShowSchema(many=True)
    """
    schema = SpotifyShowSchema()
    shows = self._spotify_paginate_results('/v1/me/shows', limit=limit, offset=offset)
    return schema.dump(shows, many=True)
|
|
|
|
|
|
|
|
@action
def save_shows(self, resources: Iterable[str]):
    """
    Add shows to the user's collection.

    :param resources: Spotify IDs or URIs of the shows to save.
    """
    show_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/shows', method='put', json={'ids': show_ids})
|
|
|
|
|
|
|
|
@action
def remove_shows(self, resources: Iterable[str]):
    """
    Delete shows from the user's collection.

    :param resources: Spotify IDs or URIs of the shows to remove.
    """
    show_ids = self._uris_to_id(*resources)
    self.spotify_user_call('/v1/me/shows', method='delete', json={'ids': show_ids})
|