[`youtube`] Black/LINT pass.

This commit is contained in:
Fabio Manganiello 2023-11-03 21:27:08 +01:00
parent f6cb1fa4a7
commit ba5d505c9b
1 changed files with 33 additions and 16 deletions

View File

@ -2,13 +2,19 @@ import re
import urllib.parse
import urllib.request
import requests
from platypush.context import get_plugin
# noinspection PyProtectedMember
from platypush.plugins.media.search import MediaSearcher
# pylint: disable=too-few-public-methods
class YoutubeMediaSearcher(MediaSearcher):
def search(self, query, **kwargs):
"""
Search YouTube videos by query.
"""
def search(self, query: str, *_, **__):
"""
Performs a YouTube search either using the YouTube API (faster and
recommended, it requires the :mod:`platypush.plugins.google.youtube`
@ -16,48 +22,59 @@ class YoutubeMediaSearcher(MediaSearcher):
slower method)
"""
self.logger.info('Searching YouTube for "{}"'.format(query))
self.logger.info('Searching YouTube for "%s"', query)
try:
return self._youtube_search_api(query=query)
except Exception as e:
self.logger.warning('Unable to load the YouTube plugin, falling ' +
'back to HTML parse method: {}'.format(str(e)))
self.logger.warning(
(
'Unable to load the YouTube plugin, '
'falling back to HTML parse method: %s'
),
e,
)
return self._youtube_search_html_parse(query=query)
return self._youtube_search_html_parse(query=query)
@staticmethod
def _youtube_search_api(query):
yt = get_plugin('google.youtube')
assert yt, 'YouTube plugin not configured'
return [
{
'url': 'https://www.youtube.com/watch?v=' + item['id']['videoId'],
**item.get('snippet', {}),
}
for item in get_plugin('google.youtube').search(query=query).output
for item in yt.search(query=query).output
if item.get('id', {}).get('kind') == 'youtube#video'
]
def _youtube_search_html_parse(self, query):
query = urllib.parse.quote(query)
url = "https://www.youtube.com/results?search_query=" + query
response = urllib.request.urlopen(url)
html = response.read().decode('utf-8')
html = requests.get(url, timeout=10).content
results = []
while html:
m = re.search('(<a href="(/watch\?v=.+?)".+?yt-uix-tile-link.+?title="(.+?)".+?>)', html)
m = re.search(
r'(<a href="(/watch\?v=.+?)".+?yt-uix-tile-link.+?title="(.+?)".+?>)',
html,
)
if m:
results.append({
'url': 'https://www.youtube.com' + m.group(2),
'title': m.group(3)
})
results.append(
{'url': 'https://www.youtube.com' + m.group(2), 'title': m.group(3)}
)
html = html.split(m.group(1))[1]
else:
html = ''
self.logger.info('{} YouTube video results for the search query "{}"'
.format(len(results), query))
self.logger.info(
'%d YouTube video results for the search query "%s"',
len(results),
query,
)
return results