diff --git a/platypush/entities/_base.py b/platypush/entities/_base.py index b68d8d814..c6d5c18b3 100644 --- a/platypush/entities/_base.py +++ b/platypush/entities/_base.py @@ -117,9 +117,8 @@ if 'entity' not in Base.metadata: 'polymorphic_on': type, } - @classmethod # type: ignore - @property - def columns(cls) -> Tuple[ColumnProperty, ...]: + @classmethod + def get_columns(cls) -> Tuple[ColumnProperty, ...]: inspector = schema_inspect(cls) return tuple(inspector.mapper.column_attrs) @@ -146,7 +145,7 @@ if 'entity' not in Base.metadata: return self.__class__( **dict( key_value_pair(col) # type: ignore - for col in self.columns + for col in self.get_columns() if key_value_pair(col) is not None ), children=[child.copy() for child in self.children], @@ -213,7 +212,7 @@ if 'entity' not in Base.metadata: return { **dict( self._column_to_pair(col) - for col in self.columns + for col in self.get_columns() if self._column_to_pair(col) ), 'children_ids': self.children_ids, @@ -241,7 +240,7 @@ if 'entity' not in Base.metadata: """ Serializes the new value before assigning it to an attribute. """ - matching_columns = [c for c in self.columns if c.expression.name == key] # type: ignore + matching_columns = [c for c in self.get_columns() if c.expression.name == key] # type: ignore if ( matching_columns diff --git a/platypush/entities/_engine/repo/merger.py b/platypush/entities/_engine/repo/merger.py index 71b344c21..a70492dd7 100644 --- a/platypush/entities/_engine/repo/merger.py +++ b/platypush/entities/_engine/repo/merger.py @@ -176,7 +176,7 @@ class EntitiesMerger: """ Merge two versions of an entity column by column. """ - columns = [col.key for col in entity.columns] + columns = [col.key for col in entity.get_columns()] for col in columns: if col == 'meta': existing_entity.meta = { # type: ignore @@ -189,7 +189,9 @@ class EntitiesMerger: except ObjectDeletedError as e: logger.warning( 'Could not set %s on entity <%s>: %s', - col, existing_entity.entity_key, e + col, + existing_entity.entity_key, + e, ) # Recursive call to merge the columns of the children too diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index ae274af13..1ef908c7c 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -4,20 +4,26 @@ import queue import re import threading import time - -from dateutil.tz import tzutc from typing import Iterable, Optional, Collection, Set from xml.etree import ElementTree import dateutil.parser +from dateutil.tz import tzutc import requests from platypush.context import get_bus, get_plugin from platypush.message.event.rss import NewFeedEntryEvent from platypush.plugins import RunnablePlugin, action +from platypush.plugins.variable import VariablePlugin from platypush.schemas.rss import RssFeedEntrySchema +def _variable() -> VariablePlugin: + var = get_plugin(VariablePlugin) + assert var, 'Could not load the variable plugin' + return var + + class RssPlugin(RunnablePlugin): """ A plugin for parsing and subscribing to RSS feeds. @@ -38,6 +44,8 @@ class RssPlugin(RunnablePlugin): + 'Chrome/62.0.3202.94 Safari/537.36' ) + timeout = 20 + def __init__( self, subscriptions: Optional[Collection[str]] = None, @@ -61,8 +69,7 @@ class RssPlugin(RunnablePlugin): self._latest_entries = [] self.subscriptions = list(self._parse_subscriptions(subscriptions or [])) - - self._latest_timestamps = self._get_latest_timestamps() + self._latest_timestamps = {} @staticmethod def _get_feed_latest_timestamp_varname(url: str) -> str: @@ -70,21 +77,20 @@ class RssPlugin(RunnablePlugin): @classmethod def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]: - t = ( - get_plugin('variable') - .get(cls._get_feed_latest_timestamp_varname(url)) - .output.get(cls._get_feed_latest_timestamp_varname(url)) - ) + varname = cls._get_feed_latest_timestamp_varname(url) + var: dict = _variable().get(varname).output or {} # type: ignore + t = var.get(varname) if t: return dateutil.parser.isoparse(t) + return None + def _get_latest_timestamps(self) -> dict: return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions} def _update_latest_timestamps(self) -> None: - variable = get_plugin('variable') - variable.set( + _variable().set( **{ self._get_feed_latest_timestamp_varname(url): latest_timestamp for url, latest_timestamp in self._latest_timestamps.items() @@ -95,7 +101,7 @@ class RssPlugin(RunnablePlugin): def _parse_content(entry) -> Optional[str]: content = getattr(entry, 'content', None) if not content: - return + return None if isinstance(content, list): return content[0]['value'] @@ -113,7 +119,9 @@ class RssPlugin(RunnablePlugin): import feedparser feed = feedparser.parse( - requests.get(url, headers={'User-Agent': self.user_agent}).text + requests.get( + url, headers={'User-Agent': self.user_agent}, timeout=self.timeout + ).text, ) return RssFeedEntrySchema().dump( sorted( @@ -139,7 +147,7 @@ class RssPlugin(RunnablePlugin): for entry in feed.entries if getattr(entry, 'published_parsed', None) ], - key=lambda e: e['published'], + key=lambda e: e['published'], # type: ignore ), many=True, ) @@ -196,6 +204,7 @@ class RssPlugin(RunnablePlugin): headers={ 'User-Agent': self.user_agent, }, + timeout=self.timeout, ).text except Exception as e: self.logger.warning('Could not retrieve subscription %s: %s', url, e) @@ -316,6 +325,7 @@ class RssPlugin(RunnablePlugin): return ElementTree.tostring(root, encoding='utf-8', method='xml').decode() def main(self): + self._latest_timestamps = self._get_latest_timestamps() self._feed_workers = [ threading.Thread(target=self._feed_worker, args=(q,)) for q in self._feed_worker_queues @@ -325,7 +335,7 @@ class RssPlugin(RunnablePlugin): worker.start() self.logger.info( - f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions' + 'Initialized RSS plugin with %d subscriptions', len(self.subscriptions) ) while not self.should_stop(): @@ -360,7 +370,7 @@ class RssPlugin(RunnablePlugin): url = response['url'] error = response.get('error') if error: - self.logger.error(f'Could not parse feed {url}: {error}') + self.logger.error('Could not parse feed %s: %s', url, error) responses[url] = error else: responses[url] = response['content'] diff --git a/platypush/runner/_runner.py b/platypush/runner/_runner.py index ac713c9d2..ce2a443ab 100644 --- a/platypush/runner/_runner.py +++ b/platypush/runner/_runner.py @@ -49,12 +49,9 @@ class ApplicationRunner: self._print_version() while True: - with ( - CommandStream(parsed_args.ctrl_sock) as stream, - ApplicationProcess( - *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout - ) as self._proc, - ): + with CommandStream(parsed_args.ctrl_sock) as stream, ApplicationProcess( + *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout + ) as self._proc: try: self._listen(stream) except KeyboardInterrupt: