From 5dd7345c0b2bf63fd79ff65b25147d7e97db23c0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 18 Aug 2023 15:51:11 +0200 Subject: [PATCH 1/5] Sync the latest parse timestamps in main instead of __init__ in rss. We should load the latest timestamps from the db when the thread starts instead of doing it in the constructor. The constructor may be invoked when the entities engine hasn't been initialized yet, and result in deadlocks. --- platypush/plugins/rss/__init__.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index ae274af13d..f27d704cf8 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -4,17 +4,17 @@ import queue import re import threading import time - -from dateutil.tz import tzutc from typing import Iterable, Optional, Collection, Set from xml.etree import ElementTree import dateutil.parser +from dateutil.tz import tzutc import requests from platypush.context import get_bus, get_plugin from platypush.message.event.rss import NewFeedEntryEvent from platypush.plugins import RunnablePlugin, action +from platypush.plugins.variable import VariablePlugin from platypush.schemas.rss import RssFeedEntrySchema @@ -61,8 +61,14 @@ class RssPlugin(RunnablePlugin): self._latest_entries = [] self.subscriptions = list(self._parse_subscriptions(subscriptions or [])) + self._latest_timestamps = {} - self._latest_timestamps = self._get_latest_timestamps() + @classmethod + @property + def _variable(cls) -> VariablePlugin: + var = get_plugin(VariablePlugin) + assert var, 'Could not load the variable plugin' + return var @staticmethod def _get_feed_latest_timestamp_varname(url: str) -> str: @@ -70,21 +76,20 @@ class RssPlugin(RunnablePlugin): @classmethod def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]: - t = ( - get_plugin('variable') - .get(cls._get_feed_latest_timestamp_varname(url)) - .output.get(cls._get_feed_latest_timestamp_varname(url)) - ) + varname = cls._get_feed_latest_timestamp_varname(url) + var: dict = cls._variable.get(varname).output or {} # type: ignore + t = var.get(varname) if t: return dateutil.parser.isoparse(t) + return None + def _get_latest_timestamps(self) -> dict: return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions} def _update_latest_timestamps(self) -> None: - variable = get_plugin('variable') - variable.set( + self._variable.set( **{ self._get_feed_latest_timestamp_varname(url): latest_timestamp for url, latest_timestamp in self._latest_timestamps.items() @@ -95,7 +100,7 @@ class RssPlugin(RunnablePlugin): def _parse_content(entry) -> Optional[str]: content = getattr(entry, 'content', None) if not content: - return + return None if isinstance(content, list): return content[0]['value'] @@ -139,7 +144,7 @@ class RssPlugin(RunnablePlugin): for entry in feed.entries if getattr(entry, 'published_parsed', None) ], - key=lambda e: e['published'], + key=lambda e: e['published'], # type: ignore ), many=True, ) @@ -316,6 +321,7 @@ class RssPlugin(RunnablePlugin): return ElementTree.tostring(root, encoding='utf-8', method='xml').decode() def main(self): + self._latest_timestamps = self._get_latest_timestamps() self._feed_workers = [ threading.Thread(target=self._feed_worker, args=(q,)) for q in self._feed_worker_queues From ca954904123064b5b115ed8308558b73e2003613 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 18 Aug 2023 15:53:30 +0200 Subject: [PATCH 2/5] Added timeout parameter to requests.get in the rss plugin. --- platypush/plugins/rss/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index f27d704cf8..becb098320 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -118,7 +118,7 @@ class RssPlugin(RunnablePlugin): import feedparser feed = feedparser.parse( - requests.get(url, headers={'User-Agent': self.user_agent}).text + requests.get(url, headers={'User-Agent': self.user_agent}, timeout=20).text, ) return RssFeedEntrySchema().dump( sorted( @@ -331,7 +331,7 @@ class RssPlugin(RunnablePlugin): worker.start() self.logger.info( - f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions' + 'Initialized RSS plugin with %d subscriptions', len(self.subscriptions) ) while not self.should_stop(): From d0c89f88a8d6f980f1a711a4915db8a87f7c56b1 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 18 Aug 2023 16:10:31 +0200 Subject: [PATCH 3/5] Fixed parenthesised multi-with statement. This syntax is not supported on Python < 3.9, and therefore it should be broken down as a multi-line statement. --- platypush/runner/_runner.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/platypush/runner/_runner.py b/platypush/runner/_runner.py index ac713c9d25..ce2a443abf 100644 --- a/platypush/runner/_runner.py +++ b/platypush/runner/_runner.py @@ -49,12 +49,9 @@ class ApplicationRunner: self._print_version() while True: - with ( - CommandStream(parsed_args.ctrl_sock) as stream, - ApplicationProcess( - *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout - ) as self._proc, - ): + with CommandStream(parsed_args.ctrl_sock) as stream, ApplicationProcess( + *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout + ) as self._proc: try: self._listen(stream) except KeyboardInterrupt: From a9cdff900e86f2cabf38847c8589e0d6ab82d69c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 18 Aug 2023 16:16:47 +0200 Subject: [PATCH 4/5] _variable should be an external global function rather than a class property. The combination of `@property` + `@classmethod` isn't supported on Python < 3.9. --- platypush/plugins/rss/__init__.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index becb098320..e7616292c8 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -18,6 +18,12 @@ from platypush.plugins.variable import VariablePlugin from platypush.schemas.rss import RssFeedEntrySchema +def _variable() -> VariablePlugin: + var = get_plugin(VariablePlugin) + assert var, 'Could not load the variable plugin' + return var + + class RssPlugin(RunnablePlugin): """ A plugin for parsing and subscribing to RSS feeds. @@ -63,13 +69,6 @@ class RssPlugin(RunnablePlugin): self.subscriptions = list(self._parse_subscriptions(subscriptions or [])) self._latest_timestamps = {} - @classmethod - @property - def _variable(cls) -> VariablePlugin: - var = get_plugin(VariablePlugin) - assert var, 'Could not load the variable plugin' - return var - @staticmethod def _get_feed_latest_timestamp_varname(url: str) -> str: return f'LATEST_FEED_TIMESTAMP[{url}]' @@ -77,7 +76,7 @@ class RssPlugin(RunnablePlugin): @classmethod def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]: varname = cls._get_feed_latest_timestamp_varname(url) - var: dict = cls._variable.get(varname).output or {} # type: ignore + var: dict = _variable().get(varname).output or {} # type: ignore t = var.get(varname) if t: @@ -89,7 +88,7 @@ class RssPlugin(RunnablePlugin): return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions} def _update_latest_timestamps(self) -> None: - self._variable.set( + _variable().set( **{ self._get_feed_latest_timestamp_varname(url): latest_timestamp for url, latest_timestamp in self._latest_timestamps.items() From 2cab836bdf61ca2ec2ffa35d5b7e967823b0be1f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 18 Aug 2023 17:20:53 +0200 Subject: [PATCH 5/5] `Entity.columns` class property replaced by `Entity.get_columns` method. Again, Python < 3.9 doesn't like the combination of `@property` + `@classmethod`. --- platypush/entities/_base.py | 11 +++++------ platypush/entities/_engine/repo/merger.py | 6 ++++-- platypush/plugins/rss/__init__.py | 9 +++++++-- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/platypush/entities/_base.py b/platypush/entities/_base.py index b68d8d8148..c6d5c18b3c 100644 --- a/platypush/entities/_base.py +++ b/platypush/entities/_base.py @@ -117,9 +117,8 @@ if 'entity' not in Base.metadata: 'polymorphic_on': type, } - @classmethod # type: ignore - @property - def columns(cls) -> Tuple[ColumnProperty, ...]: + @classmethod + def get_columns(cls) -> Tuple[ColumnProperty, ...]: inspector = schema_inspect(cls) return tuple(inspector.mapper.column_attrs) @@ -146,7 +145,7 @@ if 'entity' not in Base.metadata: return self.__class__( **dict( key_value_pair(col) # type: ignore - for col in self.columns + for col in self.get_columns() if key_value_pair(col) is not None ), children=[child.copy() for child in self.children], @@ -213,7 +212,7 @@ if 'entity' not in Base.metadata: return { **dict( self._column_to_pair(col) - for col in self.columns + for col in self.get_columns() if self._column_to_pair(col) ), 'children_ids': self.children_ids, @@ -241,7 +240,7 @@ if 'entity' not in Base.metadata: """ Serializes the new value before assigning it to an attribute. """ - matching_columns = [c for c in self.columns if c.expression.name == key] # type: ignore + matching_columns = [c for c in self.get_columns() if c.expression.name == key] # type: ignore if ( matching_columns diff --git a/platypush/entities/_engine/repo/merger.py b/platypush/entities/_engine/repo/merger.py index 71b344c219..a70492dd70 100644 --- a/platypush/entities/_engine/repo/merger.py +++ b/platypush/entities/_engine/repo/merger.py @@ -176,7 +176,7 @@ class EntitiesMerger: """ Merge two versions of an entity column by column. """ - columns = [col.key for col in entity.columns] + columns = [col.key for col in entity.get_columns()] for col in columns: if col == 'meta': existing_entity.meta = { # type: ignore @@ -189,7 +189,9 @@ class EntitiesMerger: except ObjectDeletedError as e: logger.warning( 'Could not set %s on entity <%s>: %s', - col, existing_entity.entity_key, e + col, + existing_entity.entity_key, + e, ) # Recursive call to merge the columns of the children too diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index e7616292c8..1ef908c7c8 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -44,6 +44,8 @@ class RssPlugin(RunnablePlugin): + 'Chrome/62.0.3202.94 Safari/537.36' ) + timeout = 20 + def __init__( self, subscriptions: Optional[Collection[str]] = None, @@ -117,7 +119,9 @@ class RssPlugin(RunnablePlugin): import feedparser feed = feedparser.parse( - requests.get(url, headers={'User-Agent': self.user_agent}, timeout=20).text, + requests.get( + url, headers={'User-Agent': self.user_agent}, timeout=self.timeout + ).text, ) return RssFeedEntrySchema().dump( sorted( @@ -200,6 +204,7 @@ class RssPlugin(RunnablePlugin): headers={ 'User-Agent': self.user_agent, }, + timeout=self.timeout, ).text except Exception as e: self.logger.warning('Could not retrieve subscription %s: %s', url, e) @@ -365,7 +370,7 @@ class RssPlugin(RunnablePlugin): url = response['url'] error = response.get('error') if error: - self.logger.error(f'Could not parse feed {url}: {error}') + self.logger.error('Could not parse feed %s: %s', url, error) responses[url] = error else: responses[url] = response['content']