import datetime
import os
import queue
import re
import threading
import time

from dateutil.tz import tzutc
from typing import Iterable, Optional, Collection, Set
from xml.etree import ElementTree

import dateutil.parser
import requests

from platypush.context import get_bus, get_plugin
from platypush.message.event.rss import NewFeedEntryEvent
from platypush.plugins import RunnablePlugin, action
from platypush.schemas.rss import RssFeedEntrySchema


class RssPlugin(RunnablePlugin):
    """
    A plugin for parsing and subscribing to RSS feeds.

    Triggers:

        - :class:`platypush.message.event.rss.NewFeedEntryEvent` when a new
          entry is received on a subscribed feed.

    Requires:

        * **feedparser** (``pip install feedparser``)
        * **defusedxml** (``pip install defusedxml``)

    """

    user_agent = (
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) '
        + 'Chrome/62.0.3202.94 Safari/537.36'
    )

    def __init__(
        self,
        subscriptions: Optional[Collection[str]] = None,
        poll_seconds: int = 300,
        user_agent: str = user_agent,
        **kwargs,
    ):
        """
        :param subscriptions: List of feeds to monitor for updates, as URLs.
            OPML URLs/local files are also supported.
        :param poll_seconds: How often we should check for updates (default: 300 seconds).
        :param user_agent: Custom user agent to use for the requests.
        """
        super().__init__(**kwargs)
        self.poll_seconds = poll_seconds
        self.user_agent = user_agent
        self._feeds_metadata = {}
        # Note: a comprehension, not `[queue.Queue()] * 5` - list
        # multiplication would create five references to the same queue.
        self._feed_worker_queues = [queue.Queue() for _ in range(5)]
        self._feed_response_queue = queue.Queue()
        self._feed_workers = []
        self._latest_entries = []

        self.subscriptions = list(self._parse_subscriptions(subscriptions or []))

        self._latest_timestamps = self._get_latest_timestamps()

    @staticmethod
    def _get_feed_latest_timestamp_varname(url: str) -> str:
        return f'LATEST_FEED_TIMESTAMP[{url}]'

    @classmethod
    def _get_feed_latest_timestamp(cls, url: str) -> Optional[datetime.datetime]:
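        # The latest seen timestamp per feed is persisted through the
        # `variable` plugin, so it survives restarts of the service.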
        t = (
            get_plugin('variable')
            .get(cls._get_feed_latest_timestamp_varname(url))
            .output.get(cls._get_feed_latest_timestamp_varname(url))
        )

        if t:
            return dateutil.parser.isoparse(t)

    def _get_latest_timestamps(self) -> dict:
        return {url: self._get_feed_latest_timestamp(url) for url in self.subscriptions}

    def _update_latest_timestamps(self) -> None:
        variable = get_plugin('variable')
        variable.set(
            **{
                self._get_feed_latest_timestamp_varname(url): latest_timestamp
                for url, latest_timestamp in self._latest_timestamps.items()
            }
        )

    @staticmethod
    def _parse_content(entry) -> Optional[str]:
        content = getattr(entry, 'content', None)
        if not content:
            return

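        # feedparser exposes <content> as a list of dicts (one per content
        # type); in that case, return the value of the first element.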
        if isinstance(content, list):
            return content[0]['value']

        return content

    @action
    def parse_feed(self, url: str):
        """
        Parse a feed URL.

        :param url: Feed URL.
        :return: .. schema:: rss.RssFeedEntrySchema(many=True)
        """
        import feedparser

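        # Fetch the feed ourselves with the configured User-Agent: some
        # servers reject requests coming from the default library agents.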
        feed = feedparser.parse(
            requests.get(url, headers={'User-Agent': self.user_agent}).text
        )
        return RssFeedEntrySchema().dump(
            sorted(
                [
                    {
                        'feed_url': url,
                        'feed_title': getattr(feed.feed, 'title', None),
                        'id': getattr(entry, 'id', None),
                        'url': entry.link,
                        'published': datetime.datetime.fromtimestamp(
                            time.mktime(entry.published_parsed)
                        ),
                        'title': entry.title,
                        'summary': getattr(entry, 'summary', None),
                        'content': self._parse_content(entry),
                    }
                    for entry in feed.entries
                    if getattr(entry, 'published_parsed', None)
                ],
                key=lambda e: e['published'],
            ),
            many=True,
        )

    @action
    def get_latest_entries(self, limit: int = 20):
        """
        Get the latest entries from the subscribed feeds, sorted by descending
        published date.

        :param limit: Maximum number of entries to return (default: 20).
        :return: .. schema:: rss.RssFeedEntrySchema(many=True)
        """
        return sorted(self._latest_entries, key=lambda e: e['published'], reverse=True)[
            :limit
        ]

    def _feed_worker(self, q: queue.Queue):
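        # Block on the queue with a short timeout so the thread can
        # periodically check whether the plugin has been asked to stop.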
        while not self.should_stop():
            try:
                url = q.get(block=True, timeout=1)
            except queue.Empty:
                continue

            try:
                self._feed_response_queue.put(
                    {
                        'url': url,
                        'content': self.parse_feed(url).output,
                    }
                )
            except Exception as e:
                self._feed_response_queue.put(
                    {
                        'url': url,
                        'error': e,
                    }
                )

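        # Unblock any pending get() on the response queue at shutdown.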
        self._feed_response_queue.put(None)

    def _parse_opml_lists(self, subs: Iterable[str]) -> Set[str]:
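        # Use defusedxml to parse OPML content that may come from untrusted
        # sources: it guards against XML bombs and external entity attacks.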
        from defusedxml import ElementTree

        feeds = set()
        subs = set(subs)
        content_by_sub = {}
        urls = {sub for sub in subs if re.search(r'^https?://', sub)}
        files = {os.path.expanduser(sub) for sub in subs if sub not in urls}

        for url in urls:
            try:
                content_by_sub[url] = requests.get(
                    url,
                    headers={
                        'User-Agent': self.user_agent,
                    },
                ).text
            except Exception as e:
                self.logger.warning('Could not retrieve subscription %s: %s', url, e)

        for file in files:
            try:
                with open(file, 'r') as f:
                    content_by_sub[file] = f.read()
            except Exception as e:
                self.logger.warning('Could not open file %s: %s', file, e)

        for sub, content in content_by_sub.items():
            root = ElementTree.fromstring(content.strip())
            if root.tag != 'opml':
                self.logger.warning('%s is not a valid OPML resource', sub)
                continue

            feeds.update(self._parse_feeds_from_outlines(root.findall('body/outline')))

        return feeds

    def _parse_feeds_from_outlines(
        self,
        outlines: Iterable[ElementTree.Element],
    ) -> Set[str]:
        feeds = set()
        outlines = list(outlines)

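        # Breadth-first traversal: OPML outlines can be nested into folders,
        # so child outlines are appended to the queue as they are found.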
        while outlines:
            outline = outlines.pop(0)
            if 'xmlUrl' in outline.attrib:
                url = outline.attrib['xmlUrl']
                feeds.add(url)
                self._feeds_metadata[url] = {
                    **self._feeds_metadata.get(url, {}),
                    'title': outline.attrib.get('title'),
                    'description': outline.attrib.get('text'),
                    'url': outline.attrib.get('htmlUrl'),
                }

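            # Element.iter() yields the element itself first: skip it and
            # enqueue only its descendants.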
            for i, child in enumerate(outline.iter()):
                if i > 0:
                    outlines.append(child)

        return feeds

    def _parse_subscriptions(self, subs: Iterable[str]) -> Iterable[str]:
        import feedparser

        self.logger.info('Parsing feed subscriptions')
        feeds = set()
        lists = set()

        for sub in subs:
            try:
                # Check if it's an OPML list of feeds or an individual feed
                feed = feedparser.parse(sub)
                if feed.feed.get('opml'):
                    lists.add(sub)
                else:
                    channel = feed.get('channel', {})
                    self._feeds_metadata[sub] = {
                        **self._feeds_metadata.get(sub, {}),
                        'title': channel.get('title'),
                        'description': channel.get('description'),
                        'url': channel.get('link'),
                    }

                    feeds.add(sub)
            except Exception as e:
                self.logger.warning('Could not parse %s: %s', sub, e)

        feeds.update(self._parse_opml_lists(lists))
        return feeds

    @staticmethod
    def _datetime_to_string(dt: datetime.datetime) -> str:
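        # OPML expects RFC 822-style dates, e.g. "Mon, 01 Jan 2024 10:00:00 UTC".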
        return dt.replace(tzinfo=tzutc()).strftime('%a, %d %b %Y %H:%M:%S %Z')

    @action
    def export_to_opml(self) -> str:
        """
        Export the list of subscriptions into OPML format.

        :return: The list of subscriptions as a string in OPML format.
        """
        root = ElementTree.Element('opml', {'version': '2.0'})

        head = ElementTree.Element('head')
        title = ElementTree.Element('title')
        title.text = 'Platypush feed subscriptions'
        created = ElementTree.Element('dateCreated')
        created.text = self._datetime_to_string(datetime.datetime.utcnow())
        head.append(title)
        head.append(created)

        body = ElementTree.Element('body')
        feeds = ElementTree.Element('outline', {'text': 'Feeds'})

        for sub in self.subscriptions:
            metadata = self._feeds_metadata.get(sub, {})
            feed = ElementTree.Element(
                'outline',
                {
                    'xmlUrl': sub,
                    'text': metadata.get('description', metadata.get('title', sub)),
                    **({'htmlUrl': metadata['url']} if metadata.get('url') else {}),
                    **({'title': metadata['title']} if metadata.get('title') else {}),
                },
            )

            feeds.append(feed)

        body.append(feeds)

        root.append(head)
        root.append(body)
        return ElementTree.tostring(root, encoding='utf-8', method='xml').decode()

    def main(self):
        self._feed_workers = [
            threading.Thread(target=self._feed_worker, args=(q,))
            for q in self._feed_worker_queues
        ]

        for worker in self._feed_workers:
            worker.start()

        self.logger.info(
            'Initialized RSS plugin with %d subscriptions', len(self.subscriptions)
        )

        while not self.should_stop():
            responses = {}
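            # Distribute the subscriptions round-robin across the worker
            # queues, so multiple feeds can be fetched in parallel.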
            for i, url in enumerate(self.subscriptions):
                worker_queue = self._feed_worker_queues[
                    i % len(self._feed_worker_queues)
                ]
                worker_queue.put(url)

            time_start = time.time()
            timeout = 60
            max_time = time_start + timeout
            new_entries = []

            while (
                not self.should_stop()
                and len(responses) < len(self.subscriptions)
                and time.time() - time_start <= timeout
            ):
                try:
                    # Wait only for the time left until the overall deadline,
                    # not the full timeout on every iteration.
                    response = self._feed_response_queue.get(
                        block=True, timeout=max(0, max_time - time.time())
                    )
                except queue.Empty:
                    self.logger.warning('RSS parse timeout')
                    break

                if not response:
                    continue

                url = response['url']
                error = response.get('error')
                if error:
                    self.logger.error('Could not parse feed %s: %s', url, error)
                    responses[url] = error
                else:
                    responses[url] = response['content']

            responses = {
                k: v for k, v in responses.items() if not isinstance(v, Exception)
            }

            for url, response in responses.items():
                latest_timestamp = self._latest_timestamps.get(url)
                new_entries += response

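                # Only trigger events for entries more recent than the latest
                # timestamp previously seen for this feed.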
                for entry in response:
                    published = datetime.datetime.fromisoformat(entry['published'])
                    if not latest_timestamp or published > latest_timestamp:
                        latest_timestamp = published
                        get_bus().post(NewFeedEntryEvent(**entry))

                self._latest_timestamps[url] = latest_timestamp

            self._update_latest_timestamps()
            self._latest_entries = new_entries
            self.wait_stop(self.poll_seconds)

    def stop(self):
        super().stop()
        for worker in self._feed_workers:
            worker.join(timeout=60)

        self.logger.info('RSS integration stopped')


# vim:sw=4:ts=4:et: