From 5dd7345c0b2bf63fd79ff65b25147d7e97db23c0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 18 Aug 2023 15:51:11 +0200 Subject: [PATCH] Sync the latest parse timestamps in main instead of __init__ in rss. We should load the latest timestamps from the db when the thread starts instead of doing it in the constructor. The constructor may be invoked when the entities engine hasn't been initialized yet, and result in deadlocks. --- platypush/plugins/rss/__init__.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/platypush/plugins/rss/__init__.py b/platypush/plugins/rss/__init__.py index ae274af13d..f27d704cf8 100644 --- a/platypush/plugins/rss/__init__.py +++ b/platypush/plugins/rss/__init__.py @@ -4,17 +4,17 @@ import queue import re import threading import time - -from dateutil.tz import tzutc from typing import Iterable, Optional, Collection, Set from xml.etree import ElementTree import dateutil.parser +from dateutil.tz import tzutc import requests from platypush.context import get_bus, get_plugin from platypush.message.event.rss import NewFeedEntryEvent from platypush.plugins import RunnablePlugin, action +from platypush.plugins.variable import VariablePlugin from platypush.schemas.rss import RssFeedEntrySchema @@ -61,8 +61,14 @@ class RssPlugin(RunnablePlugin): self._latest_entries = [] self.subscriptions = list(self._parse_subscriptions(subscriptions or [])) + self._latest_timestamps = {} - self._latest_timestamps = self._get_latest_timestamps() + @classmethod + @property + def _variable(cls) -> VariablePlugin: + var = get_plugin(VariablePlugin) + assert var, 'Could not load the variable plugin' + return var @staticmethod def _get_feed_latest_timestamp_varname(url: str) -> str: @@ -70,21 +76,20 @@ class RssPlugin(RunnablePlugin): @classmethod def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]: - t = ( - get_plugin('variable') - .get(cls._get_feed_latest_timestamp_varname(url)) - .output.get(cls._get_feed_latest_timestamp_varname(url)) - ) + varname = cls._get_feed_latest_timestamp_varname(url) + var: dict = cls._variable.get(varname).output or {} # type: ignore + t = var.get(varname) if t: return dateutil.parser.isoparse(t) + return None + def _get_latest_timestamps(self) -> dict: return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions} def _update_latest_timestamps(self) -> None: - variable = get_plugin('variable') - variable.set( + self._variable.set( **{ self._get_feed_latest_timestamp_varname(url): latest_timestamp for url, latest_timestamp in self._latest_timestamps.items() @@ -95,7 +100,7 @@ class RssPlugin(RunnablePlugin): def _parse_content(entry) -> Optional[str]: content = getattr(entry, 'content', None) if not content: - return + return None if isinstance(content, list): return content[0]['value'] @@ -139,7 +144,7 @@ class RssPlugin(RunnablePlugin): for entry in feed.entries if getattr(entry, 'published_parsed', None) ], - key=lambda e: e['published'], + key=lambda e: e['published'], # type: ignore ), many=True, ) @@ -316,6 +321,7 @@ class RssPlugin(RunnablePlugin): return ElementTree.tostring(root, encoding='utf-8', method='xml').decode() def main(self): + self._latest_timestamps = self._get_latest_timestamps() self._feed_workers = [ threading.Thread(target=self._feed_worker, args=(q,)) for q in self._feed_worker_queues