Added create_playlist method to music.spotify plugin

This commit is contained in:
Fabio Manganiello 2022-09-16 20:41:33 +02:00
parent 45dc0fd3ca
commit 5ad7d966d7
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 227 additions and 112 deletions

View File

@ -6,9 +6,17 @@ from platypush.message.response import Response
from platypush.plugins import action from platypush.plugins import action
from platypush.plugins.media import PlayerState from platypush.plugins.media import PlayerState
from platypush.plugins.music import MusicPlugin from platypush.plugins.music import MusicPlugin
from platypush.schemas.spotify import SpotifyDeviceSchema, SpotifyStatusSchema, SpotifyTrackSchema, \ from platypush.schemas.spotify import (
SpotifyHistoryItemSchema, SpotifyPlaylistSchema, SpotifyAlbumSchema, SpotifyEpisodeSchema, SpotifyShowSchema, \ SpotifyDeviceSchema,
SpotifyArtistSchema SpotifyStatusSchema,
SpotifyTrackSchema,
SpotifyHistoryItemSchema,
SpotifyPlaylistSchema,
SpotifyAlbumSchema,
SpotifyEpisodeSchema,
SpotifyShowSchema,
SpotifyArtistSchema,
)
class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
@ -45,9 +53,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
be printed on the application logs/stdout. be printed on the application logs/stdout.
""" """
def __init__(self, client_id: Optional[str] = None, client_secret: Optional[str] = None, **kwargs): def __init__(
self,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
**kwargs,
):
MusicPlugin.__init__(self, **kwargs) MusicPlugin.__init__(self, **kwargs)
SpotifyMixin.__init__(self, client_id=client_id, client_secret=client_secret, **kwargs) SpotifyMixin.__init__(
self, client_id=client_id, client_secret=client_secret, **kwargs
)
self._players_by_id = {} self._players_by_id = {}
self._players_by_name = {} self._players_by_name = {}
# Playlist ID -> snapshot ID and tracks cache # Playlist ID -> snapshot ID and tracks cache
@ -63,14 +78,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
return dev return dev
@staticmethod @staticmethod
def _parse_datetime(dt: Optional[Union[str, datetime, int, float]]) -> Optional[datetime]: def _parse_datetime(
dt: Optional[Union[str, datetime, int, float]]
) -> Optional[datetime]:
if isinstance(dt, str): if isinstance(dt, str):
try: try:
dt = float(dt) dt = float(dt)
except (ValueError, TypeError): except (ValueError, TypeError):
return datetime.fromisoformat(dt) return datetime.fromisoformat(dt)
if isinstance(dt, int) or isinstance(dt, float): if isinstance(dt, (int, float)):
return datetime.fromtimestamp(dt) return datetime.fromtimestamp(dt)
return dt return dt
@ -85,18 +102,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
devices = self.spotify_user_call('/v1/me/player/devices').get('devices', []) devices = self.spotify_user_call('/v1/me/player/devices').get('devices', [])
self._players_by_id = { self._players_by_id = {
**self._players_by_id, **self._players_by_id,
**{ **{dev['id']: dev for dev in devices},
dev['id']: dev
for dev in devices
}
} }
self._players_by_name = { self._players_by_name = {
**self._players_by_name, **self._players_by_name,
**{ **{dev['name']: dev for dev in devices},
dev['name']: dev
for dev in devices
}
} }
return SpotifyDeviceSchema().dump(devices, many=True) return SpotifyDeviceSchema().dump(devices, many=True)
@ -118,7 +129,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
params={ params={
'volume_percent': volume, 'volume_percent': volume,
**({'device_id': device} if device else {}), **({'device_id': device} if device else {}),
} },
) )
def _get_volume(self, device: Optional[str] = None) -> Optional[int]: def _get_volume(self, device: Optional[str] = None) -> Optional[int]:
@ -138,10 +149,13 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
if device: if device:
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call('/v1/me/player/volume', params={ self.spotify_user_call(
'volume_percent': min(100, (self._get_volume() or 0) + delta), '/v1/me/player/volume',
**({'device_id': device} if device else {}), params={
}) 'volume_percent': min(100, (self._get_volume() or 0) + delta),
**({'device_id': device} if device else {}),
},
)
@action @action
def voldown(self, delta: int = 5, device: Optional[str] = None): def voldown(self, delta: int = 5, device: Optional[str] = None):
@ -154,10 +168,13 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
if device: if device:
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call('/v1/me/player/volume', params={ self.spotify_user_call(
'volume_percent': max(0, (self._get_volume() or 0) - delta), '/v1/me/player/volume',
**({'device_id': device} if device else {}), params={
}) 'volume_percent': max(0, (self._get_volume() or 0) - delta),
**({'device_id': device} if device else {}),
},
)
@action @action
def play(self, resource: Optional[str] = None, device: Optional[str] = None): def play(self, resource: Optional[str] = None, device: Optional[str] = None):
@ -192,8 +209,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
status = self.status().output status = self.status().output
state = 'play' \ state = (
if status.get('device_id') != device or status.get('state') != PlayerState.PLAY.value else 'pause' 'play'
if status.get('device_id') != device
or status.get('state') != PlayerState.PLAY.value
else 'pause'
)
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/{state}', f'/v1/me/player/{state}',
@ -212,7 +233,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
status = self.status().output status = self.status().output
if status.get('state') == PlayerState.PLAY.value: if status.get('state') == PlayerState.PLAY.value:
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/pause', '/v1/me/player/pause',
method='put', method='put',
) )
@ -230,7 +251,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
status = self.status().output status = self.status().output
if status.get('state') != PlayerState.PLAY.value: if status.get('state') != PlayerState.PLAY.value:
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/play', '/v1/me/player/play',
method='put', method='put',
params={ params={
**({'device_id': device} if device else {}), **({'device_id': device} if device else {}),
@ -261,7 +282,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
""" """
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player', '/v1/me/player',
method='put', method='put',
json={ json={
'device_ids': [device], 'device_ids': [device],
@ -279,7 +300,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/next', '/v1/me/player/next',
method='post', method='post',
params={ params={
**({'device_id': device} if device else {}), **({'device_id': device} if device else {}),
@ -297,7 +318,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/previous', '/v1/me/player/previous',
method='post', method='post',
params={ params={
**({'device_id': device} if device else {}), **({'device_id': device} if device else {}),
@ -316,7 +337,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/seek', '/v1/me/player/seek',
method='put', method='put',
params={ params={
'position_ms': int(position * 1000), 'position_ms': int(position * 1000),
@ -338,13 +359,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
if value is None: if value is None:
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
status = self.status().output status = self.status().output
state = 'context' \ state = (
if status.get('device_id') != device or not status.get('repeat') else 'off' 'context'
if status.get('device_id') != device or not status.get('repeat')
else 'off'
)
else: else:
state = value is True state = value is True
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/repeat', '/v1/me/player/repeat',
method='put', method='put',
params={ params={
'state': 'context' if state else 'off', 'state': 'context' if state else 'off',
@ -366,12 +390,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
if value is None: if value is None:
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
status = self.status().output status = self.status().output
state = True if status.get('device_id') != device or not status.get('random') else False state = bool(status.get('device_id') != device or not status.get('random'))
else: else:
state = value is True state = value is True
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/shuffle', '/v1/me/player/shuffle',
method='put', method='put',
params={ params={
'state': state, 'state': state,
@ -380,8 +404,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
) )
@action @action
def history(self, limit: int = 20, before: Optional[Union[datetime, str, int]] = None, def history(
after: Optional[Union[datetime, str, int]] = None): self,
limit: int = 20,
before: Optional[Union[datetime, str, int]] = None,
after: Optional[Union[datetime, str, int]] = None,
):
""" """
Get a list of recently played track on the account. Get a list of recently played track on the account.
@ -396,21 +424,26 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
after = self._parse_datetime(after) after = self._parse_datetime(after)
assert not (before and after), 'before and after cannot both be set' assert not (before and after), 'before and after cannot both be set'
results = self._spotify_paginate_results('/v1/me/player/recently-played', results = self._spotify_paginate_results(
limit=limit, '/v1/me/player/recently-played',
params={ limit=limit,
'limit': min(limit, 50), params={
**({'before': before} if before else {}), 'limit': min(limit, 50),
**({'after': after} if after else {}), **({'before': before} if before else {}),
}) **({'after': after} if after else {}),
},
)
return SpotifyHistoryItemSchema().dump([ return SpotifyHistoryItemSchema().dump(
{ [
**item.pop('track'), {
**item, **item.pop('track'),
} **item,
for item in results }
], many=True) for item in results
],
many=True,
)
@action @action
def add(self, resource: str, device: Optional[str] = None, **kwargs): def add(self, resource: str, device: Optional[str] = None, **kwargs):
@ -424,7 +457,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
device = self._get_device(device)['id'] device = self._get_device(device)['id']
self.spotify_user_call( self.spotify_user_call(
f'/v1/me/player/queue', '/v1/me/player/queue',
method='post', method='post',
params={ params={
'uri': resource, 'uri': resource,
@ -472,7 +505,9 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
return SpotifyTrackSchema().dump(track) return SpotifyTrackSchema().dump(track)
@action @action
def get_playlists(self, limit: int = 1000, offset: int = 0, user: Optional[str] = None): def get_playlists(
self, limit: int = 1000, offset: int = 0, user: Optional[str] = None
):
""" """
Get the user's playlists. Get the user's playlists.
@ -483,7 +518,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
""" """
playlists = self._spotify_paginate_results( playlists = self._spotify_paginate_results(
f'/v1/{"users/" + user if user else "me"}/playlists', f'/v1/{"users/" + user if user else "me"}/playlists',
limit=limit, offset=offset limit=limit,
offset=offset,
) )
return SpotifyPlaylistSchema().dump(playlists, many=True) return SpotifyPlaylistSchema().dump(playlists, many=True)
@ -491,36 +527,45 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
def _get_playlist(self, playlist: str) -> dict: def _get_playlist(self, playlist: str) -> dict:
playlists = self.get_playlists().output playlists = self.get_playlists().output
playlists = [ playlists = [
pl for pl in playlists if ( pl
pl['id'] == playlist or for pl in playlists
pl['uri'] == playlist or if (pl['id'] == playlist or pl['uri'] == playlist or pl['name'] == playlist)
pl['name'] == playlist
)
] ]
assert playlists, f'No such playlist ID, URI or name: {playlist}' assert playlists, f'No such playlist ID, URI or name: {playlist}'
return playlists[0] return playlists[0]
def _get_playlist_tracks_from_cache(self, id: str, snapshot_id: str, limit: Optional[int] = None, def _get_playlist_tracks_from_cache(
offset: int = 0) -> Optional[Iterable]: self, id: str, snapshot_id: str, limit: Optional[int] = None, offset: int = 0
) -> Optional[Iterable]:
snapshot = self._playlist_snapshots.get(id) snapshot = self._playlist_snapshots.get(id)
if ( if (
not snapshot or not snapshot
snapshot['snapshot_id'] != snapshot_id or or snapshot['snapshot_id'] != snapshot_id
(limit is None and snapshot['limit'] is not None) or (limit is None and snapshot['limit'] is not None)
): ):
return return
if limit is not None and snapshot['limit'] is not None: if limit is not None and snapshot['limit'] is not None:
stored_range = (snapshot['limit'], snapshot['limit'] + snapshot['offset']) stored_range = (snapshot['limit'], snapshot['limit'] + snapshot['offset'])
requested_range = (limit, limit + offset) requested_range = (limit, limit + offset)
if requested_range[0] < stored_range[0] or requested_range[1] > stored_range[1]: if (
requested_range[0] < stored_range[0]
or requested_range[1] > stored_range[1]
):
return return
return snapshot['tracks'] return snapshot['tracks']
def _cache_playlist_data(self, id: str, snapshot_id: str, tracks: Iterable[dict], limit: Optional[int] = None, def _cache_playlist_data(
offset: int = 0, **_): self,
id: str,
snapshot_id: str,
tracks: Iterable[dict],
limit: Optional[int] = None,
offset: int = 0,
**_,
):
self._playlist_snapshots[id] = { self._playlist_snapshots[id] = {
'id': id, 'id': id,
'tracks': tracks, 'tracks': tracks,
@ -530,7 +575,13 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
} }
@action @action
def get_playlist(self, playlist: str, with_tracks: bool = True, limit: Optional[int] = None, offset: int = 0): def get_playlist(
self,
playlist: str,
with_tracks: bool = True,
limit: Optional[int] = None,
offset: int = 0,
):
""" """
Get a playlist content. Get a playlist content.
@ -544,8 +595,10 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
playlist = self._get_playlist(playlist) playlist = self._get_playlist(playlist)
if with_tracks: if with_tracks:
playlist['tracks'] = self._get_playlist_tracks_from_cache( playlist['tracks'] = self._get_playlist_tracks_from_cache(
playlist['id'], snapshot_id=playlist['snapshot_id'], playlist['id'],
limit=limit, offset=offset snapshot_id=playlist['snapshot_id'],
limit=limit,
offset=offset,
) )
if playlist['tracks'] is None: if playlist['tracks'] is None:
@ -554,13 +607,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
**track, **track,
'track': { 'track': {
**track['track'], **track['track'],
'position': offset+i+1, 'position': offset + i + 1,
} },
} }
for i, track in enumerate(self._spotify_paginate_results( for i, track in enumerate(
f'/v1/playlists/{playlist["id"]}/tracks', self._spotify_paginate_results(
limit=limit, offset=offset f'/v1/playlists/{playlist["id"]}/tracks',
)) limit=limit,
offset=offset,
)
)
] ]
self._cache_playlist_data(**playlist, limit=limit, offset=offset) self._cache_playlist_data(**playlist, limit=limit, offset=offset)
@ -568,7 +624,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
return SpotifyPlaylistSchema().dump(playlist) return SpotifyPlaylistSchema().dump(playlist)
@action @action
def add_to_playlist(self, playlist: str, resources: Union[str, Iterable[str]], position: Optional[int] = None): def add_to_playlist(
self,
playlist: str,
resources: Union[str, Iterable[str]],
position: Optional[int] = None,
):
""" """
Add one or more items to a playlist. Add one or more items to a playlist.
@ -585,11 +646,14 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
}, },
json={ json={
'uris': [ 'uris': [
uri.strip() for uri in ( uri.strip()
resources.split(',') if isinstance(resources, str) else resources for uri in (
resources.split(',')
if isinstance(resources, str)
else resources
) )
] ]
} },
) )
snapshot_id = response.get('snapshot_id') snapshot_id = response.get('snapshot_id')
@ -611,18 +675,27 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
'tracks': [ 'tracks': [
{'uri': uri.strip()} {'uri': uri.strip()}
for uri in ( for uri in (
resources.split(',') if isinstance(resources, str) else resources resources.split(',')
if isinstance(resources, str)
else resources
) )
] ]
} },
) )
snapshot_id = response.get('snapshot_id') snapshot_id = response.get('snapshot_id')
assert snapshot_id is not None, 'Could not save playlist' assert snapshot_id is not None, 'Could not save playlist'
@action @action
def playlist_move(self, playlist: str, from_pos: int, to_pos: int, range_length: int = 1, def playlist_move(
resources: Optional[Union[str, Iterable[str]]] = None, **_): self,
playlist: str,
from_pos: int,
to_pos: int,
range_length: int = 1,
resources: Optional[Union[str, Iterable[str]]] = None,
**_,
):
""" """
Move or replace elements in a playlist. Move or replace elements in a playlist.
@ -641,12 +714,21 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
'range_start': int(from_pos) + 1, 'range_start': int(from_pos) + 1,
'range_length': int(range_length), 'range_length': int(range_length),
'insert_before': int(to_pos) + 1, 'insert_before': int(to_pos) + 1,
**({'uris': [ **(
uri.strip() for uri in ( {
resources.split(',') if isinstance(resources, str) else resources 'uris': [
) uri.strip()
]} if resources else {}) for uri in (
} resources.split(',')
if isinstance(resources, str)
else resources
)
]
}
if resources
else {}
),
},
) )
snapshot_id = response.get('snapshot_id') snapshot_id = response.get('snapshot_id')
@ -673,8 +755,14 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
# noinspection PyShadowingBuiltins # noinspection PyShadowingBuiltins
@action @action
def search(self, query: Optional[Union[str, dict]] = None, limit: int = 50, offset: int = 0, type: str = 'track', def search(
**filter) -> Iterable[dict]: self,
query: Optional[Union[str, dict]] = None,
limit: int = 50,
offset: int = 0,
type: str = 'track',
**filter,
) -> Iterable[dict]:
""" """
Search for tracks matching a certain criteria. Search for tracks matching a certain criteria.
@ -714,12 +802,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
}.get('uri', []) }.get('uri', [])
uris = uri.split(',') if isinstance(uri, str) else uri uris = uri.split(',') if isinstance(uri, str) else uri
params = { params = (
'ids': ','.join([uri.split(':')[-1].strip() for uri in uris]), {
} if uris else { 'ids': ','.join([uri.split(':')[-1].strip() for uri in uris]),
'q': self._make_filter(query, **filter), }
'type': type, if uris
} else {
'q': self._make_filter(query, **filter),
'type': type,
}
)
response = self._spotify_paginate_results( response = self._spotify_paginate_results(
f'/v1/{type + "s" if uris else "search"}', f'/v1/{type + "s" if uris else "search"}',
@ -739,7 +831,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
track.get('track'), track.get('track'),
track.get('title'), track.get('title'),
track.get('popularity'), track.get('popularity'),
) ),
) )
schema_class = None schema_class = None
@ -759,6 +851,31 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
return response return response
@action
def create_playlist(
self, name: str, description: Optional[str] = None, public: bool = False
):
"""
Create a playlist.
:param name: Playlist name.
:param description: Optional playlist description.
:param public: Whether the new playlist should be public
(default: False).
:return: .. schema:: spotify.SpotifyPlaylistSchema
"""
ret = self.spotify_user_call(
'/v1/users/me/playlists',
method='post',
json={
'name': name,
'description': description,
'public': public,
},
)
return SpotifyPlaylistSchema().dump(ret)
@action @action
def follow_playlist(self, playlist: str, public: bool = True): def follow_playlist(self, playlist: str, public: bool = True):
""" """
@ -774,7 +891,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
method='put', method='put',
json={ json={
'public': public, 'public': public,
} },
) )
@action @action
@ -792,10 +909,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
@staticmethod @staticmethod
def _uris_to_id(*uris: str) -> Iterable[str]: def _uris_to_id(*uris: str) -> Iterable[str]:
return [ return [uri.split(':')[-1] for uri in uris]
uri.split(':')[-1]
for uri in uris
]
@action @action
def get_albums(self, limit: int = 50, offset: int = 0) -> List[dict]: def get_albums(self, limit: int = 50, offset: int = 0) -> List[dict]:
@ -811,7 +925,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
'/v1/me/albums', '/v1/me/albums',
limit=limit, limit=limit,
offset=offset, offset=offset,
), many=True ),
many=True,
) )
@action @action
@ -852,9 +967,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
return [ return [
SpotifyTrackSchema().dump(item['track']) SpotifyTrackSchema().dump(item['track'])
for item in self._spotify_paginate_results( for item in self._spotify_paginate_results(
'/v1/me/tracks', '/v1/me/tracks', limit=limit, offset=offset
limit=limit,
offset=offset
) )
] ]
@ -898,7 +1011,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
'/v1/me/episodes', '/v1/me/episodes',
limit=limit, limit=limit,
offset=offset, offset=offset,
), many=True ),
many=True,
) )
@action @action
@ -941,7 +1055,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin):
'/v1/me/shows', '/v1/me/shows',
limit=limit, limit=limit,
offset=offset, offset=offset,
), many=True ),
many=True,
) )
@action @action