From 9a565061c45b6bed5bbb9d40baf2135e700ba8c6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 18 Feb 2019 00:26:46 +0100 Subject: [PATCH] Refactoring of media.search in a more scalable way. Also faster search with workers in parallel by media type --- platypush/plugins/media/__init__.py | 120 ++++++++---------- platypush/plugins/media/search/__init__.py | 24 ++++ platypush/plugins/media/{ => search}/local.py | 6 +- platypush/plugins/media/search/torrent.py | 15 +++ platypush/plugins/media/search/youtube.py | 61 +++++++++ 5 files changed, 155 insertions(+), 71 deletions(-) create mode 100644 platypush/plugins/media/search/__init__.py rename platypush/plugins/media/{ => search}/local.py (98%) create mode 100644 platypush/plugins/media/search/torrent.py create mode 100644 platypush/plugins/media/search/youtube.py diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index b869817b..241ca138 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -1,5 +1,6 @@ import enum import os +import queue import re import subprocess import threading @@ -63,6 +64,9 @@ class MediaPlugin(Plugin): _supported_media_plugins = {'media.mplayer', 'media.omxplayer', 'media.chromecast'} + _supported_media_types = ['file', 'torrent', 'youtube'] + _default_search_timeout = 60 # 60 seconds + def __init__(self, media_dirs=[], download_dir=None, env=None, *args, **kwargs): """ @@ -246,7 +250,8 @@ class MediaPlugin(Plugin): raise self._NOT_IMPLEMENTED_ERR @action - def search(self, query, types=None, queue_results=False, autoplay=False): + def search(self, query, types=None, queue_results=False, autoplay=False, + search_timeout=_default_search_timeout): """ Perform a video search. @@ -261,24 +266,40 @@ class MediaPlugin(Plugin): :param autoplay: Play the first result of the search (default: False) :type autoplay: bool + + :param search_timeout: Search timeout (default: 60 seconds) + :type search_timeout: float """ - results = [] + results = {} + results_queues = {} + worker_threads = {} + if types is None: - types = { 'youtube', 'file', 'torrent' } + types = self._supported_media_types - if 'file' in types: - file_results = self.file_search(query).output - results.extend(file_results) + for media_type in types: + results[media_type] = [] + results_queues[media_type] = queue.Queue() + search_hndl = self._get_search_handler_by_type(media_type) + worker_threads[media_type] = threading.Thread( + target=self._search_worker(query=query, search_hndl=search_hndl, + results_queue=results_queues[media_type])) + worker_threads[media_type].start() - if 'torrent' in types: - torrents = get_plugin('torrent') - torrent_results = torrents.search(query).output - results.extend(torrent_results) + for media_type in types: + try: + results[media_type].extend( + results_queues[media_type].get(timeout=search_timeout)) + except queue.Empty: + self.logger.warning('Search for "{}" media type {} timed out'. + format(query, media_type)) - if 'youtube' in types: - yt_results = self.youtube_search(query).output - results.extend(yt_results) + flattened_results = [] + for media_type in self._supported_media_types: + if media_type in results: + flattened_results += results[media_type] + results = flattened_results if results: if queue_results: @@ -290,6 +311,24 @@ class MediaPlugin(Plugin): return results + def _search_worker(self, query, search_hndl, results_queue): + def thread(): + results_queue.put(search_hndl.search(query)) + return thread + + def _get_search_handler_by_type(self, search_type): + if search_type == 'file': + from .search import LocalMediaSearcher + return LocalMediaSearcher(self.media_dirs) + if search_type == 'torrent': + from .search import TorrentMediaSearcher + return TorrentMediaSearcher() + if search_type == 'youtube': + from .search import YoutubeMediaSearcher + return YoutubeMediaSearcher() + + self.logger.warning('Unsupported search type: {}'.format(search_type)) + @classmethod def _is_video_file(cls, filename): return filename.lower().split('.')[-1] in cls.video_extensions @@ -298,61 +337,6 @@ class MediaPlugin(Plugin): def _is_audio_file(cls, filename): return filename.lower().split('.')[-1] in cls.audio_extensions - @action - def file_search(self, query): - try: - from .local import LocalMediaSearcher - return LocalMediaSearcher(self.media_dirs).search(query) - except Exception as e: - self.logger.warning('Could not load the local file indexer: {}. '. - format(str(e)) + 'Falling back to directory scan') - - results = [] - query_tokens = [_.lower() for _ in re.split('\s+', query.strip())] - - for media_dir in self.media_dirs: - self.logger.info('Scanning {} for "{}"'.format(media_dir, query)) - for path, dirs, files in os.walk(media_dir): - for f in files: - if not self._is_video_file(f) and not self._is_audio_file(f): - continue - - matches_query = True - for token in query_tokens: - if token not in f.lower(): - matches_query = False - break - - if not matches_query: - continue - - results.append({ - 'url': 'file://' + path + os.sep + f, - 'title': f, - }) - - return results - - @action - def youtube_search(self, query): - """ - Performs a YouTube search either using the YouTube API (faster and - recommended, it requires the :mod:`platypush.plugins.google.youtube` - plugin to be configured) or parsing the HTML search results (fallback - slower method) - """ - - self.logger.info('Searching YouTube for "{}"'.format(query)) - - try: - return self._youtube_search_api(query=query) - except Exception as e: - self.logger.warning('Unable to load the YouTube plugin, falling ' + - 'back to HTML parse method: {}'.format(str(e))) - - return self._youtube_search_html_parse(query=query) - - @action def start_streaming(self, media, download=False): """ diff --git a/platypush/plugins/media/search/__init__.py b/platypush/plugins/media/search/__init__.py new file mode 100644 index 00000000..e72695c2 --- /dev/null +++ b/platypush/plugins/media/search/__init__.py @@ -0,0 +1,24 @@ +import logging + +class MediaSearcher: + """ + Base class for media searchers + """ + + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(self.__class__.__name__) + + + def search(self, query, *args, **kwargs): + raise NotImplementedError('The search method should be implemented ' + + 'by a derived class') + + +from .local import LocalMediaSearcher +from .youtube import YoutubeMediaSearcher +from .torrent import TorrentMediaSearcher + +__all__ = ['LocalMediaSearcher', 'TorrentMediaSearcher', 'YoutubeMediaSearcher'] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/local.py b/platypush/plugins/media/search/local.py similarity index 98% rename from platypush/plugins/media/local.py rename to platypush/plugins/media/search/local.py index a74ab3c0..3691c39d 100644 --- a/platypush/plugins/media/local.py +++ b/platypush/plugins/media/search/local.py @@ -1,5 +1,4 @@ import datetime -import logging import os import re import time @@ -13,11 +12,12 @@ from sqlalchemy.sql.expression import func from platypush.config import Config from platypush.plugins.media import MediaPlugin +from platypush.plugins.media.search import MediaSearcher Base = declarative_base() Session = scoped_session(sessionmaker()) -class LocalMediaSearcher: +class LocalMediaSearcher(MediaSearcher): """ This class will search for media in the local configured directories. It will index the media files for a faster search, it will detect which @@ -32,7 +32,7 @@ class LocalMediaSearcher: _filename_separators = '[.,_\-@()\[\]\{\}\s\'\"]+' def __init__(self, dirs, *args, **kwargs): - self.logger = logging.getLogger(self.__class__.__name__) + super().__init__() self.dirs = dirs db_dir = os.path.join(Config.get('workdir'), 'media') os.makedirs(db_dir, exist_ok=True) diff --git a/platypush/plugins/media/search/torrent.py b/platypush/plugins/media/search/torrent.py new file mode 100644 index 00000000..cfc39361 --- /dev/null +++ b/platypush/plugins/media/search/torrent.py @@ -0,0 +1,15 @@ +from platypush.context import get_plugin +from platypush.plugins.media.search import MediaSearcher + +class TorrentMediaSearcher(MediaSearcher): + def search(self, query): + self.logger.info('Searching torrents for "{}"'.format(query)) + + torrents = get_plugin('torrent') + if not torrents: + raise RuntimeError('Torrent plugin not available/configured') + return torrents.search(query).output + + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/youtube.py b/platypush/plugins/media/search/youtube.py new file mode 100644 index 00000000..2f406839 --- /dev/null +++ b/platypush/plugins/media/search/youtube.py @@ -0,0 +1,61 @@ +import re +import urllib + +from platypush.context import get_plugin +from platypush.plugins.media.search import MediaSearcher + +class YoutubeMediaSearcher(MediaSearcher): + def search(self, query): + """ + Performs a YouTube search either using the YouTube API (faster and + recommended, it requires the :mod:`platypush.plugins.google.youtube` + plugin to be configured) or parsing the HTML search results (fallback + slower method) + """ + + self.logger.info('Searching YouTube for "{}"'.format(query)) + + try: + return self._youtube_search_api(query=query) + except Exception as e: + self.logger.warning('Unable to load the YouTube plugin, falling ' + + 'back to HTML parse method: {}'.format(str(e))) + + return self._youtube_search_html_parse(query=query) + + def _youtube_search_api(self, query): + return [ + { + 'url': 'https://www.youtube.com/watch?v=' + item['id']['videoId'], + 'title': item.get('snippet', {}).get('title', ''), + } + for item in get_plugin('google.youtube').search(query=query).output + if item.get('id', {}).get('kind') == 'youtube#video' + ] + + def _youtube_search_html_parse(self, query): + query = urllib.parse.quote(query) + url = "https://www.youtube.com/results?search_query=" + query + response = urllib.request.urlopen(url) + html = response.read().decode('utf-8') + results = [] + + while html: + m = re.search('()', html) + if m: + results.append({ + 'url': 'https://www.youtube.com' + m.group(2), + 'title': m.group(3) + }) + + html = html.split(m.group(1))[1] + else: + html = '' + + self.logger.info('{} YouTube video results for the search query "{}"' + .format(len(results), query)) + + return results + + +# vim:sw=4:ts=4:et: