[`media`] Added `media.get_info` action.

It combines the `-j` and `-g` options of ytdl* to get both the stream
URL and the track metadata.
This commit is contained in:
Fabio Manganiello 2023-11-06 02:24:50 +01:00
parent e0a9ccca24
commit 27da2becd7
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 18 additions and 5 deletions

View File

@ -1,6 +1,7 @@
import enum
import functools
import inspect
import json
import os
import queue
import re
@ -283,7 +284,7 @@ class MediaPlugin(Plugin, ABC):
"""
if self._is_youtube_resource(resource):
resource = self.get_youtube_video_url(resource)
resource = self._get_youtube_info(resource).get('url')
elif resource.startswith('magnet:?'):
self.logger.info(
'Downloading torrent %s to %s', resource, self.download_dir
@ -589,7 +590,7 @@ class MediaPlugin(Plugin, ABC):
assert response.ok, response.text or response.reason
return response.json()
def get_youtube_video_url(self, url, youtube_format: Optional[str] = None):
def _get_youtube_info(self, url, youtube_format: Optional[str] = None):
ytdl_cmd = [
self._ytdl,
*(
@ -597,16 +598,21 @@ class MediaPlugin(Plugin, ABC):
if youtube_format or self.youtube_format
else []
),
'-j',
'-g',
url,
]
self.logger.info('Executing command %s', ' '.join(ytdl_cmd))
with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as ytdl:
url = ytdl.communicate()[0].decode().strip()
output = ytdl.communicate()[0].decode().strip()
ytdl.wait()
return url
stream_url, info = output.split('\n')
return {
**json.loads(info),
'url': stream_url,
}
@staticmethod
def get_youtube_id(url: str) -> Optional[str]:
@ -632,7 +638,7 @@ class MediaPlugin(Plugin, ABC):
youtube_id = self.get_youtube_id(url)
if youtube_id:
url = f'https://www.youtube.com/watch?v={youtube_id}'
return self.get_youtube_video_url(url, youtube_format=youtube_format)
return self._get_youtube_info(url, youtube_format=youtube_format).get('url')
return None
@ -648,6 +654,13 @@ class MediaPlugin(Plugin, ABC):
return proc.stdout.read().decode("utf-8", "strict")[:-1]
@action
def get_info(self, resource: str):
if self._is_youtube_resource(resource):
return self.get_youtube_info(resource)
return {'url': resource}
@action
def get_media_file_duration(self, filename):
"""