- Importing Flask.request as http_request to prevent name clashes with
SQLAlchemy's request module - All SQLAlchemy engine and connection setup done within get_new_items to prevent different threads from creating and using the db instance - Added check_same_thread to sqlite connection line to prevent different-thread exceptions when the Flask main thread does some cleanup
This commit is contained in:
parent
e9425236a9
commit
68c52fe102
2 changed files with 16 additions and 9 deletions
|
@ -7,7 +7,7 @@ import time
|
||||||
|
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
from flask import Flask, abort, jsonify, request, render_template, send_from_directory
|
from flask import Flask, abort, jsonify, request as http_request, render_template, send_from_directory
|
||||||
|
|
||||||
from platypush.config import Config
|
from platypush.config import Config
|
||||||
from platypush.message import Message
|
from platypush.message import Message
|
||||||
|
@ -80,8 +80,8 @@ class HttpBackend(Backend):
|
||||||
|
|
||||||
@app.route('/execute', methods=['POST'])
|
@app.route('/execute', methods=['POST'])
|
||||||
def execute():
|
def execute():
|
||||||
args = json.loads(request.data.decode('utf-8'))
|
args = json.loads(http_request.data.decode('utf-8'))
|
||||||
token = request.headers['X-Token'] if 'X-Token' in request.headers else None
|
token = http_request.headers['X-Token'] if 'X-Token' in http_request.headers else None
|
||||||
if token != self.token: abort(401)
|
if token != self.token: abort(401)
|
||||||
|
|
||||||
msg = Message.build(args)
|
msg = Message.build(args)
|
||||||
|
|
|
@ -37,11 +37,6 @@ class RssUpdates(HttpRequest):
|
||||||
|
|
||||||
os.makedirs(os.path.expanduser(os.path.dirname(self.dbfile)), exist_ok=True)
|
os.makedirs(os.path.expanduser(os.path.dirname(self.dbfile)), exist_ok=True)
|
||||||
|
|
||||||
self.engine = create_engine('sqlite:///{}'.format(self.dbfile))
|
|
||||||
Base.metadata.create_all(self.engine)
|
|
||||||
Session.configure(bind=self.engine)
|
|
||||||
self._get_or_create_source(session=Session())
|
|
||||||
|
|
||||||
if headers is None: headers = {}
|
if headers is None: headers = {}
|
||||||
headers['User-Agent'] = self.user_agent
|
headers['User-Agent'] = self.user_agent
|
||||||
|
|
||||||
|
@ -54,7 +49,6 @@ class RssUpdates(HttpRequest):
|
||||||
|
|
||||||
super().__init__(*args, skip_first_call=False, args=request_args, **kwargs)
|
super().__init__(*args, skip_first_call=False, args=request_args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def _get_or_create_source(self, session):
|
def _get_or_create_source(self, session):
|
||||||
record = session.query(FeedSource).filter_by(url=self.url).first()
|
record = session.query(FeedSource).filter_by(url=self.url).first()
|
||||||
if record is None:
|
if record is None:
|
||||||
|
@ -79,6 +73,13 @@ class RssUpdates(HttpRequest):
|
||||||
|
|
||||||
|
|
||||||
def get_new_items(self, response):
|
def get_new_items(self, response):
|
||||||
|
engine = create_engine('sqlite:///{}'.format(self.dbfile),
|
||||||
|
connect_args = { 'check_same_thread': False })
|
||||||
|
|
||||||
|
Base.metadata.create_all(engine)
|
||||||
|
Session.configure(bind=engine)
|
||||||
|
self._get_or_create_source(session=Session())
|
||||||
|
|
||||||
feed = feedparser.parse(response.text)
|
feed = feedparser.parse(response.text)
|
||||||
session = Session()
|
session = Session()
|
||||||
source_record = self._get_or_create_source(session=session)
|
source_record = self._get_or_create_source(session=session)
|
||||||
|
@ -97,11 +98,17 @@ class RssUpdates(HttpRequest):
|
||||||
datetime.datetime.now().strftime('%d %B %Y, %H:%M')
|
datetime.datetime.now().strftime('%d %B %Y, %H:%M')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logging.info('Parsed {:d} items from RSS feed <{}>'
|
||||||
|
.format(len(feed.entries), self.url))
|
||||||
|
|
||||||
for entry in feed.entries:
|
for entry in feed.entries:
|
||||||
entry_timestamp = datetime.datetime(*entry.published_parsed[:6])
|
entry_timestamp = datetime.datetime(*entry.published_parsed[:6])
|
||||||
|
|
||||||
if source_record.last_updated_at is None \
|
if source_record.last_updated_at is None \
|
||||||
or entry_timestamp > source_record.last_updated_at:
|
or entry_timestamp > source_record.last_updated_at:
|
||||||
|
logging.info('Processed new item from RSS feed <{}>: "{}"'
|
||||||
|
.format(self.url, entry.title))
|
||||||
|
|
||||||
if self.mercury_api_key:
|
if self.mercury_api_key:
|
||||||
entry.content = self._parse_entry_content(entry.link)
|
entry.content = self._parse_entry_content(entry.link)
|
||||||
elif hasattr(entry, 'summary'):
|
elif hasattr(entry, 'summary'):
|
||||||
|
|
Loading…
Reference in a new issue