Sync the latest parse timestamps in main instead of __init__ in rss.

We should load the latest timestamps from the db when the thread starts
instead of doing it in the constructor.

The constructor may be invoked when the entities engine hasn't been
initialized yet, and result in deadlocks.
This commit is contained in:
Fabio Manganiello 2023-08-18 15:51:11 +02:00
parent bf7d060b81
commit 5dd7345c0b
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 18 additions and 12 deletions

View File

@ -4,17 +4,17 @@ import queue
import re
import threading
import time
from dateutil.tz import tzutc
from typing import Iterable, Optional, Collection, Set
from xml.etree import ElementTree
import dateutil.parser
from dateutil.tz import tzutc
import requests
from platypush.context import get_bus, get_plugin
from platypush.message.event.rss import NewFeedEntryEvent
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.variable import VariablePlugin
from platypush.schemas.rss import RssFeedEntrySchema
@ -61,8 +61,14 @@ class RssPlugin(RunnablePlugin):
self._latest_entries = []
self.subscriptions = list(self._parse_subscriptions(subscriptions or []))
self._latest_timestamps = {}
self._latest_timestamps = self._get_latest_timestamps()
@classmethod
@property
def _variable(cls) -> VariablePlugin:
var = get_plugin(VariablePlugin)
assert var, 'Could not load the variable plugin'
return var
@staticmethod
def _get_feed_latest_timestamp_varname(url: str) -> str:
@ -70,21 +76,20 @@ class RssPlugin(RunnablePlugin):
@classmethod
def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]:
t = (
get_plugin('variable')
.get(cls._get_feed_latest_timestamp_varname(url))
.output.get(cls._get_feed_latest_timestamp_varname(url))
)
varname = cls._get_feed_latest_timestamp_varname(url)
var: dict = cls._variable.get(varname).output or {} # type: ignore
t = var.get(varname)
if t:
return dateutil.parser.isoparse(t)
return None
def _get_latest_timestamps(self) -> dict:
return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions}
def _update_latest_timestamps(self) -> None:
variable = get_plugin('variable')
variable.set(
self._variable.set(
**{
self._get_feed_latest_timestamp_varname(url): latest_timestamp
for url, latest_timestamp in self._latest_timestamps.items()
@ -95,7 +100,7 @@ class RssPlugin(RunnablePlugin):
def _parse_content(entry) -> Optional[str]:
content = getattr(entry, 'content', None)
if not content:
return
return None
if isinstance(content, list):
return content[0]['value']
@ -139,7 +144,7 @@ class RssPlugin(RunnablePlugin):
for entry in feed.entries
if getattr(entry, 'published_parsed', None)
],
key=lambda e: e['published'],
key=lambda e: e['published'], # type: ignore
),
many=True,
)
@ -316,6 +321,7 @@ class RssPlugin(RunnablePlugin):
return ElementTree.tostring(root, encoding='utf-8', method='xml').decode()
def main(self):
self._latest_timestamps = self._get_latest_timestamps()
self._feed_workers = [
threading.Thread(target=self._feed_worker, args=(q,))
for q in self._feed_worker_queues