Refactoring of media.search in a more scalable way. Also faster search

with workers in parallel by media type
This commit is contained in:
Fabio Manganiello 2019-02-18 00:26:46 +01:00
parent c7f0783615
commit 9a565061c4
5 changed files with 155 additions and 71 deletions

View file

@ -1,5 +1,6 @@
import enum import enum
import os import os
import queue
import re import re
import subprocess import subprocess
import threading import threading
@ -63,6 +64,9 @@ class MediaPlugin(Plugin):
_supported_media_plugins = {'media.mplayer', 'media.omxplayer', _supported_media_plugins = {'media.mplayer', 'media.omxplayer',
'media.chromecast'} 'media.chromecast'}
_supported_media_types = ['file', 'torrent', 'youtube']
_default_search_timeout = 60 # 60 seconds
def __init__(self, media_dirs=[], download_dir=None, env=None, def __init__(self, media_dirs=[], download_dir=None, env=None,
*args, **kwargs): *args, **kwargs):
""" """
@ -246,7 +250,8 @@ class MediaPlugin(Plugin):
raise self._NOT_IMPLEMENTED_ERR raise self._NOT_IMPLEMENTED_ERR
@action @action
def search(self, query, types=None, queue_results=False, autoplay=False): def search(self, query, types=None, queue_results=False, autoplay=False,
search_timeout=_default_search_timeout):
""" """
Perform a video search. Perform a video search.
@ -261,24 +266,40 @@ class MediaPlugin(Plugin):
:param autoplay: Play the first result of the search (default: False) :param autoplay: Play the first result of the search (default: False)
:type autoplay: bool :type autoplay: bool
:param search_timeout: Search timeout (default: 60 seconds)
:type search_timeout: float
""" """
results = [] results = {}
results_queues = {}
worker_threads = {}
if types is None: if types is None:
types = { 'youtube', 'file', 'torrent' } types = self._supported_media_types
if 'file' in types: for media_type in types:
file_results = self.file_search(query).output results[media_type] = []
results.extend(file_results) results_queues[media_type] = queue.Queue()
search_hndl = self._get_search_handler_by_type(media_type)
worker_threads[media_type] = threading.Thread(
target=self._search_worker(query=query, search_hndl=search_hndl,
results_queue=results_queues[media_type]))
worker_threads[media_type].start()
if 'torrent' in types: for media_type in types:
torrents = get_plugin('torrent') try:
torrent_results = torrents.search(query).output results[media_type].extend(
results.extend(torrent_results) results_queues[media_type].get(timeout=search_timeout))
except queue.Empty:
self.logger.warning('Search for "{}" media type {} timed out'.
format(query, media_type))
if 'youtube' in types: flattened_results = []
yt_results = self.youtube_search(query).output for media_type in self._supported_media_types:
results.extend(yt_results) if media_type in results:
flattened_results += results[media_type]
results = flattened_results
if results: if results:
if queue_results: if queue_results:
@ -290,6 +311,24 @@ class MediaPlugin(Plugin):
return results return results
def _search_worker(self, query, search_hndl, results_queue):
def thread():
results_queue.put(search_hndl.search(query))
return thread
def _get_search_handler_by_type(self, search_type):
if search_type == 'file':
from .search import LocalMediaSearcher
return LocalMediaSearcher(self.media_dirs)
if search_type == 'torrent':
from .search import TorrentMediaSearcher
return TorrentMediaSearcher()
if search_type == 'youtube':
from .search import YoutubeMediaSearcher
return YoutubeMediaSearcher()
self.logger.warning('Unsupported search type: {}'.format(search_type))
@classmethod @classmethod
def _is_video_file(cls, filename): def _is_video_file(cls, filename):
return filename.lower().split('.')[-1] in cls.video_extensions return filename.lower().split('.')[-1] in cls.video_extensions
@ -298,61 +337,6 @@ class MediaPlugin(Plugin):
def _is_audio_file(cls, filename): def _is_audio_file(cls, filename):
return filename.lower().split('.')[-1] in cls.audio_extensions return filename.lower().split('.')[-1] in cls.audio_extensions
@action
def file_search(self, query):
try:
from .local import LocalMediaSearcher
return LocalMediaSearcher(self.media_dirs).search(query)
except Exception as e:
self.logger.warning('Could not load the local file indexer: {}. '.
format(str(e)) + 'Falling back to directory scan')
results = []
query_tokens = [_.lower() for _ in re.split('\s+', query.strip())]
for media_dir in self.media_dirs:
self.logger.info('Scanning {} for "{}"'.format(media_dir, query))
for path, dirs, files in os.walk(media_dir):
for f in files:
if not self._is_video_file(f) and not self._is_audio_file(f):
continue
matches_query = True
for token in query_tokens:
if token not in f.lower():
matches_query = False
break
if not matches_query:
continue
results.append({
'url': 'file://' + path + os.sep + f,
'title': f,
})
return results
@action
def youtube_search(self, query):
"""
Performs a YouTube search either using the YouTube API (faster and
recommended, it requires the :mod:`platypush.plugins.google.youtube`
plugin to be configured) or parsing the HTML search results (fallback
slower method)
"""
self.logger.info('Searching YouTube for "{}"'.format(query))
try:
return self._youtube_search_api(query=query)
except Exception as e:
self.logger.warning('Unable to load the YouTube plugin, falling ' +
'back to HTML parse method: {}'.format(str(e)))
return self._youtube_search_html_parse(query=query)
@action @action
def start_streaming(self, media, download=False): def start_streaming(self, media, download=False):
""" """

View file

@ -0,0 +1,24 @@
import logging
class MediaSearcher:
"""
Base class for media searchers
"""
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(self.__class__.__name__)
def search(self, query, *args, **kwargs):
raise NotImplementedError('The search method should be implemented ' +
'by a derived class')
from .local import LocalMediaSearcher
from .youtube import YoutubeMediaSearcher
from .torrent import TorrentMediaSearcher
__all__ = ['LocalMediaSearcher', 'TorrentMediaSearcher', 'YoutubeMediaSearcher']
# vim:sw=4:ts=4:et:

View file

@ -1,5 +1,4 @@
import datetime import datetime
import logging
import os import os
import re import re
import time import time
@ -13,11 +12,12 @@ from sqlalchemy.sql.expression import func
from platypush.config import Config from platypush.config import Config
from platypush.plugins.media import MediaPlugin from platypush.plugins.media import MediaPlugin
from platypush.plugins.media.search import MediaSearcher
Base = declarative_base() Base = declarative_base()
Session = scoped_session(sessionmaker()) Session = scoped_session(sessionmaker())
class LocalMediaSearcher: class LocalMediaSearcher(MediaSearcher):
""" """
This class will search for media in the local configured directories. It This class will search for media in the local configured directories. It
will index the media files for a faster search, it will detect which will index the media files for a faster search, it will detect which
@ -32,7 +32,7 @@ class LocalMediaSearcher:
_filename_separators = '[.,_\-@()\[\]\{\}\s\'\"]+' _filename_separators = '[.,_\-@()\[\]\{\}\s\'\"]+'
def __init__(self, dirs, *args, **kwargs): def __init__(self, dirs, *args, **kwargs):
self.logger = logging.getLogger(self.__class__.__name__) super().__init__()
self.dirs = dirs self.dirs = dirs
db_dir = os.path.join(Config.get('workdir'), 'media') db_dir = os.path.join(Config.get('workdir'), 'media')
os.makedirs(db_dir, exist_ok=True) os.makedirs(db_dir, exist_ok=True)

View file

@ -0,0 +1,15 @@
from platypush.context import get_plugin
from platypush.plugins.media.search import MediaSearcher
class TorrentMediaSearcher(MediaSearcher):
def search(self, query):
self.logger.info('Searching torrents for "{}"'.format(query))
torrents = get_plugin('torrent')
if not torrents:
raise RuntimeError('Torrent plugin not available/configured')
return torrents.search(query).output
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,61 @@
import re
import urllib
from platypush.context import get_plugin
from platypush.plugins.media.search import MediaSearcher
class YoutubeMediaSearcher(MediaSearcher):
def search(self, query):
"""
Performs a YouTube search either using the YouTube API (faster and
recommended, it requires the :mod:`platypush.plugins.google.youtube`
plugin to be configured) or parsing the HTML search results (fallback
slower method)
"""
self.logger.info('Searching YouTube for "{}"'.format(query))
try:
return self._youtube_search_api(query=query)
except Exception as e:
self.logger.warning('Unable to load the YouTube plugin, falling ' +
'back to HTML parse method: {}'.format(str(e)))
return self._youtube_search_html_parse(query=query)
def _youtube_search_api(self, query):
return [
{
'url': 'https://www.youtube.com/watch?v=' + item['id']['videoId'],
'title': item.get('snippet', {}).get('title', '<No Title>'),
}
for item in get_plugin('google.youtube').search(query=query).output
if item.get('id', {}).get('kind') == 'youtube#video'
]
def _youtube_search_html_parse(self, query):
query = urllib.parse.quote(query)
url = "https://www.youtube.com/results?search_query=" + query
response = urllib.request.urlopen(url)
html = response.read().decode('utf-8')
results = []
while html:
m = re.search('(<a href="(/watch\?v=.+?)".+?yt-uix-tile-link.+?title="(.+?)".+?>)', html)
if m:
results.append({
'url': 'https://www.youtube.com' + m.group(2),
'title': m.group(3)
})
html = html.split(m.group(1))[1]
else:
html = ''
self.logger.info('{} YouTube video results for the search query "{}"'
.format(len(results), query))
return results
# vim:sw=4:ts=4:et: