diff --git a/platypush/plugins/media/search/youtube.py b/platypush/plugins/media/search/youtube.py
index 7fa8a6fbfb..37f019e6fd 100644
--- a/platypush/plugins/media/search/youtube.py
+++ b/platypush/plugins/media/search/youtube.py
@@ -2,13 +2,19 @@ import re
import urllib.parse
import urllib.request
+import requests
+
from platypush.context import get_plugin
-# noinspection PyProtectedMember
from platypush.plugins.media.search import MediaSearcher
+# pylint: disable=too-few-public-methods
class YoutubeMediaSearcher(MediaSearcher):
- def search(self, query, **kwargs):
+ """
+ Search YouTube videos by query.
+ """
+
+ def search(self, query: str, *_, **__):
"""
Performs a YouTube search either using the YouTube API (faster and
recommended, it requires the :mod:`platypush.plugins.google.youtube`
@@ -16,48 +22,59 @@ class YoutubeMediaSearcher(MediaSearcher):
slower method)
"""
- self.logger.info('Searching YouTube for "{}"'.format(query))
+ self.logger.info('Searching YouTube for "%s"', query)
try:
return self._youtube_search_api(query=query)
except Exception as e:
- self.logger.warning('Unable to load the YouTube plugin, falling ' +
- 'back to HTML parse method: {}'.format(str(e)))
+ self.logger.warning(
+ (
+ 'Unable to load the YouTube plugin, '
+ 'falling back to HTML parse method: %s'
+ ),
+ e,
+ )
- return self._youtube_search_html_parse(query=query)
+ return self._youtube_search_html_parse(query=query)
@staticmethod
def _youtube_search_api(query):
+ yt = get_plugin('google.youtube')
+ assert yt, 'YouTube plugin not configured'
return [
{
'url': 'https://www.youtube.com/watch?v=' + item['id']['videoId'],
**item.get('snippet', {}),
}
- for item in get_plugin('google.youtube').search(query=query).output
+ for item in yt.search(query=query).output
if item.get('id', {}).get('kind') == 'youtube#video'
]
def _youtube_search_html_parse(self, query):
query = urllib.parse.quote(query)
url = "https://www.youtube.com/results?search_query=" + query
- response = urllib.request.urlopen(url)
- html = response.read().decode('utf-8')
+ html = requests.get(url, timeout=10).content
results = []
while html:
- m = re.search('()', html)
+ m = re.search(
+ r'()',
+ html,
+ )
if m:
- results.append({
- 'url': 'https://www.youtube.com' + m.group(2),
- 'title': m.group(3)
- })
+ results.append(
+ {'url': 'https://www.youtube.com' + m.group(2), 'title': m.group(3)}
+ )
html = html.split(m.group(1))[1]
else:
html = ''
- self.logger.info('{} YouTube video results for the search query "{}"'
- .format(len(results), query))
+ self.logger.info(
+ '%d YouTube video results for the search query "%s"',
+ len(results),
+ query,
+ )
return results