From c3fa3315f5948a782bfcea3fbd7f2d4ceeb9c3ca Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 30 Aug 2022 23:35:19 +0200 Subject: [PATCH 01/24] Implemented synchronization with webhook responses. When a client triggers a `WebhookEvent` by calling a configured webhook over `/hook/`, the server will now wait for the configured `@hook` function to complete and it will return the returned response back to the client. This makes webhooks much more powerful, as they can be used to proxy HTTP calls or other services, and in general return something to the client instead of just executing actions. --- platypush/backend/http/app/routes/hook.py | 36 ++++++++++--- platypush/event/hook.py | 63 +++++++++++++---------- platypush/message/event/http/hook.py | 48 +++++++++++++++-- 3 files changed, 107 insertions(+), 40 deletions(-) diff --git a/platypush/backend/http/app/routes/hook.py b/platypush/backend/http/app/routes/hook.py index 5243039c..bcf33c50 100644 --- a/platypush/backend/http/app/routes/hook.py +++ b/platypush/backend/http/app/routes/hook.py @@ -1,9 +1,11 @@ import json -from flask import Blueprint, abort, request, Response +from flask import Blueprint, abort, request +from flask.wrappers import Response from platypush.backend.http.app import template_folder from platypush.backend.http.app.utils import logger, send_message +from platypush.config import Config from platypush.message.event.http.hook import WebhookEvent @@ -15,9 +17,11 @@ __routes__ = [ ] -@hook.route('/hook/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS']) -def _hook(hook_name): - """ Endpoint for custom webhooks """ +@hook.route( + '/hook/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS'] +) +def hook_route(hook_name): + """Endpoint for custom webhooks""" event_args = { 'hook': hook_name, @@ -28,20 +32,36 @@ def _hook(hook_name): } if event_args['data']: - # noinspection PyBroadException try: event_args['data'] = json.loads(event_args['data']) except Exception as e: - logger().warning('Not a valid JSON string: {}: {}'.format(event_args['data'], str(e))) + logger().warning( + 'Not a valid JSON string: %s: %s', event_args['data'], str(e) + ) event = WebhookEvent(**event_args) + matching_hooks = [ + hook + for hook in Config.get_event_hooks().values() + if hook.condition.type == WebhookEvent + and hook.condition.args.get('hook') == hook_name + and request.method.lower() + == hook.condition.args.get('method', request.method).lower() + ] try: send_message(event) - return Response(json.dumps({'status': 'ok', **event_args}), mimetype='application/json') + + # If there are matching hooks, wait for their completion before returning + if matching_hooks: + return event.wait_response(timeout=60) + + return Response( + json.dumps({'status': 'ok', **event_args}), mimetype='application/json' + ) except Exception as e: logger().exception(e) - logger().error('Error while dispatching webhook event {}: {}'.format(event, str(e))) + logger().error('Error while dispatching webhook event %s: %s', event, str(e)) abort(500, str(e)) diff --git a/platypush/event/hook.py b/platypush/event/hook.py index 7dc3140a..35cb052d 100644 --- a/platypush/event/hook.py +++ b/platypush/event/hook.py @@ -15,10 +15,10 @@ logger = logging.getLogger('platypush') def parse(msg): - """ Builds a dict given another dictionary or - a JSON UTF-8 encoded string/bytearray """ + """Builds a dict given another dictionary or + a JSON UTF-8 encoded string/bytearray""" - if isinstance(msg, bytes) or isinstance(msg, bytearray): + if isinstance(msg, (bytes, bytearray)): msg = msg.decode('utf-8') if isinstance(msg, str): try: @@ -30,8 +30,8 @@ def parse(msg): return msg -class EventCondition(object): - """ Event hook condition class """ +class EventCondition: + """Event hook condition class""" def __init__(self, type=Event.__class__, priority=None, **kwargs): """ @@ -55,8 +55,8 @@ class EventCondition(object): @classmethod def build(cls, rule): - """ Builds a rule given either another EventRule, a dictionary or - a JSON UTF-8 encoded string/bytearray """ + """Builds a rule given either another EventRule, a dictionary or + a JSON UTF-8 encoded string/bytearray""" if isinstance(rule, cls): return rule @@ -64,8 +64,7 @@ class EventCondition(object): rule = parse(rule) assert isinstance(rule, dict), f'Not a valid rule: {rule}' - type = get_event_class_by_type( - rule.pop('type') if 'type' in rule else 'Event') + type = get_event_class_by_type(rule.pop('type') if 'type' in rule else 'Event') args = {} for (key, value) in rule.items(): @@ -75,8 +74,8 @@ class EventCondition(object): class EventAction(Request): - """ Event hook action class. It is a special type of runnable request - whose fields can be configured later depending on the event context """ + """Event hook action class. It is a special type of runnable request + whose fields can be configured later depending on the event context""" def __init__(self, target=None, action=None, **args): if target is None: @@ -99,16 +98,16 @@ class EventAction(Request): return super().build(action) -class EventHook(object): - """ Event hook class. It consists of one conditions and - one or multiple actions to be executed """ +class EventHook: + """Event hook class. It consists of one conditions and + one or multiple actions to be executed""" def __init__(self, name, priority=None, condition=None, actions=None): - """ Constructor. Takes a name, a EventCondition object and an event action - procedure as input. It may also have a priority attached - as a positive number. If multiple hooks match against an event, - only the ones that have either the maximum match score or the - maximum pre-configured priority will be run. """ + """Constructor. Takes a name, a EventCondition object and an event action + procedure as input. It may also have a priority attached + as a positive number. If multiple hooks match against an event, + only the ones that have either the maximum match score or the + maximum pre-configured priority will be run.""" self.name = name self.condition = EventCondition.build(condition or {}) @@ -118,8 +117,8 @@ class EventHook(object): @classmethod def build(cls, name, hook): - """ Builds a rule given either another EventRule, a dictionary or - a JSON UTF-8 encoded string/bytearray """ + """Builds a rule given either another EventRule, a dictionary or + a JSON UTF-8 encoded string/bytearray""" if isinstance(hook, cls): return hook @@ -146,14 +145,14 @@ class EventHook(object): return cls(name=name, condition=condition, actions=actions, priority=priority) def matches_event(self, event): - """ Returns an EventMatchResult object containing the information - about the match between the event and this hook """ + """Returns an EventMatchResult object containing the information + about the match between the event and this hook""" return event.matches_condition(self.condition) def run(self, event): - """ Checks the condition of the hook against a particular event and - runs the hook actions if the condition is met """ + """Checks the condition of the hook against a particular event and + runs the hook actions if the condition is met""" def _thread_func(result): set_thread_name('Event-' + self.name) @@ -163,7 +162,9 @@ class EventHook(object): if result.is_match: logger.info('Running hook {} triggered by an event'.format(self.name)) - threading.Thread(target=_thread_func, name='Event-' + self.name, args=(result,)).start() + threading.Thread( + target=_thread_func, name='Event-' + self.name, args=(result,) + ).start() def hook(event_type=Event, **condition): @@ -172,8 +173,14 @@ def hook(event_type=Event, **condition): f.condition = EventCondition(type=event_type, **condition) @wraps(f) - def wrapped(*args, **kwargs): - return exec_wrapper(f, *args, **kwargs) + def wrapped(event, *args, **kwargs): + from platypush.message.event.http.hook import WebhookEvent + + response = exec_wrapper(f, event, *args, **kwargs) + if isinstance(event, WebhookEvent): + event.send_response(response) + + return response return wrapped diff --git a/platypush/message/event/http/hook.py b/platypush/message/event/http/hook.py index b423e844..1c1690d9 100644 --- a/platypush/message/event/http/hook.py +++ b/platypush/message/event/http/hook.py @@ -1,11 +1,26 @@ +import json +import uuid + from platypush.message.event import Event +from platypush.utils import get_redis + class WebhookEvent(Event): """ Event triggered when a custom webhook is called. """ - def __init__(self, *argv, hook, method, data=None, args=None, headers=None, **kwargs): + def __init__( + self, + *argv, + hook, + method, + data=None, + args=None, + headers=None, + response=None, + **kwargs, + ): """ :param hook: Name of the invoked web hook, from http://host:port/hook/ :type hook: str @@ -21,10 +36,35 @@ class WebhookEvent(Event): :param headers: Request headers :type args: dict - """ - super().__init__(hook=hook, method=method, data=data, args=args or {}, - headers=headers or {}, *argv, **kwargs) + :param response: Response returned by the hook. + :type args: dict | list | str + """ + # This queue is used to synchronize with the hook and wait for its completion + kwargs['response_queue'] = kwargs.get( + 'response_queue', f'platypush/webhook/{str(uuid.uuid1())}' + ) + + super().__init__( + *argv, + hook=hook, + method=method, + data=data, + args=args or {}, + headers=headers or {}, + response=response, + **kwargs, + ) + + def send_response(self, response): + output = response.output + if isinstance(output, (dict, list)): + output = json.dumps(output) + + get_redis().rpush(self.args['response_queue'], output) + + def wait_response(self, timeout=None): + return get_redis().blpop(self.args['response_queue'], timeout=timeout)[1] # vim:sw=4:ts=4:et: From a675fe6a927fbe034fa81f0e74ce93e4c615f028 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 31 Aug 2022 00:49:08 +0200 Subject: [PATCH 02/24] Updated CHANGELOG --- CHANGELOG.md | 10 +++++++++- platypush/message/event/http/hook.py | 4 +++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6ec694c..2e65edc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,15 @@ # Changelog All notable changes to this project will be documented in this file. -Given the high speed of development in the first phase, changes are being reported only starting from v0.20.2. +Given the high speed of development in the first phase, changes are being +reported only starting from v0.20.2. + +## [Unreleased] + +### Added + +- Added support for web hooks returning their hook method responses back to the + HTTP client. ## [0.23.4] - 2022-08-28 diff --git a/platypush/message/event/http/hook.py b/platypush/message/event/http/hook.py index 1c1690d9..728a13c5 100644 --- a/platypush/message/event/http/hook.py +++ b/platypush/message/event/http/hook.py @@ -64,7 +64,9 @@ class WebhookEvent(Event): get_redis().rpush(self.args['response_queue'], output) def wait_response(self, timeout=None): - return get_redis().blpop(self.args['response_queue'], timeout=timeout)[1] + rs = get_redis().blpop(self.args['response_queue'], timeout=timeout) + if rs and len(rs) > 1: + return rs[1] # vim:sw=4:ts=4:et: From db45d7ecbfe45da1ff5c02a60093deccf33af55e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 31 Aug 2022 01:27:53 +0200 Subject: [PATCH 03/24] FIX: More robust logic against section configurations that may not be maps --- platypush/config/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 481bd275..428ab917 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -215,7 +215,9 @@ class Config: ) else: section_config = file_config.get(section, {}) or {} - if not section_config.get('disabled'): + if not ( + hasattr(section_config, 'get') and section_config.get('disabled') + ): config[section] = section_config return config From 67413c02cd9d71ac3701037857c255dc6dd12bf6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 31 Aug 2022 01:55:21 +0200 Subject: [PATCH 04/24] Handle the case where the condition is a serialized dictionary --- platypush/backend/http/app/routes/hook.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/platypush/backend/http/app/routes/hook.py b/platypush/backend/http/app/routes/hook.py index bcf33c50..8d49530c 100644 --- a/platypush/backend/http/app/routes/hook.py +++ b/platypush/backend/http/app/routes/hook.py @@ -6,6 +6,7 @@ from flask.wrappers import Response from platypush.backend.http.app import template_folder from platypush.backend.http.app.utils import logger, send_message from platypush.config import Config +from platypush.event.hook import EventCondition from platypush.message.event.http.hook import WebhookEvent @@ -17,6 +18,16 @@ __routes__ = [ ] +def matches_condition(event: WebhookEvent, hook): + if isinstance(hook, dict): + condition = hook.get('condition', {}) + else: + condition = hook.condition + + condition = EventCondition.build(condition) + return event.matches_condition(condition) + + @hook.route( '/hook/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS'] ) @@ -43,10 +54,7 @@ def hook_route(hook_name): matching_hooks = [ hook for hook in Config.get_event_hooks().values() - if hook.condition.type == WebhookEvent - and hook.condition.args.get('hook') == hook_name - and request.method.lower() - == hook.condition.args.get('method', request.method).lower() + if matches_condition(event, hook) ] try: From 96b2ad148c3f81fb826055b4c97667cdf7b69ec2 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 31 Aug 2022 02:19:21 +0200 Subject: [PATCH 05/24] A smarter way of building and matching the event condition --- platypush/backend/http/app/routes/hook.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/platypush/backend/http/app/routes/hook.py b/platypush/backend/http/app/routes/hook.py index 8d49530c..6996fcad 100644 --- a/platypush/backend/http/app/routes/hook.py +++ b/platypush/backend/http/app/routes/hook.py @@ -20,11 +20,13 @@ __routes__ = [ def matches_condition(event: WebhookEvent, hook): if isinstance(hook, dict): - condition = hook.get('condition', {}) + if_ = hook['if'].copy() + if_['type'] = '.'.join([event.__module__, event.__class__.__qualname__]) + + condition = EventCondition.build(if_) else: condition = hook.condition - condition = EventCondition.build(condition) return event.matches_condition(condition) From c5b12403d01ff81cee41753e41c1b73d7d69a839 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 1 Sep 2022 01:37:18 +0200 Subject: [PATCH 06/24] Implemented support for returning richer HTTP responses on webhooks. A `WebhookEvent` hook can now return a tuple in the format `(data, http_code, headers)` in order to customize the HTTP status code and the headers of a response. --- platypush/backend/http/app/routes/hook.py | 28 ++++++++++++++++++----- platypush/message/event/http/hook.py | 17 ++++++++++++++ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/platypush/backend/http/app/routes/hook.py b/platypush/backend/http/app/routes/hook.py index 6996fcad..5d9b420c 100644 --- a/platypush/backend/http/app/routes/hook.py +++ b/platypush/backend/http/app/routes/hook.py @@ -1,7 +1,6 @@ import json -from flask import Blueprint, abort, request -from flask.wrappers import Response +from flask import Blueprint, abort, request, make_response from platypush.backend.http.app import template_folder from platypush.backend.http.app.utils import logger, send_message @@ -61,14 +60,31 @@ def hook_route(hook_name): try: send_message(event) + rs = make_response(json.dumps({'status': 'ok', **event_args})) + headers = {} + status_code = 200 # If there are matching hooks, wait for their completion before returning if matching_hooks: - return event.wait_response(timeout=60) + rs = event.wait_response(timeout=60) + try: + rs = json.loads(rs.decode()) # type: ignore + except Exception: + pass - return Response( - json.dumps({'status': 'ok', **event_args}), mimetype='application/json' - ) + if isinstance(rs, dict) and '___data___' in rs: + # data + http_code + custom_headers return format + headers = rs.get('___headers___', {}) + status_code = rs.get('___code___', status_code) + rs = rs['___data___'] + + rs = make_response(rs) + else: + headers = {'Content-Type': 'application/json'} + + rs.status_code = status_code + rs.headers.update(headers) + return rs except Exception as e: logger().exception(e) logger().error('Error while dispatching webhook event %s: %s', event, str(e)) diff --git a/platypush/message/event/http/hook.py b/platypush/message/event/http/hook.py index 728a13c5..e6f38610 100644 --- a/platypush/message/event/http/hook.py +++ b/platypush/message/event/http/hook.py @@ -58,6 +58,23 @@ class WebhookEvent(Event): def send_response(self, response): output = response.output + if isinstance(output, tuple): + # A 3-sized tuple where the second element is an int and the third + # is a dict represents an HTTP response in the format `(data, + # http_code headers)`. + if ( + len(output) == 3 + and isinstance(output[1], int) + and isinstance(output[2], dict) + ): + output = { + '___data___': output[0], + '___code___': output[1], + '___headers___': output[2], + } + else: + # Normalize tuples to lists before serialization + output = list(output) if isinstance(output, (dict, list)): output = json.dumps(output) From a286cf5000b4acaab2db6420fe5392a298437326 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 1 Sep 2022 11:13:16 +0200 Subject: [PATCH 07/24] Updated PopcornTime base URL --- platypush/plugins/torrent/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platypush/plugins/torrent/__init__.py b/platypush/plugins/torrent/__init__.py index 102c50cc..682f9f3d 100644 --- a/platypush/plugins/torrent/__init__.py +++ b/platypush/plugins/torrent/__init__.py @@ -37,7 +37,7 @@ class TorrentPlugin(Plugin): torrent_state = {} transfers = {} # noinspection HttpUrlsUsage - default_popcorn_base_url = 'http://popcorn-ru.tk' + default_popcorn_base_url = 'http://popcorn-time.ga' def __init__(self, download_dir=None, torrent_ports=None, imdb_key=None, popcorn_base_url=default_popcorn_base_url, **kwargs): From 6c6e68b5126ef517b839ed0d2ec161b6c0c7bb79 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 2 Sep 2022 00:21:40 +0200 Subject: [PATCH 08/24] Added support for OPML import and export in the RSS plugin. [closes #219] --- platypush/plugins/rss/__init__.py | 274 ++++++++++++++++++++++------ platypush/plugins/rss/manifest.yaml | 1 + setup.py | 2 +- 3 files changed, 225 insertions(+), 52 deletions(-) diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index 6a64ec39..10ef8350 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -1,8 +1,13 @@ import datetime +import os import queue +import re import threading import time -from typing import Optional, Collection + +from dateutil.tz import tzutc +from typing import Iterable, Optional, Collection, Set +from xml.etree import ElementTree import dateutil.parser import requests @@ -24,56 +29,67 @@ class RssPlugin(RunnablePlugin): Requires: * **feedparser** (``pip install feedparser``) + * **defusedxml** (``pip install defusedxml``) """ - user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) ' + \ - 'Chrome/62.0.3202.94 Safari/537.36' + user_agent = ( + 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) ' + + 'Chrome/62.0.3202.94 Safari/537.36' + ) def __init__( - self, subscriptions: Optional[Collection[str]] = None, poll_seconds: int = 300, - user_agent: str = user_agent, **kwargs + self, + subscriptions: Optional[Collection[str]] = None, + poll_seconds: int = 300, + user_agent: str = user_agent, + **kwargs, ): """ :param subscriptions: List of feeds to monitor for updates, as URLs. + OPML URLs/local files are also supported. :param poll_seconds: How often we should check for updates (default: 300 seconds). :param user_agent: Custom user agent to use for the requests. """ super().__init__(**kwargs) - self.subscriptions = subscriptions or [] self.poll_seconds = poll_seconds self.user_agent = user_agent - self._latest_timestamps = self._get_latest_timestamps() + self._feeds_metadata = {} self._feed_worker_queues = [queue.Queue()] * 5 self._feed_response_queue = queue.Queue() self._feed_workers = [] self._latest_entries = [] + self.subscriptions = list(self._parse_subscriptions(subscriptions or [])) + + self._latest_timestamps = self._get_latest_timestamps() + @staticmethod def _get_feed_latest_timestamp_varname(url: str) -> str: return f'LATEST_FEED_TIMESTAMP[{url}]' @classmethod def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]: - t = get_plugin('variable').get( - cls._get_feed_latest_timestamp_varname(url) - ).output.get(cls._get_feed_latest_timestamp_varname(url)) + t = ( + get_plugin('variable') + .get(cls._get_feed_latest_timestamp_varname(url)) + .output.get(cls._get_feed_latest_timestamp_varname(url)) + ) if t: return dateutil.parser.isoparse(t) def _get_latest_timestamps(self) -> dict: - return { - url: self._get_feed_latest_timestamp(url) - for url in self.subscriptions - } + return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions} def _update_latest_timestamps(self) -> None: variable = get_plugin('variable') - variable.set(**{ - self._get_feed_latest_timestamp_varname(url): latest_timestamp - for url, latest_timestamp in self._latest_timestamps.items() - }) + variable.set( + **{ + self._get_feed_latest_timestamp_varname(url): latest_timestamp + for url, latest_timestamp in self._latest_timestamps.items() + } + ) @staticmethod def _parse_content(entry) -> Optional[str]: @@ -96,23 +112,30 @@ class RssPlugin(RunnablePlugin): """ import feedparser - feed = feedparser.parse(requests.get(url, headers={'User-Agent': self.user_agent}).text) + feed = feedparser.parse( + requests.get(url, headers={'User-Agent': self.user_agent}).text + ) return RssFeedEntrySchema().dump( - sorted([ - { - 'feed_url': url, - 'feed_title': getattr(feed.feed, 'title', None), - 'id': getattr(entry, 'id', None), - 'url': entry.link, - 'published': datetime.datetime.fromtimestamp(time.mktime(entry.published_parsed)), - 'title': entry.title, - 'summary': getattr(entry, 'summary', None), - 'content': self._parse_content(entry), - } - for entry in feed.entries - if getattr(entry, 'published_parsed', None) - ], key=lambda e: e['published']), - many=True + sorted( + [ + { + 'feed_url': url, + 'feed_title': getattr(feed.feed, 'title', None), + 'id': getattr(entry, 'id', None), + 'url': entry.link, + 'published': datetime.datetime.fromtimestamp( + time.mktime(entry.published_parsed) + ), + 'title': entry.title, + 'summary': getattr(entry, 'summary', None), + 'content': self._parse_content(entry), + } + for entry in feed.entries + if getattr(entry, 'published_parsed', None) + ], + key=lambda e: e['published'], + ), + many=True, ) @action @@ -123,7 +146,9 @@ class RssPlugin(RunnablePlugin): :param limit: Maximum number of entries to return (default: 20). :return: .. schema:: rss.RssFeedEntrySchema(many=True) """ - return sorted(self._latest_entries, key=lambda e: e['published'], reverse=True)[:limit] + return sorted(self._latest_entries, key=lambda e: e['published'], reverse=True)[ + :limit + ] def _feed_worker(self, q: queue.Queue): while not self.should_stop(): @@ -133,18 +158,157 @@ class RssPlugin(RunnablePlugin): continue try: - self._feed_response_queue.put({ - 'url': url, - 'content': self.parse_feed(url).output, - }) + self._feed_response_queue.put( + { + 'url': url, + 'content': self.parse_feed(url).output, + } + ) except Exception as e: - self._feed_response_queue.put({ - 'url': url, - 'error': e, - }) + self._feed_response_queue.put( + { + 'url': url, + 'error': e, + } + ) self._feed_response_queue.put(None) + def _parse_opml_lists(self, subs: Iterable[str]) -> Set[str]: + from defusedxml import ElementTree + + feeds = set() + subs = set(subs) + content_by_sub = {} + urls = {sub for sub in subs if re.search(r'^https?://', sub)} + files = {os.path.expanduser(sub) for sub in subs if sub not in urls} + + for url in urls: + try: + content_by_sub[url] = requests.get( + url, + headers={ + 'User-Agent': self.user_agent, + }, + ).text + except Exception as e: + self.logger.warning('Could not retrieve subscription %s: %s', url, e) + + for file in files: + try: + with open(file, 'r') as f: + content_by_sub[file] = f.read() + except Exception as e: + self.logger.warning('Could not open file %s: %s', file, e) + + for sub, content in content_by_sub.items(): + root = ElementTree.fromstring(content.strip()) + if root.tag != 'opml': + self.logger.warning('%s is not a valid OPML resource', sub) + continue + + feeds.update(self._parse_feeds_from_outlines(root.findall('body/outline'))) + + return feeds + + def _parse_feeds_from_outlines( + self, + outlines: Iterable[ElementTree.Element], + ) -> Set[str]: + feeds = set() + outlines = list(outlines) + + while outlines: + outline = outlines.pop(0) + if 'xmlUrl' in outline.attrib: + url = outline.attrib['xmlUrl'] + feeds.add(url) + self._feeds_metadata[url] = { + **self._feeds_metadata.get(url, {}), + 'title': outline.attrib.get('title'), + 'description': outline.attrib.get('text'), + 'url': outline.attrib.get('htmlUrl'), + } + + for i, child in enumerate(outline.iter()): + if i > 0: + outlines.append(child) + + return feeds + + def _parse_subscriptions(self, subs: Iterable[str]) -> Iterable[str]: + import feedparser + + self.logger.info('Parsing feed subscriptions') + feeds = set() + lists = set() + + for sub in subs: + try: + # Check if it's an OPML list of feeds or an individual feed + feed = feedparser.parse(sub) + if feed.feed.get('opml'): + lists.add(sub) + else: + channel = feed.get('channel', {}) + self._feeds_metadata[sub] = { + **self._feeds_metadata.get(sub, {}), + 'title': channel.get('title'), + 'description': channel.get('description'), + 'url': channel.get('link'), + } + + feeds.add(sub) + except Exception as e: + self.logger.warning('Could not parse %s: %s', sub, e) + + feeds.update(self._parse_opml_lists(lists)) + return feeds + + @staticmethod + def _datetime_to_string(dt: datetime.datetime) -> str: + return dt.replace(tzinfo=tzutc()).strftime('%a, %d %b %Y %H:%M:%S %Z') + + @action + def export_to_opml(self) -> str: + """ + Export the list of subscriptions into OPML format. + + :return: The list of subscriptions as a string in OPML format. + """ + root = ElementTree.Element('opml', {'version': '2.0'}) + + head = ElementTree.Element('head') + title = ElementTree.Element('title') + title.text = 'Platypush feed subscriptions' + created = ElementTree.Element('dateCreated') + created.text = self._datetime_to_string(datetime.datetime.utcnow()) + head.append(title) + head.append(created) + + body = ElementTree.Element('body') + feeds = ElementTree.Element('outline', {'text': 'Feeds'}) + + for sub in self.subscriptions: + metadata = self._feeds_metadata.get(sub, {}) + feed = ElementTree.Element( + 'outline', + { + 'xmlUrl': sub, + 'text': metadata.get('description', metadata.get('title', sub)), + **({'htmlUrl': metadata['url']} if metadata.get('url') else {}), + **({'title': metadata['title']} if metadata.get('title') else {}), + }, + ) + + feeds.append(feed) + + body.append(feeds) + + root.append(head) + root.append(body) + return ElementTree.tostring(root, encoding='utf-8', method='xml').decode() + def main(self): self._feed_workers = [ threading.Thread(target=self._feed_worker, args=(q,)) @@ -154,12 +318,16 @@ class RssPlugin(RunnablePlugin): for worker in self._feed_workers: worker.start() - self.logger.info(f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions') + self.logger.info( + f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions' + ) while not self.should_stop(): responses = {} for i, url in enumerate(self.subscriptions): - worker_queue = self._feed_worker_queues[i % len(self._feed_worker_queues)] + worker_queue = self._feed_worker_queues[ + i % len(self._feed_worker_queues) + ] worker_queue.put(url) time_start = time.time() @@ -168,12 +336,14 @@ class RssPlugin(RunnablePlugin): new_entries = [] while ( - not self.should_stop() and - len(responses) < len(self.subscriptions) and - time.time() - time_start <= timeout + not self.should_stop() + and len(responses) < len(self.subscriptions) + and time.time() - time_start <= timeout ): try: - response = self._feed_response_queue.get(block=True, timeout=max_time-time_start) + response = self._feed_response_queue.get( + block=True, timeout=max_time - time_start + ) except queue.Empty: self.logger.warning('RSS parse timeout') break @@ -189,7 +359,9 @@ class RssPlugin(RunnablePlugin): else: responses[url] = response['content'] - responses = {k: v for k, v in responses.items() if not isinstance(v, Exception)} + responses = { + k: v for k, v in responses.items() if not isinstance(v, Exception) + } for url, response in responses.items(): latest_timestamp = self._latest_timestamps.get(url) @@ -205,7 +377,7 @@ class RssPlugin(RunnablePlugin): self._update_latest_timestamps() self._latest_entries = new_entries - time.sleep(self.poll_seconds) + self.wait_stop(self.poll_seconds) def stop(self): super().stop() diff --git a/platypush/plugins/rss/manifest.yaml b/platypush/plugins/rss/manifest.yaml index 3a1cc7af..d596d3a2 100644 --- a/platypush/plugins/rss/manifest.yaml +++ b/platypush/plugins/rss/manifest.yaml @@ -4,5 +4,6 @@ manifest: install: pip: - feedparser + - defusedxml package: platypush.plugins.rss type: plugin diff --git a/setup.py b/setup.py index 79054a5f..c338b09c 100755 --- a/setup.py +++ b/setup.py @@ -86,7 +86,7 @@ setup( # Support for MQTT backends 'mqtt': ['paho-mqtt'], # Support for RSS feeds parser - 'rss': ['feedparser'], + 'rss': ['feedparser', 'defusedxml'], # Support for PDF generation 'pdf': ['weasyprint'], # Support for Philips Hue plugin From 1ea53a6f5023eb07a4c04d6d940cb9f1b6301554 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 4 Sep 2022 00:28:08 +0200 Subject: [PATCH 09/24] Support for query placeholders in `db.select` --- platypush/plugins/db/__init__.py | 143 +++++++++++++++++++++++-------- 1 file changed, 106 insertions(+), 37 deletions(-) diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index f4594f08..8cb0f89b 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -3,9 +3,11 @@ """ import time +from typing import Optional from sqlalchemy import create_engine, Table, MetaData from sqlalchemy.engine import Engine +from sqlalchemy.sql import text from platypush.plugins import Plugin, action @@ -23,10 +25,17 @@ class DbPlugin(Plugin): def __init__(self, engine=None, *args, **kwargs): """ - :param engine: Default SQLAlchemy connection engine string (e.g. ``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``) that will be used. You can override the default engine in the db actions. + :param engine: Default SQLAlchemy connection engine string (e.g. + ``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``) + that will be used. You can override the default engine in the db + actions. :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ super().__init__() @@ -41,11 +50,11 @@ class DbPlugin(Plugin): return create_engine(engine, *args, **kwargs) + assert self.engine return self.engine - # noinspection PyUnusedLocal @staticmethod - def _build_condition(table, column, value): + def _build_condition(table, column, value): # type: ignore if isinstance(value, str): value = "'{}'".format(value) elif not isinstance(value, int) and not isinstance(value, float): @@ -69,8 +78,12 @@ class DbPlugin(Plugin): :type statement: str :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ engine = self._get_engine(engine, *args, **kwargs) @@ -106,24 +119,42 @@ class DbPlugin(Plugin): return table, engine @action - def select(self, query=None, table=None, filter=None, engine=None, *args, **kwargs): + def select( + self, + query=None, + table=None, + filter=None, + engine=None, + data: Optional[dict] = None, + *args, + **kwargs + ): """ Returns rows (as a list of hashes) given a query. :param query: SQL to be executed :type query: str - :param filter: Query WHERE filter expressed as a dictionary. This approach is preferred over specifying raw SQL - in ``query`` as the latter approach may be prone to SQL injection, unless you need to build some complex - SQL logic. + :param filter: Query WHERE filter expressed as a dictionary. This + approach is preferred over specifying raw SQL + in ``query`` as the latter approach may be prone to SQL injection, + unless you need to build some complex SQL logic. :type filter: dict - :param table: If you specified a filter instead of a raw query, you'll have to specify the target table + :param table: If you specified a filter instead of a raw query, you'll + have to specify the target table :type table: str :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` - (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` - (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param data: If ``query`` is an SQL string, then you can use + SQLAlchemy's *placeholders* mechanism. You can specify placeholders + in the query for values that you want to be safely serialized, and + their values can be specified on the ``data`` attribute in a + ``name`` ➡️ ``value`` mapping format. + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` (see + https:///docs.sqlalchemy.org/en/latest/core/engines.html) :returns: List of hashes representing the result rows. Examples: @@ -136,7 +167,10 @@ class DbPlugin(Plugin): "action": "db.select", "args": { "engine": "sqlite:///:memory:", - "query": "SELECT id, name FROM table" + "query": "SELECT id, name FROM table WHERE name = :name", + "data": { + "name": "foobar" + } } } @@ -165,19 +199,24 @@ class DbPlugin(Plugin): engine = self._get_engine(engine, *args, **kwargs) + if isinstance(query, str): + query = text(query) + if table: table, engine = self._get_table(table, engine=engine, *args, **kwargs) query = table.select() if filter: - for (k,v) in filter.items(): + for (k, v) in filter.items(): query = query.where(self._build_condition(table, k, v)) if query is None: - raise RuntimeError('You need to specify either "query", or "table" and "filter"') + raise RuntimeError( + 'You need to specify either "query", or "table" and "filter"' + ) with engine.connect() as connection: - result = connection.execute(query) + result = connection.execute(query, **(data or {})) columns = result.keys() rows = [ {col: row[i] for i, col in enumerate(list(columns))} @@ -187,8 +226,16 @@ class DbPlugin(Plugin): return rows @action - def insert(self, table, records, engine=None, key_columns=None, - on_duplicate_update=False, *args, **kwargs): + def insert( + self, + table, + records, + engine=None, + key_columns=None, + on_duplicate_update=False, + *args, + **kwargs + ): """ Inserts records (as a list of hashes) into a table. @@ -198,12 +245,20 @@ class DbPlugin(Plugin): :type records: list :param engine: Engine to be used (default: default class engine) :type engine: str - :param key_columns: Set it to specify the names of the key columns for ``table``. Set it if you want your statement to be executed with the ``on_duplicate_update`` flag. + :param key_columns: Set it to specify the names of the key columns for + ``table``. Set it if you want your statement to be executed with + the ``on_duplicate_update`` flag. :type key_columns: list - :param on_duplicate_update: If set, update the records in case of duplicate rows (default: False). If set, you'll need to specify ``key_columns`` as well. + :param on_duplicate_update: If set, update the records in case of + duplicate rows (default: False). If set, you'll need to specify + ``key_columns`` as well. :type on_duplicate_update: bool - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) Example: @@ -238,15 +293,21 @@ class DbPlugin(Plugin): for record in records: table, engine = self._get_table(table, engine=engine, *args, **kwargs) + insert = table.insert().values(**record) try: engine.execute(insert) except Exception as e: if on_duplicate_update and key_columns: - self.update(table=table, records=records, - key_columns=key_columns, engine=engine, - *args, **kwargs) + self.update( + table=table, + records=records, + key_columns=key_columns, + engine=engine, + *args, + **kwargs + ) else: raise e @@ -263,8 +324,12 @@ class DbPlugin(Plugin): :type key_columns: list :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) Example: @@ -297,12 +362,12 @@ class DbPlugin(Plugin): for record in records: table, engine = self._get_table(table, engine=engine, *args, **kwargs) - key = { k:v for (k,v) in record.items() if k in key_columns } - values = { k:v for (k,v) in record.items() if k not in key_columns } + key = {k: v for (k, v) in record.items() if k in key_columns} + values = {k: v for (k, v) in record.items() if k not in key_columns} update = table.update() - for (k,v) in key.items(): + for (k, v) in key.items(): update = update.where(self._build_condition(table, k, v)) update = update.values(**values) @@ -319,8 +384,12 @@ class DbPlugin(Plugin): :type records: list :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) Example: @@ -347,7 +416,7 @@ class DbPlugin(Plugin): table, engine = self._get_table(table, engine=engine, *args, **kwargs) delete = table.delete() - for (k,v) in record.items(): + for (k, v) in record.items(): delete = delete.where(self._build_condition(table, k, v)) engine.execute(delete) From a90aa2cb2e69b5f722ee8cf390787e1443ab10d0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 4 Sep 2022 00:52:41 +0200 Subject: [PATCH 10/24] Make sure that a webhook function never returns a null response --- platypush/backend/http/app/routes/hook.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/platypush/backend/http/app/routes/hook.py b/platypush/backend/http/app/routes/hook.py index 5d9b420c..e5d3de40 100644 --- a/platypush/backend/http/app/routes/hook.py +++ b/platypush/backend/http/app/routes/hook.py @@ -60,7 +60,7 @@ def hook_route(hook_name): try: send_message(event) - rs = make_response(json.dumps({'status': 'ok', **event_args})) + rs = default_rs = make_response(json.dumps({'status': 'ok', **event_args})) headers = {} status_code = 200 @@ -78,6 +78,10 @@ def hook_route(hook_name): status_code = rs.get('___code___', status_code) rs = rs['___data___'] + if rs is None: + rs = default_rs + headers = {'Content-Type': 'application/json'} + rs = make_response(rs) else: headers = {'Content-Type': 'application/json'} From 0143dac2162ce3efb824774bf7b0f33ceaf6c357 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 4 Sep 2022 13:30:35 +0200 Subject: [PATCH 11/24] Improved support for bulk database statements - Wrapped insert/update/delete operations in transactions - Proper (and much more efficient) bulk logic - Better upsert logic - Return inserted/updated records if the engine supports it --- platypush/plugins/db/__init__.py | 145 +++++++++++++++++++++++-------- 1 file changed, 108 insertions(+), 37 deletions(-) diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 8cb0f89b..e419f157 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -7,7 +7,8 @@ from typing import Optional from sqlalchemy import create_engine, Table, MetaData from sqlalchemy.engine import Engine -from sqlalchemy.sql import text +from sqlalchemy.exc import CompileError +from sqlalchemy.sql import and_, or_, text from platypush.plugins import Plugin, action @@ -251,7 +252,9 @@ class DbPlugin(Plugin): :type key_columns: list :param on_duplicate_update: If set, update the records in case of duplicate rows (default: False). If set, you'll need to specify - ``key_columns`` as well. + ``key_columns`` as well. If ``key_columns`` is set, existing + records are found but ``on_duplicate_update`` is false, then + existing records will be ignored. :type on_duplicate_update: bool :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see @@ -260,6 +263,9 @@ class DbPlugin(Plugin): ``sqlalchemy.create_engine`` (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) + :return: The inserted records, if the underlying engine supports the + ``RETURNING`` statement, otherwise nothing. + Example: Request:: @@ -290,26 +296,98 @@ class DbPlugin(Plugin): key_columns = [] engine = self._get_engine(engine, *args, **kwargs) + table, engine = self._get_table(table, engine=engine, *args, **kwargs) + insert_records = records + update_records = [] + returned_records = [] + with engine.connect() as connection: + # Upsert case + if key_columns: + insert_records, update_records = self._get_new_and_existing_records( + connection, table, records, key_columns + ) + + with connection.begin(): + if insert_records: + insert = table.insert().values(insert_records) + ret = self._execute_try_returning(connection, insert) + if ret: + returned_records += ret + + if update_records and on_duplicate_update: + ret = self._update(connection, table, update_records, key_columns) + if ret: + returned_records = ret + returned_records + + if returned_records: + return returned_records + + @staticmethod + def _execute_try_returning(connection, stmt): + ret = None + stmt_with_ret = stmt.returning('*') + + try: + ret = connection.execute(stmt_with_ret) + except CompileError as e: + if str(e).startswith('RETURNING is not supported'): + connection.execute(stmt) + else: + raise e + + if ret: + return [ + {col.name: getattr(row, col.name, None) for col in stmt.table.c} + for row in ret + ] + + def _get_new_and_existing_records(self, connection, table, records, key_columns): + records_by_key = { + tuple(record.get(k) for k in key_columns): record for record in records + } + + query = table.select().where( + or_( + and_( + self._build_condition(table, k, record.get(k)) for k in key_columns + ) + for record in records + ) + ) + + existing_records = { + tuple(getattr(record, k, None) for k in key_columns): record + for record in connection.execute(query).all() + } + + update_records = [ + record for k, record in records_by_key.items() if k in existing_records + ] + + insert_records = [ + record for k, record in records_by_key.items() if k not in existing_records + ] + + return insert_records, update_records + + def _update(self, connection, table, records, key_columns): + updated_records = [] for record in records: - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + key = {k: v for (k, v) in record.items() if k in key_columns} + values = {k: v for (k, v) in record.items() if k not in key_columns} + update = table.update() - insert = table.insert().values(**record) + for (k, v) in key.items(): + update = update.where(self._build_condition(table, k, v)) - try: - engine.execute(insert) - except Exception as e: - if on_duplicate_update and key_columns: - self.update( - table=table, - records=records, - key_columns=key_columns, - engine=engine, - *args, - **kwargs - ) - else: - raise e + update = update.values(**values) + ret = self._execute_try_returning(connection, update) + if ret: + updated_records += ret + + if updated_records: + return updated_records @action def update(self, table, records, key_columns, engine=None, *args, **kwargs): @@ -331,6 +409,9 @@ class DbPlugin(Plugin): ``sqlalchemy.create_engine`` (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) + :return: The inserted records, if the underlying engine supports the + ``RETURNING`` statement, otherwise nothing. + Example: Request:: @@ -357,21 +438,10 @@ class DbPlugin(Plugin): } } """ - engine = self._get_engine(engine, *args, **kwargs) - - for record in records: + with engine.connect() as connection: table, engine = self._get_table(table, engine=engine, *args, **kwargs) - key = {k: v for (k, v) in record.items() if k in key_columns} - values = {k: v for (k, v) in record.items() if k not in key_columns} - - update = table.update() - - for (k, v) in key.items(): - update = update.where(self._build_condition(table, k, v)) - - update = update.values(**values) - engine.execute(update) + return self._update(connection, table, records, key_columns) @action def delete(self, table, records, engine=None, *args, **kwargs): @@ -412,14 +482,15 @@ class DbPlugin(Plugin): engine = self._get_engine(engine, *args, **kwargs) - for record in records: - table, engine = self._get_table(table, engine=engine, *args, **kwargs) - delete = table.delete() + with engine.connect() as connection, connection.begin(): + for record in records: + table, engine = self._get_table(table, engine=engine, *args, **kwargs) + delete = table.delete() - for (k, v) in record.items(): - delete = delete.where(self._build_condition(table, k, v)) + for (k, v) in record.items(): + delete = delete.where(self._build_condition(table, k, v)) - engine.execute(delete) + connection.execute(delete) # vim:sw=4:ts=4:et: From 4682fb4210392eb03616eefdf588550c1d7eabd0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 4 Sep 2022 16:02:37 +0200 Subject: [PATCH 12/24] Throw an assertion error when on_duplicate_update is specified on db.insert with no key_columns --- platypush/plugins/db/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index e419f157..1ff56daa 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -292,6 +292,11 @@ class DbPlugin(Plugin): } """ + if on_duplicate_update: + assert ( + key_columns + ), 'on_duplicate_update requires key_columns to be specified' + if key_columns is None: key_columns = [] From c77746e278dd2dfe07b1c5f37a5f993accfbc58c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 4 Sep 2022 16:06:58 +0200 Subject: [PATCH 13/24] If the output of a hook is null, make sure to normalize it an empty string before pushing it to Redis --- platypush/message/event/http/hook.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/platypush/message/event/http/hook.py b/platypush/message/event/http/hook.py index e6f38610..134bcfe0 100644 --- a/platypush/message/event/http/hook.py +++ b/platypush/message/event/http/hook.py @@ -78,6 +78,8 @@ class WebhookEvent(Event): if isinstance(output, (dict, list)): output = json.dumps(output) + if output is None: + output = '' get_redis().rpush(self.args['response_queue'], output) def wait_response(self, timeout=None): From 41acf4b253387ab4fd559853fb963307d3a940b6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 5 Sep 2022 03:05:22 +0200 Subject: [PATCH 14/24] Generate event ID as true random strings, not MD5 hashes of UUIDs --- platypush/message/event/__init__.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index 038f53e2..7cb97bcb 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -1,10 +1,9 @@ import copy -import hashlib import json +import random import re import sys import time -import uuid from datetime import date @@ -79,9 +78,7 @@ class Event(Message): @staticmethod def _generate_id(): """Generate a unique event ID""" - return hashlib.md5( - str(uuid.uuid1()).encode() - ).hexdigest() # lgtm [py/weak-sensitive-data-hashing] + return ''.join(['{:02x}'.format(random.randint(0, 255)) for _ in range(16)]) def matches_condition(self, condition): """ From e1aa214badcee75bdd9ec5bcdbb52a9fac3d8076 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 16 Sep 2022 21:48:09 +0200 Subject: [PATCH 15/24] tidal-integration (#223) Reviewed-on: https://git.platypush.tech/platypush/platypush/pulls/223 --- CHANGELOG.md | 2 + platypush/message/event/music/tidal.py | 14 + platypush/plugins/music/spotify/__init__.py | 339 ++++++++++------ platypush/plugins/music/tidal/__init__.py | 417 ++++++++++++++++++++ platypush/plugins/music/tidal/manifest.yaml | 9 + platypush/plugins/music/tidal/workers.py | 56 +++ platypush/schemas/tidal.py | 206 ++++++++++ setup.py | 2 +- 8 files changed, 932 insertions(+), 113 deletions(-) create mode 100644 platypush/message/event/music/tidal.py create mode 100644 platypush/plugins/music/tidal/__init__.py create mode 100644 platypush/plugins/music/tidal/manifest.yaml create mode 100644 platypush/plugins/music/tidal/workers.py create mode 100644 platypush/schemas/tidal.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e65edc1..692dcb0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ reported only starting from v0.20.2. - Added support for web hooks returning their hook method responses back to the HTTP client. +- Added [Tidal integration](https://git.platypush.tech/platypush/platypush/pulls/223) + ## [0.23.4] - 2022-08-28 ### Added diff --git a/platypush/message/event/music/tidal.py b/platypush/message/event/music/tidal.py new file mode 100644 index 00000000..504c8258 --- /dev/null +++ b/platypush/message/event/music/tidal.py @@ -0,0 +1,14 @@ +from platypush.message.event import Event + + +class TidalEvent(Event): + """Base class for Tidal events""" + + +class TidalPlaylistUpdatedEvent(TidalEvent): + """ + Event fired when a Tidal playlist is updated. + """ + + def __init__(self, playlist_id: str, *args, **kwargs): + super().__init__(*args, playlist_id=playlist_id, **kwargs) diff --git a/platypush/plugins/music/spotify/__init__.py b/platypush/plugins/music/spotify/__init__.py index e416c7c7..eb0eb18b 100644 --- a/platypush/plugins/music/spotify/__init__.py +++ b/platypush/plugins/music/spotify/__init__.py @@ -6,9 +6,17 @@ from platypush.message.response import Response from platypush.plugins import action from platypush.plugins.media import PlayerState from platypush.plugins.music import MusicPlugin -from platypush.schemas.spotify import SpotifyDeviceSchema, SpotifyStatusSchema, SpotifyTrackSchema, \ - SpotifyHistoryItemSchema, SpotifyPlaylistSchema, SpotifyAlbumSchema, SpotifyEpisodeSchema, SpotifyShowSchema, \ - SpotifyArtistSchema +from platypush.schemas.spotify import ( + SpotifyDeviceSchema, + SpotifyStatusSchema, + SpotifyTrackSchema, + SpotifyHistoryItemSchema, + SpotifyPlaylistSchema, + SpotifyAlbumSchema, + SpotifyEpisodeSchema, + SpotifyShowSchema, + SpotifyArtistSchema, +) class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): @@ -45,9 +53,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): be printed on the application logs/stdout. """ - def __init__(self, client_id: Optional[str] = None, client_secret: Optional[str] = None, **kwargs): + def __init__( + self, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + **kwargs, + ): MusicPlugin.__init__(self, **kwargs) - SpotifyMixin.__init__(self, client_id=client_id, client_secret=client_secret, **kwargs) + SpotifyMixin.__init__( + self, client_id=client_id, client_secret=client_secret, **kwargs + ) self._players_by_id = {} self._players_by_name = {} # Playlist ID -> snapshot ID and tracks cache @@ -63,14 +78,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): return dev @staticmethod - def _parse_datetime(dt: Optional[Union[str, datetime, int, float]]) -> Optional[datetime]: + def _parse_datetime( + dt: Optional[Union[str, datetime, int, float]] + ) -> Optional[datetime]: if isinstance(dt, str): try: dt = float(dt) except (ValueError, TypeError): return datetime.fromisoformat(dt) - if isinstance(dt, int) or isinstance(dt, float): + if isinstance(dt, (int, float)): return datetime.fromtimestamp(dt) return dt @@ -85,18 +102,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): devices = self.spotify_user_call('/v1/me/player/devices').get('devices', []) self._players_by_id = { **self._players_by_id, - **{ - dev['id']: dev - for dev in devices - } + **{dev['id']: dev for dev in devices}, } self._players_by_name = { **self._players_by_name, - **{ - dev['name']: dev - for dev in devices - } + **{dev['name']: dev for dev in devices}, } return SpotifyDeviceSchema().dump(devices, many=True) @@ -118,7 +129,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): params={ 'volume_percent': volume, **({'device_id': device} if device else {}), - } + }, ) def _get_volume(self, device: Optional[str] = None) -> Optional[int]: @@ -138,10 +149,13 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): if device: device = self._get_device(device)['id'] - self.spotify_user_call('/v1/me/player/volume', params={ - 'volume_percent': min(100, (self._get_volume() or 0) + delta), - **({'device_id': device} if device else {}), - }) + self.spotify_user_call( + '/v1/me/player/volume', + params={ + 'volume_percent': min(100, (self._get_volume() or 0) + delta), + **({'device_id': device} if device else {}), + }, + ) @action def voldown(self, delta: int = 5, device: Optional[str] = None): @@ -154,10 +168,13 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): if device: device = self._get_device(device)['id'] - self.spotify_user_call('/v1/me/player/volume', params={ - 'volume_percent': max(0, (self._get_volume() or 0) - delta), - **({'device_id': device} if device else {}), - }) + self.spotify_user_call( + '/v1/me/player/volume', + params={ + 'volume_percent': max(0, (self._get_volume() or 0) - delta), + **({'device_id': device} if device else {}), + }, + ) @action def play(self, resource: Optional[str] = None, device: Optional[str] = None): @@ -192,8 +209,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): # noinspection PyUnresolvedReferences status = self.status().output - state = 'play' \ - if status.get('device_id') != device or status.get('state') != PlayerState.PLAY.value else 'pause' + state = ( + 'play' + if status.get('device_id') != device + or status.get('state') != PlayerState.PLAY.value + else 'pause' + ) self.spotify_user_call( f'/v1/me/player/{state}', @@ -212,7 +233,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): status = self.status().output if status.get('state') == PlayerState.PLAY.value: self.spotify_user_call( - f'/v1/me/player/pause', + '/v1/me/player/pause', method='put', ) @@ -230,7 +251,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): status = self.status().output if status.get('state') != PlayerState.PLAY.value: self.spotify_user_call( - f'/v1/me/player/play', + '/v1/me/player/play', method='put', params={ **({'device_id': device} if device else {}), @@ -261,7 +282,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): """ device = self._get_device(device)['id'] self.spotify_user_call( - f'/v1/me/player', + '/v1/me/player', method='put', json={ 'device_ids': [device], @@ -279,7 +300,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): device = self._get_device(device)['id'] self.spotify_user_call( - f'/v1/me/player/next', + '/v1/me/player/next', method='post', params={ **({'device_id': device} if device else {}), @@ -297,7 +318,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): device = self._get_device(device)['id'] self.spotify_user_call( - f'/v1/me/player/previous', + '/v1/me/player/previous', method='post', params={ **({'device_id': device} if device else {}), @@ -316,7 +337,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): device = self._get_device(device)['id'] self.spotify_user_call( - f'/v1/me/player/seek', + '/v1/me/player/seek', method='put', params={ 'position_ms': int(position * 1000), @@ -338,13 +359,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): if value is None: # noinspection PyUnresolvedReferences status = self.status().output - state = 'context' \ - if status.get('device_id') != device or not status.get('repeat') else 'off' + state = ( + 'context' + if status.get('device_id') != device or not status.get('repeat') + else 'off' + ) else: state = value is True self.spotify_user_call( - f'/v1/me/player/repeat', + '/v1/me/player/repeat', method='put', params={ 'state': 'context' if state else 'off', @@ -366,12 +390,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): if value is None: # noinspection PyUnresolvedReferences status = self.status().output - state = True if status.get('device_id') != device or not status.get('random') else False + state = bool(status.get('device_id') != device or not status.get('random')) else: state = value is True self.spotify_user_call( - f'/v1/me/player/shuffle', + '/v1/me/player/shuffle', method='put', params={ 'state': state, @@ -380,8 +404,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): ) @action - def history(self, limit: int = 20, before: Optional[Union[datetime, str, int]] = None, - after: Optional[Union[datetime, str, int]] = None): + def history( + self, + limit: int = 20, + before: Optional[Union[datetime, str, int]] = None, + after: Optional[Union[datetime, str, int]] = None, + ): """ Get a list of recently played track on the account. @@ -396,21 +424,26 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): after = self._parse_datetime(after) assert not (before and after), 'before and after cannot both be set' - results = self._spotify_paginate_results('/v1/me/player/recently-played', - limit=limit, - params={ - 'limit': min(limit, 50), - **({'before': before} if before else {}), - **({'after': after} if after else {}), - }) + results = self._spotify_paginate_results( + '/v1/me/player/recently-played', + limit=limit, + params={ + 'limit': min(limit, 50), + **({'before': before} if before else {}), + **({'after': after} if after else {}), + }, + ) - return SpotifyHistoryItemSchema().dump([ - { - **item.pop('track'), - **item, - } - for item in results - ], many=True) + return SpotifyHistoryItemSchema().dump( + [ + { + **item.pop('track'), + **item, + } + for item in results + ], + many=True, + ) @action def add(self, resource: str, device: Optional[str] = None, **kwargs): @@ -424,7 +457,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): device = self._get_device(device)['id'] self.spotify_user_call( - f'/v1/me/player/queue', + '/v1/me/player/queue', method='post', params={ 'uri': resource, @@ -472,7 +505,9 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): return SpotifyTrackSchema().dump(track) @action - def get_playlists(self, limit: int = 1000, offset: int = 0, user: Optional[str] = None): + def get_playlists( + self, limit: int = 1000, offset: int = 0, user: Optional[str] = None + ): """ Get the user's playlists. @@ -483,7 +518,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): """ playlists = self._spotify_paginate_results( f'/v1/{"users/" + user if user else "me"}/playlists', - limit=limit, offset=offset + limit=limit, + offset=offset, ) return SpotifyPlaylistSchema().dump(playlists, many=True) @@ -491,36 +527,45 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): def _get_playlist(self, playlist: str) -> dict: playlists = self.get_playlists().output playlists = [ - pl for pl in playlists if ( - pl['id'] == playlist or - pl['uri'] == playlist or - pl['name'] == playlist - ) + pl + for pl in playlists + if (pl['id'] == playlist or pl['uri'] == playlist or pl['name'] == playlist) ] assert playlists, f'No such playlist ID, URI or name: {playlist}' return playlists[0] - def _get_playlist_tracks_from_cache(self, id: str, snapshot_id: str, limit: Optional[int] = None, - offset: int = 0) -> Optional[Iterable]: + def _get_playlist_tracks_from_cache( + self, id: str, snapshot_id: str, limit: Optional[int] = None, offset: int = 0 + ) -> Optional[Iterable]: snapshot = self._playlist_snapshots.get(id) if ( - not snapshot or - snapshot['snapshot_id'] != snapshot_id or - (limit is None and snapshot['limit'] is not None) + not snapshot + or snapshot['snapshot_id'] != snapshot_id + or (limit is None and snapshot['limit'] is not None) ): return if limit is not None and snapshot['limit'] is not None: stored_range = (snapshot['limit'], snapshot['limit'] + snapshot['offset']) requested_range = (limit, limit + offset) - if requested_range[0] < stored_range[0] or requested_range[1] > stored_range[1]: + if ( + requested_range[0] < stored_range[0] + or requested_range[1] > stored_range[1] + ): return return snapshot['tracks'] - def _cache_playlist_data(self, id: str, snapshot_id: str, tracks: Iterable[dict], limit: Optional[int] = None, - offset: int = 0, **_): + def _cache_playlist_data( + self, + id: str, + snapshot_id: str, + tracks: Iterable[dict], + limit: Optional[int] = None, + offset: int = 0, + **_, + ): self._playlist_snapshots[id] = { 'id': id, 'tracks': tracks, @@ -530,7 +575,13 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): } @action - def get_playlist(self, playlist: str, with_tracks: bool = True, limit: Optional[int] = None, offset: int = 0): + def get_playlist( + self, + playlist: str, + with_tracks: bool = True, + limit: Optional[int] = None, + offset: int = 0, + ): """ Get a playlist content. @@ -544,8 +595,10 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): playlist = self._get_playlist(playlist) if with_tracks: playlist['tracks'] = self._get_playlist_tracks_from_cache( - playlist['id'], snapshot_id=playlist['snapshot_id'], - limit=limit, offset=offset + playlist['id'], + snapshot_id=playlist['snapshot_id'], + limit=limit, + offset=offset, ) if playlist['tracks'] is None: @@ -554,13 +607,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): **track, 'track': { **track['track'], - 'position': offset+i+1, - } + 'position': offset + i + 1, + }, } - for i, track in enumerate(self._spotify_paginate_results( - f'/v1/playlists/{playlist["id"]}/tracks', - limit=limit, offset=offset - )) + for i, track in enumerate( + self._spotify_paginate_results( + f'/v1/playlists/{playlist["id"]}/tracks', + limit=limit, + offset=offset, + ) + ) ] self._cache_playlist_data(**playlist, limit=limit, offset=offset) @@ -568,7 +624,12 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): return SpotifyPlaylistSchema().dump(playlist) @action - def add_to_playlist(self, playlist: str, resources: Union[str, Iterable[str]], position: Optional[int] = None): + def add_to_playlist( + self, + playlist: str, + resources: Union[str, Iterable[str]], + position: Optional[int] = None, + ): """ Add one or more items to a playlist. @@ -585,11 +646,14 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): }, json={ 'uris': [ - uri.strip() for uri in ( - resources.split(',') if isinstance(resources, str) else resources + uri.strip() + for uri in ( + resources.split(',') + if isinstance(resources, str) + else resources ) ] - } + }, ) snapshot_id = response.get('snapshot_id') @@ -611,18 +675,27 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): 'tracks': [ {'uri': uri.strip()} for uri in ( - resources.split(',') if isinstance(resources, str) else resources + resources.split(',') + if isinstance(resources, str) + else resources ) ] - } + }, ) snapshot_id = response.get('snapshot_id') assert snapshot_id is not None, 'Could not save playlist' @action - def playlist_move(self, playlist: str, from_pos: int, to_pos: int, range_length: int = 1, - resources: Optional[Union[str, Iterable[str]]] = None, **_): + def playlist_move( + self, + playlist: str, + from_pos: int, + to_pos: int, + range_length: int = 1, + resources: Optional[Union[str, Iterable[str]]] = None, + **_, + ): """ Move or replace elements in a playlist. @@ -641,12 +714,21 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): 'range_start': int(from_pos) + 1, 'range_length': int(range_length), 'insert_before': int(to_pos) + 1, - **({'uris': [ - uri.strip() for uri in ( - resources.split(',') if isinstance(resources, str) else resources - ) - ]} if resources else {}) - } + **( + { + 'uris': [ + uri.strip() + for uri in ( + resources.split(',') + if isinstance(resources, str) + else resources + ) + ] + } + if resources + else {} + ), + }, ) snapshot_id = response.get('snapshot_id') @@ -673,8 +755,14 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): # noinspection PyShadowingBuiltins @action - def search(self, query: Optional[Union[str, dict]] = None, limit: int = 50, offset: int = 0, type: str = 'track', - **filter) -> Iterable[dict]: + def search( + self, + query: Optional[Union[str, dict]] = None, + limit: int = 50, + offset: int = 0, + type: str = 'track', + **filter, + ) -> Iterable[dict]: """ Search for tracks matching a certain criteria. @@ -714,12 +802,16 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): }.get('uri', []) uris = uri.split(',') if isinstance(uri, str) else uri - params = { - 'ids': ','.join([uri.split(':')[-1].strip() for uri in uris]), - } if uris else { - 'q': self._make_filter(query, **filter), - 'type': type, - } + params = ( + { + 'ids': ','.join([uri.split(':')[-1].strip() for uri in uris]), + } + if uris + else { + 'q': self._make_filter(query, **filter), + 'type': type, + } + ) response = self._spotify_paginate_results( f'/v1/{type + "s" if uris else "search"}', @@ -739,7 +831,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): track.get('track'), track.get('title'), track.get('popularity'), - ) + ), ) schema_class = None @@ -759,6 +851,31 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): return response + @action + def create_playlist( + self, name: str, description: Optional[str] = None, public: bool = False + ): + """ + Create a playlist. + + :param name: Playlist name. + :param description: Optional playlist description. + :param public: Whether the new playlist should be public + (default: False). + :return: .. schema:: spotify.SpotifyPlaylistSchema + """ + ret = self.spotify_user_call( + '/v1/users/me/playlists', + method='post', + json={ + 'name': name, + 'description': description, + 'public': public, + }, + ) + + return SpotifyPlaylistSchema().dump(ret) + @action def follow_playlist(self, playlist: str, public: bool = True): """ @@ -774,7 +891,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): method='put', json={ 'public': public, - } + }, ) @action @@ -792,10 +909,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): @staticmethod def _uris_to_id(*uris: str) -> Iterable[str]: - return [ - uri.split(':')[-1] - for uri in uris - ] + return [uri.split(':')[-1] for uri in uris] @action def get_albums(self, limit: int = 50, offset: int = 0) -> List[dict]: @@ -811,7 +925,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): '/v1/me/albums', limit=limit, offset=offset, - ), many=True + ), + many=True, ) @action @@ -852,9 +967,7 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): return [ SpotifyTrackSchema().dump(item['track']) for item in self._spotify_paginate_results( - '/v1/me/tracks', - limit=limit, - offset=offset + '/v1/me/tracks', limit=limit, offset=offset ) ] @@ -898,7 +1011,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): '/v1/me/episodes', limit=limit, offset=offset, - ), many=True + ), + many=True, ) @action @@ -941,7 +1055,8 @@ class MusicSpotifyPlugin(MusicPlugin, SpotifyMixin): '/v1/me/shows', limit=limit, offset=offset, - ), many=True + ), + many=True, ) @action diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py new file mode 100644 index 00000000..1b32eb6c --- /dev/null +++ b/platypush/plugins/music/tidal/__init__.py @@ -0,0 +1,417 @@ +import json +import os +import pathlib +import requests + +from datetime import datetime +from urllib.parse import urljoin +from typing import Iterable, Optional, Union + +from platypush.config import Config +from platypush.context import Variable, get_bus +from platypush.message.event.music.tidal import TidalPlaylistUpdatedEvent +from platypush.plugins import RunnablePlugin, action +from platypush.plugins.music.tidal.workers import get_items +from platypush.schemas.tidal import ( + TidalAlbumSchema, + TidalPlaylistSchema, + TidalArtistSchema, + TidalSearchResultsSchema, + TidalTrackSchema, +) + + +class MusicTidalPlugin(RunnablePlugin): + """ + Plugin to interact with the user's Tidal account and library. + + Upon the first login, the application will prompt you with a link to + connect to your Tidal account. Once authorized, you should no longer be + required to explicitly login. + + Triggers: + + * :class:`platypush.message.event.music.TidalPlaylistUpdatedEvent`: when a user playlist + is updated. + + Requires: + + * **tidalapi** (``pip install tidalapi``) + + """ + + _base_url = 'https://api.tidalhifi.com/v1/' + _default_credentials_file = os.path.join( + str(Config.get('workdir')), 'tidal', 'credentials.json' + ) + + def __init__( + self, + quality: str = 'high', + credentials_file: str = _default_credentials_file, + **kwargs, + ): + """ + :param quality: Default audio quality. Default: ``high``. + Supported: [``loseless``, ``master``, ``high``, ``low``]. + :param credentials_file: Path to the file where the OAuth session + parameters will be stored (default: + ``/tidal/credentials.json``). + """ + from tidalapi import Quality + + super().__init__(**kwargs) + self._credentials_file = credentials_file + self._user_playlists = {} + + try: + self._quality = getattr(Quality, quality.lower()) + except AttributeError: + raise AssertionError( + f'Invalid quality: {quality}. Supported values: ' + f'{[q.name for q in Quality]}' + ) + + self._session = None + + def _oauth_open_saved_session(self): + if not self._session: + return + + try: + with open(self._credentials_file, 'r') as f: + data = json.load(f) + self._session.load_oauth_session( + data['token_type'], data['access_token'], data['refresh_token'] + ) + except Exception as e: + self.logger.warning('Could not load %s: %s', self._credentials_file, e) + + def _oauth_create_new_session(self): + if not self._session: + return + + self._session.login_oauth_simple(function=self.logger.warning) # type: ignore + if self._session.check_login(): + data = { + 'token_type': self._session.token_type, + 'session_id': self._session.session_id, + 'access_token': self._session.access_token, + 'refresh_token': self._session.refresh_token, + } + + pathlib.Path(os.path.dirname(self._credentials_file)).mkdir( + parents=True, exist_ok=True + ) + + with open(self._credentials_file, 'w') as outfile: + json.dump(data, outfile) + + @property + def session(self): + from tidalapi import Config, Session + + if self._session and self._session.check_login(): + return self._session + + # Attempt to reload the existing session from file + self._session = Session(config=Config(quality=self._quality)) + self._oauth_open_saved_session() + if not self._session.check_login(): + # Create a new session if we couldn't load an existing one + self._oauth_create_new_session() + + assert ( + self._session.user and self._session.check_login() + ), 'Could not connect to TIDAL' + + return self._session + + @property + def user(self): + user = self.session.user + assert user, 'Not logged in' + return user + + def _api_request(self, url, *args, method='get', **kwargs): + method = getattr(requests, method.lower()) + url = urljoin(self._base_url, url) + kwargs['headers'] = kwargs.get('headers', {}) + kwargs['params'] = kwargs.get('params', {}) + kwargs['params'].update( + { + 'sessionId': self.session.session_id, + 'countryCode': self.session.country_code, + } + ) + + rs = None + kwargs['headers']['Authorization'] = '{type} {token}'.format( + type=self.session.token_type, token=self.session.access_token + ) + + try: + rs = method(url, *args, **kwargs) + rs.raise_for_status() + return rs + except requests.HTTPError as e: + if rs: + self.logger.error(rs.text) + raise e + + @action + def create_playlist(self, name: str, description: Optional[str] = None): + """ + Create a new playlist. + + :param name: Playlist name. + :param description: Optional playlist description. + :return: .. schema:: tidal.TidalPlaylistSchema + """ + ret = self._api_request( + url=f'users/{self.user.id}/playlists', + method='post', + data={ + 'title': name, + 'description': description, + }, + ) + + return TidalPlaylistSchema().dump(ret.json()) + + @action + def delete_playlist(self, playlist_id: str): + """ + Delete a playlist by ID. + + :param playlist_id: ID of the playlist to delete. + """ + self._api_request(url=f'playlists/{playlist_id}', method='delete') + + @action + def edit_playlist(self, playlist_id: str, title=None, description=None): + """ + Edit a playlist's metadata. + + :param name: New name. + :param description: New description. + """ + pl = self.user.playlist(playlist_id) + pl.edit(title=title, description=description) + + @action + def get_playlists(self): + """ + Get the user's playlists (track lists are excluded). + + :return: .. schema:: tidal.TidalPlaylistSchema(many=True) + """ + ret = self.user.playlists() + self.user.favorites.playlists() + + return TidalPlaylistSchema().dump(ret, many=True) + + @action + def get_playlist(self, playlist_id: str): + """ + Get the details of a playlist (including tracks). + + :param playlist_id: Playlist ID. + :return: .. schema:: tidal.TidalPlaylistSchema + """ + pl = self.session.playlist(playlist_id) + pl._tracks = get_items(pl.tracks) + return TidalPlaylistSchema().dump(pl) + + @action + def get_artist(self, artist_id: Union[str, int]): + """ + Get the details of an artist. + + :param artist_id: Artist ID. + :return: .. schema:: tidal.TidalArtistSchema + """ + ret = self.session.artist(artist_id) + ret.albums = get_items(ret.get_albums) + return TidalArtistSchema().dump(ret) + + @action + def get_album(self, album_id: Union[str, int]): + """ + Get the details of an album. + + :param artist_id: Album ID. + :return: .. schema:: tidal.TidalAlbumSchema + """ + ret = self.session.album(album_id) + return TidalAlbumSchema().dump(ret) + + @action + def get_track(self, track_id: Union[str, int]): + """ + Get the details of an track. + + :param artist_id: Track ID. + :return: .. schema:: tidal.TidalTrackSchema + """ + ret = self.session.album(track_id) + return TidalTrackSchema().dump(ret) + + @action + def search( + self, + query: str, + limit: int = 50, + offset: int = 0, + type: Optional[str] = None, + ): + """ + Perform a search. + + :param query: Query string. + :param limit: Maximum results that should be returned (default: 50). + :param offset: Search offset (default: 0). + :param type: Type of results that should be returned. Default: None + (return all the results that match the query). Supported: + ``artist``, ``album``, ``track`` and ``playlist``. + :return: .. schema:: tidal.TidalSearchResultsSchema + """ + from tidalapi.artist import Artist + from tidalapi.album import Album + from tidalapi.media import Track + from tidalapi.playlist import Playlist + + models = None + if type is not None: + if type == 'artist': + models = [Artist] + elif type == 'album': + models = [Album] + elif type == 'track': + models = [Track] + elif type == 'playlist': + models = [Playlist] + else: + raise AssertionError(f'Unsupported search type: {type}') + + ret = self.session.search(query, models=models, limit=limit, offset=offset) + + return TidalSearchResultsSchema().dump(ret) + + @action + def get_download_url(self, track_id: str) -> str: + """ + Get the direct download URL of a track. + + :param artist_id: Track ID. + """ + return self.session.track(track_id).get_url() + + @action + def add_to_playlist(self, playlist_id: str, track_ids: Iterable[str]): + """ + Append one or more tracks to a playlist. + + :param playlist_id: Target playlist ID. + :param track_ids: List of track IDs to append. + """ + return self._api_request( + url=f'playlists/{playlist_id}/items', + method='post', + headers={ + 'If-None-Match': None, + }, + data={ + 'onArtifactNotFound': 'SKIP', + 'onDupes': 'SKIP', + 'trackIds': ','.join(map(str, track_ids)), + }, + ) + + @action + def add_track(self, track_id: Union[str, int]): + """ + Add a track to the user's collection. + + :param track_id: Track ID. + """ + self.user.favorites.add_track(track_id) + + @action + def add_album(self, album_id: Union[str, int]): + """ + Add an album to the user's collection. + + :param album_id: Album ID. + """ + self.user.favorites.add_album(album_id) + + @action + def add_artist(self, artist_id: Union[str, int]): + """ + Add an artist to the user's collection. + + :param artist_id: Artist ID. + """ + self.user.favorites.add_artist(artist_id) + + @action + def add_playlist(self, playlist_id: str): + """ + Add a playlist to the user's collection. + + :param playlist_id: Playlist ID. + """ + self.user.favorites.add_playlist(playlist_id) + + @action + def remove_track(self, track_id: Union[str, int]): + """ + Remove a track from the user's collection. + + :param track_id: Track ID. + """ + self.user.favorites.remove_track(track_id) + + @action + def remove_album(self, album_id: Union[str, int]): + """ + Remove an album from the user's collection. + + :param album_id: Album ID. + """ + self.user.favorites.remove_album(album_id) + + @action + def remove_artist(self, artist_id: Union[str, int]): + """ + Remove an artist from the user's collection. + + :param artist_id: Artist ID. + """ + self.user.favorites.remove_artist(artist_id) + + @action + def remove_playlist(self, playlist_id: str): + """ + Remove a playlist from the user's collection. + + :param playlist_id: Playlist ID. + """ + self.user.favorites.remove_playlist(playlist_id) + + def main(self): + while not self.should_stop(): + playlists = self.session.user.playlists() # type: ignore + + for pl in playlists: + last_updated_var = Variable(f'TIDAL_PLAYLIST_LAST_UPDATE[{pl.id}]') + prev_last_updated = last_updated_var.get() + if prev_last_updated: + prev_last_updated = datetime.fromisoformat(prev_last_updated) + if pl.last_updated > prev_last_updated: + get_bus().post(TidalPlaylistUpdatedEvent(playlist_id=pl.id)) + + if not prev_last_updated or pl.last_updated > prev_last_updated: + last_updated_var.set(pl.last_updated.isoformat()) + + self.wait_stop(self.poll_interval) diff --git a/platypush/plugins/music/tidal/manifest.yaml b/platypush/plugins/music/tidal/manifest.yaml new file mode 100644 index 00000000..7fde4ffd --- /dev/null +++ b/platypush/plugins/music/tidal/manifest.yaml @@ -0,0 +1,9 @@ +manifest: + events: + - platypush.message.event.music.TidalPlaylistUpdatedEvent: when a user playlist + is updated. + install: + pip: + - tidalapi + package: platypush.plugins.music.tidal + type: plugin diff --git a/platypush/plugins/music/tidal/workers.py b/platypush/plugins/music/tidal/workers.py new file mode 100644 index 00000000..feea0ea9 --- /dev/null +++ b/platypush/plugins/music/tidal/workers.py @@ -0,0 +1,56 @@ +from concurrent.futures import ThreadPoolExecutor +from typing import Callable + + +def func_wrapper(args): + (f, offset, *args) = args + items = f(*args) + return [(i + offset, item) for i, item in enumerate(items)] + + +def get_items( + func: Callable, + *args, + parse: Callable = lambda _: _, + chunk_size: int = 100, + processes: int = 5, +): + """ + This function performs pagination on a function that supports + `limit`/`offset` parameters and it runs API requests in parallel to speed + things up. + """ + items = [] + offsets = [-chunk_size] + remaining = chunk_size * processes + + with ThreadPoolExecutor( + processes, thread_name_prefix=f'mopidy-tidal-{func.__name__}-' + ) as pool: + while remaining == chunk_size * processes: + offsets = [offsets[-1] + chunk_size * (i + 1) for i in range(processes)] + + pool_results = pool.map( + func_wrapper, + [ + ( + func, + offset, + *args, + chunk_size, # limit + offset, # offset + ) + for offset in offsets + ], + ) + + new_items = [] + for results in pool_results: + new_items.extend(results) + + remaining = len(new_items) + items.extend(new_items) + + items = sorted([_ for _ in items if _], key=lambda item: item[0]) + sorted_items = [item[1] for item in items] + return list(map(parse, sorted_items)) diff --git a/platypush/schemas/tidal.py b/platypush/schemas/tidal.py new file mode 100644 index 00000000..4b17b4b9 --- /dev/null +++ b/platypush/schemas/tidal.py @@ -0,0 +1,206 @@ +from marshmallow import Schema, fields, pre_dump + +from platypush.schemas import DateTime + + +class TidalSchema(Schema): + pass + + +class TidalArtistSchema(TidalSchema): + id = fields.String( + required=True, + dump_only=True, + metadata={ + 'example': '3288612', + 'description': 'Artist ID', + }, + ) + + url = fields.String( + required=True, + dump_only=True, + metadata={ + 'description': 'Artist Tidal URL', + 'example': 'https://tidal.com/artist/3288612', + }, + ) + + name = fields.String(required=True) + albums = fields.Nested("TidalAlbumSchema", many=True) + + @pre_dump + def _prefill_url(self, data, *_, **__): + data.url = f'https://tidal.com/artist/{data.id}' + return data + + +class TidalAlbumSchema(TidalSchema): + id = fields.String( + required=True, + dump_only=True, + attribute='uuid', + metadata={ + 'example': '45288612', + 'description': 'Album ID', + }, + ) + + url = fields.String( + required=True, + dump_only=True, + metadata={ + 'description': 'Album Tidal URL', + 'example': 'https://tidal.com/album/45288612', + }, + ) + + name = fields.String(required=True) + artist = fields.Nested(TidalArtistSchema) + duration = fields.Int(metadata={'description': 'Album duration, in seconds'}) + year = fields.Integer(metadata={'example': 2003}) + num_tracks = fields.Int(metadata={'example': 10}) + + @pre_dump + def _prefill_url(self, data, *_, **__): + data.url = f'https://tidal.com/album/{data.id}' + return data + + +class TidalTrackSchema(TidalSchema): + id = fields.String( + required=True, + dump_only=True, + metadata={ + 'example': '25288614', + 'description': 'Track ID', + }, + ) + + url = fields.String( + required=True, + dump_only=True, + metadata={ + 'description': 'Track Tidal URL', + 'example': 'https://tidal.com/track/25288614', + }, + ) + + artist = fields.Nested(TidalArtistSchema) + album = fields.Nested(TidalAlbumSchema) + name = fields.String(metadata={'description': 'Track title'}) + duration = fields.Int(metadata={'description': 'Track duration, in seconds'}) + track_num = fields.Int( + metadata={'description': 'Index of the track within the album'} + ) + + @pre_dump + def _prefill_url(self, data, *_, **__): + data.url = f'https://tidal.com/track/{data.id}' + return data + + +class TidalPlaylistSchema(TidalSchema): + id = fields.String( + required=True, + dump_only=True, + attribute='uuid', + metadata={ + 'example': '2b288612-34f5-11ed-b42d-001500e8f607', + 'description': 'Playlist ID', + }, + ) + + url = fields.String( + required=True, + dump_only=True, + metadata={ + 'description': 'Playlist Tidal URL', + 'example': 'https://tidal.com/playlist/2b288612-34f5-11ed-b42d-001500e8f607', + }, + ) + + name = fields.String(required=True) + description = fields.String() + duration = fields.Int(metadata={'description': 'Playlist duration, in seconds'}) + public = fields.Boolean(attribute='publicPlaylist') + owner = fields.String( + attribute='creator', + metadata={ + 'description': 'Playlist creator/owner ID', + }, + ) + + num_tracks = fields.Int( + attribute='numberOfTracks', + default=0, + metadata={ + 'example': 42, + 'description': 'Number of tracks in the playlist', + }, + ) + + created_at = DateTime( + attribute='created', + metadata={ + 'description': 'When the playlist was created', + }, + ) + + last_updated_at = DateTime( + attribute='lastUpdated', + metadata={ + 'description': 'When the playlist was last updated', + }, + ) + + tracks = fields.Nested(TidalTrackSchema, many=True) + + def _flatten_object(self, data, *_, **__): + if not isinstance(data, dict): + data = { + 'created': data.created, + 'creator': data.creator.id, + 'description': data.description, + 'duration': data.duration, + 'lastUpdated': data.last_updated, + 'uuid': data.id, + 'name': data.name, + 'numberOfTracks': data.num_tracks, + 'publicPlaylist': data.public, + 'tracks': getattr(data, '_tracks', []), + } + + return data + + def _normalize_owner(self, data, *_, **__): + owner = data.pop('owner', data.pop('creator', None)) + if owner: + if isinstance(owner, dict): + owner = owner['id'] + data['creator'] = owner + + return data + + def _normalize_name(self, data, *_, **__): + if data.get('title'): + data['name'] = data.pop('title') + return data + + @pre_dump + def normalize(self, data, *_, **__): + if not isinstance(data, dict): + data = self._flatten_object(data) + + self._normalize_name(data) + self._normalize_owner(data) + if 'tracks' not in data: + data['tracks'] = [] + return data + + +class TidalSearchResultsSchema(TidalSchema): + artists = fields.Nested(TidalArtistSchema, many=True) + albums = fields.Nested(TidalAlbumSchema, many=True) + tracks = fields.Nested(TidalTrackSchema, many=True) + playlists = fields.Nested(TidalPlaylistSchema, many=True) diff --git a/setup.py b/setup.py index c338b09c..5db5d588 100755 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ setup( 'zeroconf>=0.27.0', 'tz', 'python-dateutil', - 'cryptography', + # 'cryptography', 'pyjwt', 'marshmallow', 'frozendict', From 1b405de0d5182fdc2cdf1cdad81a5200a0254cde Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 17 Sep 2022 06:09:39 +0200 Subject: [PATCH 16/24] Added missing docs --- docs/source/events.rst | 1 + docs/source/platypush/events/music.tidal.rst | 5 +++++ docs/source/platypush/plugins/music.tidal.rst | 5 +++++ docs/source/plugins.rst | 1 + 4 files changed, 12 insertions(+) create mode 100644 docs/source/platypush/events/music.tidal.rst create mode 100644 docs/source/platypush/plugins/music.tidal.rst diff --git a/docs/source/events.rst b/docs/source/events.rst index 13d3ed79..ec048ba5 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -47,6 +47,7 @@ Events platypush/events/mqtt.rst platypush/events/music.rst platypush/events/music.snapcast.rst + platypush/events/music.tidal.rst platypush/events/nextcloud.rst platypush/events/nfc.rst platypush/events/ngrok.rst diff --git a/docs/source/platypush/events/music.tidal.rst b/docs/source/platypush/events/music.tidal.rst new file mode 100644 index 00000000..b249756f --- /dev/null +++ b/docs/source/platypush/events/music.tidal.rst @@ -0,0 +1,5 @@ +``music.tidal`` +=============== + +.. automodule:: platypush.message.event.music.tidal + :members: diff --git a/docs/source/platypush/plugins/music.tidal.rst b/docs/source/platypush/plugins/music.tidal.rst new file mode 100644 index 00000000..af6ee863 --- /dev/null +++ b/docs/source/platypush/plugins/music.tidal.rst @@ -0,0 +1,5 @@ +``music.tidal`` +=============== + +.. automodule:: platypush.plugins.music.tidal + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 41be8a47..436232a2 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -94,6 +94,7 @@ Plugins platypush/plugins/music.mpd.rst platypush/plugins/music.snapcast.rst platypush/plugins/music.spotify.rst + platypush/plugins/music.tidal.rst platypush/plugins/nextcloud.rst platypush/plugins/ngrok.rst platypush/plugins/nmap.rst From a9ebb4805a0509faeaa78ae8be8c528592f16c76 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 17 Sep 2022 06:25:28 +0200 Subject: [PATCH 17/24] Fixed doc warnings --- platypush/plugins/db/__init__.py | 2 +- platypush/plugins/music/tidal/__init__.py | 2 +- platypush/schemas/tidal.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 1ff56daa..e88ae6a6 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -154,7 +154,7 @@ class DbPlugin(Plugin): ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) :param kwargs: Extra kwargs that will be passed to - ``sqlalchemy.create_engine`` (see + ``sqlalchemy.create_engine`` (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) :returns: List of hashes representing the result rows. diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py index 1b32eb6c..2cf57d73 100644 --- a/platypush/plugins/music/tidal/__init__.py +++ b/platypush/plugins/music/tidal/__init__.py @@ -55,7 +55,7 @@ class MusicTidalPlugin(RunnablePlugin): :param quality: Default audio quality. Default: ``high``. Supported: [``loseless``, ``master``, ``high``, ``low``]. :param credentials_file: Path to the file where the OAuth session - parameters will be stored (default: + parameters will be stored (default: ``/tidal/credentials.json``). """ from tidalapi import Quality diff --git a/platypush/schemas/tidal.py b/platypush/schemas/tidal.py index 4b17b4b9..d7df81b9 100644 --- a/platypush/schemas/tidal.py +++ b/platypush/schemas/tidal.py @@ -27,7 +27,6 @@ class TidalArtistSchema(TidalSchema): ) name = fields.String(required=True) - albums = fields.Nested("TidalAlbumSchema", many=True) @pre_dump def _prefill_url(self, data, *_, **__): From 7c610adc8499712c0b77ef3f6ea8b5ced939c5ab Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 17 Sep 2022 06:30:20 +0200 Subject: [PATCH 18/24] FIX: Apply expanduser to the credentials_file setting in music.tidal --- platypush/plugins/music/tidal/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py index 2cf57d73..a432dde7 100644 --- a/platypush/plugins/music/tidal/__init__.py +++ b/platypush/plugins/music/tidal/__init__.py @@ -61,7 +61,7 @@ class MusicTidalPlugin(RunnablePlugin): from tidalapi import Quality super().__init__(**kwargs) - self._credentials_file = credentials_file + self._credentials_file = os.path.expanduser(credentials_file) self._user_playlists = {} try: From 61cda60751146b48fae26bbeb4649970bd8a84f8 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 18 Sep 2022 05:22:12 +0200 Subject: [PATCH 19/24] Proper implementation for Tidal's add_to_playlist and remove_from_playlist methods - Using tidalapi's `UserPlaylist.add` and `UserPlaylist.delete` methods instead of defining my own through `_api_request`, so we won't have to deal with the logic to set the ETag header. - Added `remove_from_playlist` method. --- platypush/plugins/music/tidal/__init__.py | 48 ++++++++++++++------- platypush/plugins/music/tidal/manifest.yaml | 2 +- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py index a432dde7..a6d5e8f3 100644 --- a/platypush/plugins/music/tidal/__init__.py +++ b/platypush/plugins/music/tidal/__init__.py @@ -36,7 +36,7 @@ class MusicTidalPlugin(RunnablePlugin): Requires: - * **tidalapi** (``pip install tidalapi``) + * **tidalapi** (``pip install 'tidalapi >= 0.7.0'``) """ @@ -186,7 +186,8 @@ class MusicTidalPlugin(RunnablePlugin): :param playlist_id: ID of the playlist to delete. """ - self._api_request(url=f'playlists/{playlist_id}', method='delete') + pl = self.user.playlist(playlist_id) + pl.delete() @action def edit_playlist(self, playlist_id: str, title=None, description=None): @@ -207,7 +208,6 @@ class MusicTidalPlugin(RunnablePlugin): :return: .. schema:: tidal.TidalPlaylistSchema(many=True) """ ret = self.user.playlists() + self.user.favorites.playlists() - return TidalPlaylistSchema().dump(ret, many=True) @action @@ -307,25 +307,41 @@ class MusicTidalPlugin(RunnablePlugin): return self.session.track(track_id).get_url() @action - def add_to_playlist(self, playlist_id: str, track_ids: Iterable[str]): + def add_to_playlist(self, playlist_id: str, track_ids: Iterable[Union[str, int]]): """ Append one or more tracks to a playlist. :param playlist_id: Target playlist ID. :param track_ids: List of track IDs to append. """ - return self._api_request( - url=f'playlists/{playlist_id}/items', - method='post', - headers={ - 'If-None-Match': None, - }, - data={ - 'onArtifactNotFound': 'SKIP', - 'onDupes': 'SKIP', - 'trackIds': ','.join(map(str, track_ids)), - }, - ) + pl = self.user.playlist(playlist_id) + pl.add(track_ids) + + @action + def remove_from_playlist( + self, + playlist_id: str, + track_id: Optional[Union[str, int]] = None, + index: Optional[int] = None, + ): + """ + Remove a track from a playlist. + + Specify either the ``track_id`` or the ``index``. + + :param playlist_id: Target playlist ID. + :param track_id: ID of the track to remove. + :param index: Index of the track to remove. + """ + assert not ( + track_id is None and index is None + ), 'Please specify either track_id or index' + + pl = self.user.playlist(playlist_id) + if index: + pl.remove_by_index(index) + if track_id: + pl.remove_by_id(track_id) @action def add_track(self, track_id: Union[str, int]): diff --git a/platypush/plugins/music/tidal/manifest.yaml b/platypush/plugins/music/tidal/manifest.yaml index 7fde4ffd..e047ad5d 100644 --- a/platypush/plugins/music/tidal/manifest.yaml +++ b/platypush/plugins/music/tidal/manifest.yaml @@ -4,6 +4,6 @@ manifest: is updated. install: pip: - - tidalapi + - tidalapi >= 0.7.0 package: platypush.plugins.music.tidal type: plugin From 36dd645209367b36dc259496bf41fc3de50ac5e6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 18 Sep 2022 06:04:53 +0200 Subject: [PATCH 20/24] Use session.playlist instead of session.user.playlist to query playlists --- platypush/plugins/music/tidal/__init__.py | 48 +++-------------------- 1 file changed, 6 insertions(+), 42 deletions(-) diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py index a6d5e8f3..14230578 100644 --- a/platypush/plugins/music/tidal/__init__.py +++ b/platypush/plugins/music/tidal/__init__.py @@ -1,10 +1,8 @@ import json import os import pathlib -import requests from datetime import datetime -from urllib.parse import urljoin from typing import Iterable, Optional, Union from platypush.config import Config @@ -133,32 +131,6 @@ class MusicTidalPlugin(RunnablePlugin): assert user, 'Not logged in' return user - def _api_request(self, url, *args, method='get', **kwargs): - method = getattr(requests, method.lower()) - url = urljoin(self._base_url, url) - kwargs['headers'] = kwargs.get('headers', {}) - kwargs['params'] = kwargs.get('params', {}) - kwargs['params'].update( - { - 'sessionId': self.session.session_id, - 'countryCode': self.session.country_code, - } - ) - - rs = None - kwargs['headers']['Authorization'] = '{type} {token}'.format( - type=self.session.token_type, token=self.session.access_token - ) - - try: - rs = method(url, *args, **kwargs) - rs.raise_for_status() - return rs - except requests.HTTPError as e: - if rs: - self.logger.error(rs.text) - raise e - @action def create_playlist(self, name: str, description: Optional[str] = None): """ @@ -168,16 +140,8 @@ class MusicTidalPlugin(RunnablePlugin): :param description: Optional playlist description. :return: .. schema:: tidal.TidalPlaylistSchema """ - ret = self._api_request( - url=f'users/{self.user.id}/playlists', - method='post', - data={ - 'title': name, - 'description': description, - }, - ) - - return TidalPlaylistSchema().dump(ret.json()) + ret = self.user.create_playlist(name, description) + return TidalPlaylistSchema().dump(ret) @action def delete_playlist(self, playlist_id: str): @@ -186,7 +150,7 @@ class MusicTidalPlugin(RunnablePlugin): :param playlist_id: ID of the playlist to delete. """ - pl = self.user.playlist(playlist_id) + pl = self.session.playlist(playlist_id) pl.delete() @action @@ -197,7 +161,7 @@ class MusicTidalPlugin(RunnablePlugin): :param name: New name. :param description: New description. """ - pl = self.user.playlist(playlist_id) + pl = self.session.playlist(playlist_id) pl.edit(title=title, description=description) @action @@ -314,7 +278,7 @@ class MusicTidalPlugin(RunnablePlugin): :param playlist_id: Target playlist ID. :param track_ids: List of track IDs to append. """ - pl = self.user.playlist(playlist_id) + pl = self.session.playlist(playlist_id) pl.add(track_ids) @action @@ -337,7 +301,7 @@ class MusicTidalPlugin(RunnablePlugin): track_id is None and index is None ), 'Please specify either track_id or index' - pl = self.user.playlist(playlist_id) + pl = self.session.playlist(playlist_id) if index: pl.remove_by_index(index) if track_id: From c2a3f2f4f3169e093213b9fc3bde7fe6320613b5 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 18 Sep 2022 19:54:32 +0200 Subject: [PATCH 21/24] =?UTF-8?q?Bump=20version:=200.23.4=20=E2=86=92=200.?= =?UTF-8?q?23.5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 12 +++++++++++- platypush/__init__.py | 2 +- setup.cfg | 2 +- setup.py | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 692dcb0c..aaf6f1fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file. Given the high speed of development in the first phase, changes are being reported only starting from v0.20.2. -## [Unreleased] +## [0.23.5] - 2022-09-18 ### Added @@ -13,6 +13,16 @@ reported only starting from v0.20.2. - Added [Tidal integration](https://git.platypush.tech/platypush/platypush/pulls/223) +- Added support for [OPML + subscriptions](https://git.platypush.tech/platypush/platypush/pulls/220) to + the `rss` plugin. + +- Better support for bulk database operations on the `db` plugin. + +### Fixed + +- Now supporting YAML sections with empty configurations. + ## [0.23.4] - 2022-08-28 ### Added diff --git a/platypush/__init__.py b/platypush/__init__.py index d255a6cd..6842ebf3 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -23,7 +23,7 @@ from .message.response import Response from .utils import set_thread_name, get_enabled_plugins __author__ = 'Fabio Manganiello ' -__version__ = '0.23.4' +__version__ = '0.23.5' logger = logging.getLogger('platypush') diff --git a/setup.cfg b/setup.cfg index 7a618699..70d7a864 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.23.4 +current_version = 0.23.5 commit = True tag = True diff --git a/setup.py b/setup.py index 5db5d588..dcdacca1 100755 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ backend = pkg_files('platypush/backend') setup( name="platypush", - version="0.23.4", + version="0.23.5", author="Fabio Manganiello", author_email="info@fabiomanganiello.com", description="Platypush service", From 09baceab4bf6bdb6a274edbe8127ab8a7d03353d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 19 Sep 2022 20:39:21 +0200 Subject: [PATCH 22/24] Include album_id and the list of tracks in music.tidal.get_album --- platypush/plugins/music/tidal/__init__.py | 2 +- platypush/schemas/tidal.py | 27 +++++++++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/platypush/plugins/music/tidal/__init__.py b/platypush/plugins/music/tidal/__init__.py index 14230578..484b1c59 100644 --- a/platypush/plugins/music/tidal/__init__.py +++ b/platypush/plugins/music/tidal/__init__.py @@ -207,7 +207,7 @@ class MusicTidalPlugin(RunnablePlugin): :return: .. schema:: tidal.TidalAlbumSchema """ ret = self.session.album(album_id) - return TidalAlbumSchema().dump(ret) + return TidalAlbumSchema(with_tracks=True).dump(ret) @action def get_track(self, track_id: Union[str, int]): diff --git a/platypush/schemas/tidal.py b/platypush/schemas/tidal.py index d7df81b9..4dde5fca 100644 --- a/platypush/schemas/tidal.py +++ b/platypush/schemas/tidal.py @@ -1,4 +1,4 @@ -from marshmallow import Schema, fields, pre_dump +from marshmallow import Schema, fields, pre_dump, post_dump from platypush.schemas import DateTime @@ -35,10 +35,13 @@ class TidalArtistSchema(TidalSchema): class TidalAlbumSchema(TidalSchema): + def __init__(self, *args, with_tracks=False, **kwargs): + super().__init__(*args, **kwargs) + self._with_tracks = with_tracks + id = fields.String( required=True, dump_only=True, - attribute='uuid', metadata={ 'example': '45288612', 'description': 'Album ID', @@ -59,12 +62,32 @@ class TidalAlbumSchema(TidalSchema): duration = fields.Int(metadata={'description': 'Album duration, in seconds'}) year = fields.Integer(metadata={'example': 2003}) num_tracks = fields.Int(metadata={'example': 10}) + tracks = fields.List(fields.Dict(), attribute='_tracks') @pre_dump def _prefill_url(self, data, *_, **__): data.url = f'https://tidal.com/album/{data.id}' return data + @pre_dump + def _cache_tracks(self, data, *_, **__): + if self._with_tracks: + album_id = str(data.id) + self.context[album_id] = { + 'tracks': data.tracks(), + } + + return data + + @post_dump + def _dump_tracks(self, data, *_, **__): + if self._with_tracks: + album_id = str(data['id']) + ctx = self.context.pop(album_id, {}) + data['tracks'] = TidalTrackSchema().dump(ctx.pop('tracks', []), many=True) + + return data + class TidalTrackSchema(TidalSchema): id = fields.String( From 3d74f0a11f326a7dc4d1c4284e7c8faf59c885cd Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 19 Sep 2022 20:40:54 +0200 Subject: [PATCH 23/24] Updated CHANGELOG --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index aaf6f1fc..2ee36b10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. Given the high speed of development in the first phase, changes are being reported only starting from v0.20.2. +## [0.23.6] - 2022-09-19 + +### Fixed + +- Fixed album_id and list of tracks on `music.tidal.get_album`. + ## [0.23.5] - 2022-09-18 ### Added From a0575ed6dedf155cc05cae9cc349fa43254d58ef Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 19 Sep 2022 20:41:02 +0200 Subject: [PATCH 24/24] =?UTF-8?q?Bump=20version:=200.23.5=20=E2=86=92=200.?= =?UTF-8?q?23.6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- platypush/__init__.py | 2 +- setup.cfg | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index 6842ebf3..3ad7a710 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -23,7 +23,7 @@ from .message.response import Response from .utils import set_thread_name, get_enabled_plugins __author__ = 'Fabio Manganiello ' -__version__ = '0.23.5' +__version__ = '0.23.6' logger = logging.getLogger('platypush') diff --git a/setup.cfg b/setup.cfg index 70d7a864..955725c5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.23.5 +current_version = 0.23.6 commit = True tag = True diff --git a/setup.py b/setup.py index dcdacca1..9b2aff4c 100755 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ backend = pkg_files('platypush/backend') setup( name="platypush", - version="0.23.5", + version="0.23.6", author="Fabio Manganiello", author_email="info@fabiomanganiello.com", description="Platypush service",