Merge branch 'master' into 276/better-docker

This commit is contained in:
Fabio Manganiello 2023-08-18 17:34:56 +02:00
commit 0a5fc40dc5
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
4 changed files with 38 additions and 30 deletions

View file

@ -117,9 +117,8 @@ if 'entity' not in Base.metadata:
'polymorphic_on': type, 'polymorphic_on': type,
} }
@classmethod # type: ignore @classmethod
@property def get_columns(cls) -> Tuple[ColumnProperty, ...]:
def columns(cls) -> Tuple[ColumnProperty, ...]:
inspector = schema_inspect(cls) inspector = schema_inspect(cls)
return tuple(inspector.mapper.column_attrs) return tuple(inspector.mapper.column_attrs)
@ -146,7 +145,7 @@ if 'entity' not in Base.metadata:
return self.__class__( return self.__class__(
**dict( **dict(
key_value_pair(col) # type: ignore key_value_pair(col) # type: ignore
for col in self.columns for col in self.get_columns()
if key_value_pair(col) is not None if key_value_pair(col) is not None
), ),
children=[child.copy() for child in self.children], children=[child.copy() for child in self.children],
@ -213,7 +212,7 @@ if 'entity' not in Base.metadata:
return { return {
**dict( **dict(
self._column_to_pair(col) self._column_to_pair(col)
for col in self.columns for col in self.get_columns()
if self._column_to_pair(col) if self._column_to_pair(col)
), ),
'children_ids': self.children_ids, 'children_ids': self.children_ids,
@ -241,7 +240,7 @@ if 'entity' not in Base.metadata:
""" """
Serializes the new value before assigning it to an attribute. Serializes the new value before assigning it to an attribute.
""" """
matching_columns = [c for c in self.columns if c.expression.name == key] # type: ignore matching_columns = [c for c in self.get_columns() if c.expression.name == key] # type: ignore
if ( if (
matching_columns matching_columns

View file

@ -176,7 +176,7 @@ class EntitiesMerger:
""" """
Merge two versions of an entity column by column. Merge two versions of an entity column by column.
""" """
columns = [col.key for col in entity.columns] columns = [col.key for col in entity.get_columns()]
for col in columns: for col in columns:
if col == 'meta': if col == 'meta':
existing_entity.meta = { # type: ignore existing_entity.meta = { # type: ignore
@ -189,7 +189,9 @@ class EntitiesMerger:
except ObjectDeletedError as e: except ObjectDeletedError as e:
logger.warning( logger.warning(
'Could not set %s on entity <%s>: %s', 'Could not set %s on entity <%s>: %s',
col, existing_entity.entity_key, e col,
existing_entity.entity_key,
e,
) )
# Recursive call to merge the columns of the children too # Recursive call to merge the columns of the children too

View file

@ -4,20 +4,26 @@ import queue
import re import re
import threading import threading
import time import time
from dateutil.tz import tzutc
from typing import Iterable, Optional, Collection, Set from typing import Iterable, Optional, Collection, Set
from xml.etree import ElementTree from xml.etree import ElementTree
import dateutil.parser import dateutil.parser
from dateutil.tz import tzutc
import requests import requests
from platypush.context import get_bus, get_plugin from platypush.context import get_bus, get_plugin
from platypush.message.event.rss import NewFeedEntryEvent from platypush.message.event.rss import NewFeedEntryEvent
from platypush.plugins import RunnablePlugin, action from platypush.plugins import RunnablePlugin, action
from platypush.plugins.variable import VariablePlugin
from platypush.schemas.rss import RssFeedEntrySchema from platypush.schemas.rss import RssFeedEntrySchema
def _variable() -> VariablePlugin:
var = get_plugin(VariablePlugin)
assert var, 'Could not load the variable plugin'
return var
class RssPlugin(RunnablePlugin): class RssPlugin(RunnablePlugin):
""" """
A plugin for parsing and subscribing to RSS feeds. A plugin for parsing and subscribing to RSS feeds.
@ -38,6 +44,8 @@ class RssPlugin(RunnablePlugin):
+ 'Chrome/62.0.3202.94 Safari/537.36' + 'Chrome/62.0.3202.94 Safari/537.36'
) )
timeout = 20
def __init__( def __init__(
self, self,
subscriptions: Optional[Collection[str]] = None, subscriptions: Optional[Collection[str]] = None,
@ -61,8 +69,7 @@ class RssPlugin(RunnablePlugin):
self._latest_entries = [] self._latest_entries = []
self.subscriptions = list(self._parse_subscriptions(subscriptions or [])) self.subscriptions = list(self._parse_subscriptions(subscriptions or []))
self._latest_timestamps = {}
self._latest_timestamps = self._get_latest_timestamps()
@staticmethod @staticmethod
def _get_feed_latest_timestamp_varname(url: str) -> str: def _get_feed_latest_timestamp_varname(url: str) -> str:
@ -70,21 +77,20 @@ class RssPlugin(RunnablePlugin):
@classmethod @classmethod
def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]: def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]:
t = ( varname = cls._get_feed_latest_timestamp_varname(url)
get_plugin('variable') var: dict = _variable().get(varname).output or {} # type: ignore
.get(cls._get_feed_latest_timestamp_varname(url)) t = var.get(varname)
.output.get(cls._get_feed_latest_timestamp_varname(url))
)
if t: if t:
return dateutil.parser.isoparse(t) return dateutil.parser.isoparse(t)
return None
def _get_latest_timestamps(self) -> dict: def _get_latest_timestamps(self) -> dict:
return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions} return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions}
def _update_latest_timestamps(self) -> None: def _update_latest_timestamps(self) -> None:
variable = get_plugin('variable') _variable().set(
variable.set(
**{ **{
self._get_feed_latest_timestamp_varname(url): latest_timestamp self._get_feed_latest_timestamp_varname(url): latest_timestamp
for url, latest_timestamp in self._latest_timestamps.items() for url, latest_timestamp in self._latest_timestamps.items()
@ -95,7 +101,7 @@ class RssPlugin(RunnablePlugin):
def _parse_content(entry) -> Optional[str]: def _parse_content(entry) -> Optional[str]:
content = getattr(entry, 'content', None) content = getattr(entry, 'content', None)
if not content: if not content:
return return None
if isinstance(content, list): if isinstance(content, list):
return content[0]['value'] return content[0]['value']
@ -113,7 +119,9 @@ class RssPlugin(RunnablePlugin):
import feedparser import feedparser
feed = feedparser.parse( feed = feedparser.parse(
requests.get(url, headers={'User-Agent': self.user_agent}).text requests.get(
url, headers={'User-Agent': self.user_agent}, timeout=self.timeout
).text,
) )
return RssFeedEntrySchema().dump( return RssFeedEntrySchema().dump(
sorted( sorted(
@ -139,7 +147,7 @@ class RssPlugin(RunnablePlugin):
for entry in feed.entries for entry in feed.entries
if getattr(entry, 'published_parsed', None) if getattr(entry, 'published_parsed', None)
], ],
key=lambda e: e['published'], key=lambda e: e['published'], # type: ignore
), ),
many=True, many=True,
) )
@ -196,6 +204,7 @@ class RssPlugin(RunnablePlugin):
headers={ headers={
'User-Agent': self.user_agent, 'User-Agent': self.user_agent,
}, },
timeout=self.timeout,
).text ).text
except Exception as e: except Exception as e:
self.logger.warning('Could not retrieve subscription %s: %s', url, e) self.logger.warning('Could not retrieve subscription %s: %s', url, e)
@ -316,6 +325,7 @@ class RssPlugin(RunnablePlugin):
return ElementTree.tostring(root, encoding='utf-8', method='xml').decode() return ElementTree.tostring(root, encoding='utf-8', method='xml').decode()
def main(self): def main(self):
self._latest_timestamps = self._get_latest_timestamps()
self._feed_workers = [ self._feed_workers = [
threading.Thread(target=self._feed_worker, args=(q,)) threading.Thread(target=self._feed_worker, args=(q,))
for q in self._feed_worker_queues for q in self._feed_worker_queues
@ -325,7 +335,7 @@ class RssPlugin(RunnablePlugin):
worker.start() worker.start()
self.logger.info( self.logger.info(
f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions' 'Initialized RSS plugin with %d subscriptions', len(self.subscriptions)
) )
while not self.should_stop(): while not self.should_stop():
@ -360,7 +370,7 @@ class RssPlugin(RunnablePlugin):
url = response['url'] url = response['url']
error = response.get('error') error = response.get('error')
if error: if error:
self.logger.error(f'Could not parse feed {url}: {error}') self.logger.error('Could not parse feed %s: %s', url, error)
responses[url] = error responses[url] = error
else: else:
responses[url] = response['content'] responses[url] = response['content']

View file

@ -49,12 +49,9 @@ class ApplicationRunner:
self._print_version() self._print_version()
while True: while True:
with ( with CommandStream(parsed_args.ctrl_sock) as stream, ApplicationProcess(
CommandStream(parsed_args.ctrl_sock) as stream, *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout
ApplicationProcess( ) as self._proc:
*args, pidfile=parsed_args.pidfile, timeout=self._default_timeout
) as self._proc,
):
try: try:
self._listen(stream) self._listen(stream)
except KeyboardInterrupt: except KeyboardInterrupt: