Merge pull request 'Add support for OPML import and export in the RSS plugin' (#220) from 219-opml-import-export into master

Reviewed-on: platypush/platypush#220
This commit is contained in:
Fabio Manganiello 2022-09-02 00:24:37 +02:00
commit e77d6a4ad4
3 changed files with 225 additions and 52 deletions

View file

@ -1,8 +1,13 @@
import datetime
import os
import queue
import re
import threading
import time
from typing import Optional, Collection
from dateutil.tz import tzutc
from typing import Iterable, Optional, Collection, Set
from xml.etree import ElementTree
import dateutil.parser
import requests
@ -24,56 +29,67 @@ class RssPlugin(RunnablePlugin):
Requires:
* **feedparser** (``pip install feedparser``)
* **defusedxml** (``pip install defusedxml``)
"""
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) ' + \
'Chrome/62.0.3202.94 Safari/537.36'
user_agent = (
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) '
+ 'Chrome/62.0.3202.94 Safari/537.36'
)
def __init__(
self, subscriptions: Optional[Collection[str]] = None, poll_seconds: int = 300,
user_agent: str = user_agent, **kwargs
self,
subscriptions: Optional[Collection[str]] = None,
poll_seconds: int = 300,
user_agent: str = user_agent,
**kwargs,
):
"""
:param subscriptions: List of feeds to monitor for updates, as URLs.
OPML URLs/local files are also supported.
:param poll_seconds: How often we should check for updates (default: 300 seconds).
:param user_agent: Custom user agent to use for the requests.
"""
super().__init__(**kwargs)
self.subscriptions = subscriptions or []
self.poll_seconds = poll_seconds
self.user_agent = user_agent
self._latest_timestamps = self._get_latest_timestamps()
self._feeds_metadata = {}
self._feed_worker_queues = [queue.Queue()] * 5
self._feed_response_queue = queue.Queue()
self._feed_workers = []
self._latest_entries = []
self.subscriptions = list(self._parse_subscriptions(subscriptions or []))
self._latest_timestamps = self._get_latest_timestamps()
@staticmethod
def _get_feed_latest_timestamp_varname(url: str) -> str:
return f'LATEST_FEED_TIMESTAMP[{url}]'
@classmethod
def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]:
t = get_plugin('variable').get(
cls._get_feed_latest_timestamp_varname(url)
).output.get(cls._get_feed_latest_timestamp_varname(url))
t = (
get_plugin('variable')
.get(cls._get_feed_latest_timestamp_varname(url))
.output.get(cls._get_feed_latest_timestamp_varname(url))
)
if t:
return dateutil.parser.isoparse(t)
def _get_latest_timestamps(self) -> dict:
return {
url: self._get_feed_latest_timestamp(url)
for url in self.subscriptions
}
return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions}
def _update_latest_timestamps(self) -> None:
variable = get_plugin('variable')
variable.set(**{
variable.set(
**{
self._get_feed_latest_timestamp_varname(url): latest_timestamp
for url, latest_timestamp in self._latest_timestamps.items()
})
}
)
@staticmethod
def _parse_content(entry) -> Optional[str]:
@ -96,23 +112,30 @@ class RssPlugin(RunnablePlugin):
"""
import feedparser
feed = feedparser.parse(requests.get(url, headers={'User-Agent': self.user_agent}).text)
feed = feedparser.parse(
requests.get(url, headers={'User-Agent': self.user_agent}).text
)
return RssFeedEntrySchema().dump(
sorted([
sorted(
[
{
'feed_url': url,
'feed_title': getattr(feed.feed, 'title', None),
'id': getattr(entry, 'id', None),
'url': entry.link,
'published': datetime.datetime.fromtimestamp(time.mktime(entry.published_parsed)),
'published': datetime.datetime.fromtimestamp(
time.mktime(entry.published_parsed)
),
'title': entry.title,
'summary': getattr(entry, 'summary', None),
'content': self._parse_content(entry),
}
for entry in feed.entries
if getattr(entry, 'published_parsed', None)
], key=lambda e: e['published']),
many=True
],
key=lambda e: e['published'],
),
many=True,
)
@action
@ -123,7 +146,9 @@ class RssPlugin(RunnablePlugin):
:param limit: Maximum number of entries to return (default: 20).
:return: .. schema:: rss.RssFeedEntrySchema(many=True)
"""
return sorted(self._latest_entries, key=lambda e: e['published'], reverse=True)[:limit]
return sorted(self._latest_entries, key=lambda e: e['published'], reverse=True)[
:limit
]
def _feed_worker(self, q: queue.Queue):
while not self.should_stop():
@ -133,18 +158,157 @@ class RssPlugin(RunnablePlugin):
continue
try:
self._feed_response_queue.put({
self._feed_response_queue.put(
{
'url': url,
'content': self.parse_feed(url).output,
})
}
)
except Exception as e:
self._feed_response_queue.put({
self._feed_response_queue.put(
{
'url': url,
'error': e,
})
}
)
self._feed_response_queue.put(None)
def _parse_opml_lists(self, subs: Iterable[str]) -> Set[str]:
from defusedxml import ElementTree
feeds = set()
subs = set(subs)
content_by_sub = {}
urls = {sub for sub in subs if re.search(r'^https?://', sub)}
files = {os.path.expanduser(sub) for sub in subs if sub not in urls}
for url in urls:
try:
content_by_sub[url] = requests.get(
url,
headers={
'User-Agent': self.user_agent,
},
).text
except Exception as e:
self.logger.warning('Could not retrieve subscription %s: %s', url, e)
for file in files:
try:
with open(file, 'r') as f:
content_by_sub[file] = f.read()
except Exception as e:
self.logger.warning('Could not open file %s: %s', file, e)
for sub, content in content_by_sub.items():
root = ElementTree.fromstring(content.strip())
if root.tag != 'opml':
self.logger.warning('%s is not a valid OPML resource', sub)
continue
feeds.update(self._parse_feeds_from_outlines(root.findall('body/outline')))
return feeds
def _parse_feeds_from_outlines(
self,
outlines: Iterable[ElementTree.Element],
) -> Set[str]:
feeds = set()
outlines = list(outlines)
while outlines:
outline = outlines.pop(0)
if 'xmlUrl' in outline.attrib:
url = outline.attrib['xmlUrl']
feeds.add(url)
self._feeds_metadata[url] = {
**self._feeds_metadata.get(url, {}),
'title': outline.attrib.get('title'),
'description': outline.attrib.get('text'),
'url': outline.attrib.get('htmlUrl'),
}
for i, child in enumerate(outline.iter()):
if i > 0:
outlines.append(child)
return feeds
def _parse_subscriptions(self, subs: Iterable[str]) -> Iterable[str]:
import feedparser
self.logger.info('Parsing feed subscriptions')
feeds = set()
lists = set()
for sub in subs:
try:
# Check if it's an OPML list of feeds or an individual feed
feed = feedparser.parse(sub)
if feed.feed.get('opml'):
lists.add(sub)
else:
channel = feed.get('channel', {})
self._feeds_metadata[sub] = {
**self._feeds_metadata.get(sub, {}),
'title': channel.get('title'),
'description': channel.get('description'),
'url': channel.get('link'),
}
feeds.add(sub)
except Exception as e:
self.logger.warning('Could not parse %s: %s', sub, e)
feeds.update(self._parse_opml_lists(lists))
return feeds
@staticmethod
def _datetime_to_string(dt: datetime.datetime) -> str:
return dt.replace(tzinfo=tzutc()).strftime('%a, %d %b %Y %H:%M:%S %Z')
@action
def export_to_opml(self) -> str:
"""
Export the list of subscriptions into OPML format.
:return: The list of subscriptions as a string in OPML format.
"""
root = ElementTree.Element('opml', {'version': '2.0'})
head = ElementTree.Element('head')
title = ElementTree.Element('title')
title.text = 'Platypush feed subscriptions'
created = ElementTree.Element('dateCreated')
created.text = self._datetime_to_string(datetime.datetime.utcnow())
head.append(title)
head.append(created)
body = ElementTree.Element('body')
feeds = ElementTree.Element('outline', {'text': 'Feeds'})
for sub in self.subscriptions:
metadata = self._feeds_metadata.get(sub, {})
feed = ElementTree.Element(
'outline',
{
'xmlUrl': sub,
'text': metadata.get('description', metadata.get('title', sub)),
**({'htmlUrl': metadata['url']} if metadata.get('url') else {}),
**({'title': metadata['title']} if metadata.get('title') else {}),
},
)
feeds.append(feed)
body.append(feeds)
root.append(head)
root.append(body)
return ElementTree.tostring(root, encoding='utf-8', method='xml').decode()
def main(self):
self._feed_workers = [
threading.Thread(target=self._feed_worker, args=(q,))
@ -154,12 +318,16 @@ class RssPlugin(RunnablePlugin):
for worker in self._feed_workers:
worker.start()
self.logger.info(f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions')
self.logger.info(
f'Initialized RSS plugin with {len(self.subscriptions)} subscriptions'
)
while not self.should_stop():
responses = {}
for i, url in enumerate(self.subscriptions):
worker_queue = self._feed_worker_queues[i % len(self._feed_worker_queues)]
worker_queue = self._feed_worker_queues[
i % len(self._feed_worker_queues)
]
worker_queue.put(url)
time_start = time.time()
@ -168,12 +336,14 @@ class RssPlugin(RunnablePlugin):
new_entries = []
while (
not self.should_stop() and
len(responses) < len(self.subscriptions) and
time.time() - time_start <= timeout
not self.should_stop()
and len(responses) < len(self.subscriptions)
and time.time() - time_start <= timeout
):
try:
response = self._feed_response_queue.get(block=True, timeout=max_time-time_start)
response = self._feed_response_queue.get(
block=True, timeout=max_time - time_start
)
except queue.Empty:
self.logger.warning('RSS parse timeout')
break
@ -189,7 +359,9 @@ class RssPlugin(RunnablePlugin):
else:
responses[url] = response['content']
responses = {k: v for k, v in responses.items() if not isinstance(v, Exception)}
responses = {
k: v for k, v in responses.items() if not isinstance(v, Exception)
}
for url, response in responses.items():
latest_timestamp = self._latest_timestamps.get(url)
@ -205,7 +377,7 @@ class RssPlugin(RunnablePlugin):
self._update_latest_timestamps()
self._latest_entries = new_entries
time.sleep(self.poll_seconds)
self.wait_stop(self.poll_seconds)
def stop(self):
super().stop()

View file

@ -4,5 +4,6 @@ manifest:
install:
pip:
- feedparser
- defusedxml
package: platypush.plugins.rss
type: plugin

View file

@ -86,7 +86,7 @@ setup(
# Support for MQTT backends
'mqtt': ['paho-mqtt'],
# Support for RSS feeds parser
'rss': ['feedparser'],
'rss': ['feedparser', 'defusedxml'],
# Support for PDF generation
'pdf': ['weasyprint'],
# Support for Philips Hue plugin