[torrent] Refactored torrent search.

Allow for more torrent search providers other than PopcornTime (in
preparation for torrent-csv).
This commit is contained in:
Fabio Manganiello 2024-06-09 00:02:04 +02:00
parent 36f49952c4
commit 8fc3201b8c
9 changed files with 768 additions and 213 deletions

View file

@ -10,12 +10,12 @@
- [[#405](https://git.platypush.tech/platypush/platypush/issues/405)] Fixed - [[#405](https://git.platypush.tech/platypush/platypush/issues/405)] Fixed
timezone/timestamp rendering issues for `calendar.ical` events. timezone/timestamp rendering issues for `calendar.ical` events.
- [[#403]((https://git.platypush.tech/platypush/platypush/issues/403))] - [[#403]((https://git.platypush.tech/platypush/platypush/issues/403)]
Included inherited actions in plugins docs. Included inherited actions in plugins docs.
## [1.0.7] - 2024-06-02 ## [1.0.7] - 2024-06-02
- [[#384]((https://git.platypush.tech/platypush/platypush/issues/384))] Added - [[#384]((https://git.platypush.tech/platypush/platypush/issues/384)] Added
`assistant.openai` and `tts.openai` plugins. `assistant.openai` and `tts.openai` plugins.
## [1.0.6] - 2024-06-01 ## [1.0.6] - 2024-06-01

View file

@ -36,11 +36,6 @@
<div class="right side" v-text="item.num_seasons" /> <div class="right side" v-text="item.num_seasons" />
</div> </div>
<div class="row" v-if="item?.synopsis">
<div class="left side">Synopsis</div>
<div class="right side" v-text="item.synopsis" />
</div>
<div class="row" v-if="item?.description"> <div class="row" v-if="item?.description">
<div class="left side">Description</div> <div class="left side">Description</div>
<div class="right side" v-text="item.description" /> <div class="right side" v-text="item.description" />
@ -80,7 +75,7 @@
<div class="row" v-if="item?.rating"> <div class="row" v-if="item?.rating">
<div class="left side">Rating</div> <div class="left side">Rating</div>
<div class="right side">{{ item.rating.percentage }}%</div> <div class="right side">{{ item.rating }}%</div>
</div> </div>
<div class="row" v-if="item?.critic_rating"> <div class="row" v-if="item?.critic_rating">
@ -93,9 +88,9 @@
<div class="right side">{{ item.community_rating }}%</div> <div class="right side">{{ item.community_rating }}%</div>
</div> </div>
<div class="row" v-if="item?.rating"> <div class="row" v-if="item?.votes">
<div class="left side">Votes</div> <div class="left side">Votes</div>
<div class="right side" v-text="item.rating.votes" /> <div class="right side" v-text="item.votes" />
</div> </div>
<div class="row" v-if="item?.genres"> <div class="row" v-if="item?.genres">

View file

@ -1,11 +1,10 @@
import inspect
import os import os
import pathlib import pathlib
import queue
import random import random
import threading import threading
import time import time
from urllib.parse import quote_plus from typing import Dict, Iterable, Optional, Union
from typing import Iterable, List, Optional, Union
import requests import requests
@ -24,6 +23,9 @@ from platypush.message.event.torrent import (
) )
from platypush.utils import get_default_downloads_dir from platypush.utils import get_default_downloads_dir
from . import _search as search_module
from ._search import TorrentSearchProvider
class TorrentPlugin(Plugin): class TorrentPlugin(Plugin):
""" """
@ -38,75 +40,164 @@ class TorrentPlugin(Plugin):
default_torrent_ports = (6881, 6891) default_torrent_ports = (6881, 6891)
torrent_state = {} torrent_state = {}
transfers = {} transfers = {}
default_popcorn_base_url = 'https://shows.cf'
def __init__( def __init__(
self, self,
download_dir: Optional[str] = None, download_dir: Optional[str] = None,
torrent_ports: Iterable[int] = default_torrent_ports, torrent_ports: Iterable[int] = default_torrent_ports,
popcorn_base_url: str = default_popcorn_base_url, search_providers: Optional[
Union[Dict[str, dict], Iterable[TorrentSearchProvider]]
] = None,
**kwargs, **kwargs,
): ):
""" """
:param download_dir: Directory where the videos/torrents will be :param download_dir: Directory where the videos/torrents will be
downloaded (default: ``~/Downloads``). downloaded (default: ``~/Downloads``).
:param torrent_ports: Torrent ports to listen on (default: 6881 and 6891) :param torrent_ports: Torrent ports to listen on (default: 6881 and 6891)
:param popcorn_base_url: Custom base URL to use for the PopcornTime API. :param search_providers: List of search providers to use. Each provider
""" has its own supported configuration and needs to be an instance of
:class:`TorrentSearchProvider`. Currently supported providers:
* :class:`platypush.plugins.torrent._search.PopcornTimeSearchProvider`
* :class:`platypush.plugins.torrent._search.TorrentCsvSearchProvider`
Configuration example:
.. code-block:: yaml
torrent:
# ...
search_providers:
torrent_csv:
# Default: True
# enabled: true
# Base URL of the torrent-csv API.
# See https://git.torrents-csv.com/heretic/torrents-csv-server
# for how to run your own torrent-csv API server.
api_url: https://torrents-csv.com/service
# Alternatively, you can also use a local checkout of the
# torrent.csv file. Clone
# https://git.torrents-csv.com/heretic/torrents-csv-data
# and provide the path to the torrent.csv file here.
# csv_file: /path/to/torrent.csv
popcorn_time:
# Default: False
# enabled: false
# Required: PopcornTime API base URL.
# See https://github.com/popcorn-time-ru/popcorn-ru for
# how to run your own PopcornTime API server.
api_url: https://popcorntime.app
"""
super().__init__(**kwargs) super().__init__(**kwargs)
self.torrent_ports = torrent_ports self.torrent_ports = torrent_ports
self.download_dir = os.path.abspath( self.download_dir = os.path.abspath(
os.path.expanduser(download_dir or get_default_downloads_dir()) os.path.expanduser(download_dir or get_default_downloads_dir())
) )
self._search_providers = self._load_search_providers(search_providers)
self._sessions = {} self._sessions = {}
self._lt_session = None self._lt_session = None
self.popcorn_base_url = popcorn_base_url
self.torrent_base_urls = {
'movies': f'{popcorn_base_url}/movie/{{}}',
'tv': f'{popcorn_base_url}/show/{{}}',
}
pathlib.Path(self.download_dir).mkdir(parents=True, exist_ok=True) pathlib.Path(self.download_dir).mkdir(parents=True, exist_ok=True)
def _load_search_providers(
self,
search_providers: Optional[
Union[Dict[str, dict], Iterable[TorrentSearchProvider]]
],
) -> Iterable[TorrentSearchProvider]:
if not search_providers:
return []
parsed_providers = []
if isinstance(search_providers, dict):
providers_dict = {}
provider_classes = {
cls.provider_name(): cls
for _, cls in inspect.getmembers(search_module, inspect.isclass)
if issubclass(cls, TorrentSearchProvider)
and cls != TorrentSearchProvider
}
# Configure the search providers explicitly passed in the configuration
for provider_name, provider_config in search_providers.items():
if provider_name not in provider_classes:
self.logger.warning(
'Unsupported search provider %s. Supported providers: %s',
provider_name,
list(provider_classes.keys()),
)
continue
provider_class = provider_classes[provider_name]
providers_dict[provider_name] = provider_class(
**{'enabled': True, **provider_config}
)
# Enable the search providers that have `default_enabled` set to True
# and that are not explicitly disabled
for provider_name, provider in provider_classes.items():
if provider.default_enabled() and provider_name not in search_providers:
providers_dict[provider_name] = provider()
parsed_providers = list(providers_dict.values())
else:
parsed_providers = search_providers
assert all(
isinstance(provider, TorrentSearchProvider) for provider in parsed_providers
), 'All search providers must be instances of TorrentSearchProvider'
return parsed_providers
@action @action
def search( def search(
self, self,
query: str, query: str,
providers: Optional[Union[str, Iterable[str]]] = None,
*args, *args,
category: Optional[Union[str, Iterable[str]]] = None, **filters,
language: Optional[str] = None,
**kwargs,
): ):
""" """
Perform a search of video torrents. Perform a search of video torrents.
:param query: Query string, video name or partial name :param query: Query string, video name or partial name
:param category: Category to search. Supported types: "movies", "tv". :param providers: Override the default search providers by specifying
Default: None (search all categories) the provider names to use for this search.
:param language: Language code for the results - example: "en" (default: None, no filter) :param filters: Additional filters to apply to the search, depending on
what the configured search providers support. For example,
``category`` and ``language`` are supported by the PopcornTime.
:return: .. schema:: torrent.TorrentResultSchema(many=True)
""" """
results = [] results = []
if isinstance(category, str):
category = [category]
def worker(cat): def worker(provider: TorrentSearchProvider):
if cat not in self.categories: self.logger.debug(
raise RuntimeError( 'Searching torrents on provider %s, query: %s',
f'Unsupported category {cat}. Supported categories: ' provider.provider_name(),
f'{list(self.categories.keys())}' query,
)
self.logger.info('Searching %s torrents for "%s"', cat, query)
results.extend(
self.categories[cat](self, query, *args, language=language, **kwargs)
) )
results.extend(provider.search(query, *args, **filters))
if providers:
providers = [providers] if isinstance(providers, str) else providers
search_providers = [
provider
for provider in self._search_providers
if provider.provider_name() in providers
]
else:
search_providers = self._search_providers
if not search_providers:
self.logger.warning('No search providers enabled')
return []
workers = [ workers = [
threading.Thread(target=worker, kwargs={'cat': category}) threading.Thread(target=worker, kwargs={'provider': provider})
for category in (category or self.categories.keys()) for provider in search_providers
] ]
for wrk in workers: for wrk in workers:
@ -114,170 +205,7 @@ class TorrentPlugin(Plugin):
for wrk in workers: for wrk in workers:
wrk.join() wrk.join()
return results return [result.to_dict() for result in results]
def _imdb_query(self, query: str, category: str):
if not query:
return []
if category == 'movies':
imdb_category = 'movie'
elif category == 'tv':
imdb_category = 'tvSeries'
else:
raise RuntimeError(f'Unsupported category: {category}')
imdb_url = f'https://v3.sg.media-imdb.com/suggestion/x/{quote_plus(query)}.json?includeVideos=1'
response = requests.get(imdb_url, timeout=self._http_timeout)
response.raise_for_status()
response = response.json()
assert not response.get('errorMessage'), response['errorMessage']
return [
item for item in response.get('d', []) if item.get('qid') == imdb_category
]
def _torrent_search_worker(self, imdb_id: str, category: str, q: queue.Queue):
base_url = self.torrent_base_urls.get(category)
assert base_url, f'No such category: {category}'
try:
results = requests.get(
base_url.format(imdb_id), timeout=self._http_timeout
).json()
q.put(results)
except Exception as e:
q.put(e)
def _search_torrents(self, query, category):
imdb_results = self._imdb_query(query, category)
result_queues = [queue.Queue()] * len(imdb_results)
workers = [
threading.Thread(
target=self._torrent_search_worker,
kwargs={
'imdb_id': imdb_results[i]['id'],
'category': category,
'q': result_queues[i],
},
)
for i in range(len(imdb_results))
]
results = []
errors = []
for worker in workers:
worker.start()
for q in result_queues:
res_ = q.get()
if isinstance(res_, Exception):
errors.append(res_)
else:
results.append(res_)
for worker in workers:
worker.join()
if errors:
self.logger.warning('Torrent search errors: %s', [str(e) for e in errors])
return results
@staticmethod
def _results_to_movies_response(
results: List[dict], language: Optional[str] = None
):
return sorted(
[
{
'imdb_id': result.get('imdb_id'),
'type': 'movies',
'file': item.get('file'),
'title': (
result.get('title', '[No Title]')
+ f' [movies][{lang}][{quality}]'
),
'duration': int(result.get('runtime') or 0) * 60,
'year': int(result.get('year') or 0),
'synopsis': result.get('synopsis'),
'trailer': result.get('trailer'),
'genres': result.get('genres', []),
'image': result.get('images', {}).get('poster'),
'rating': result.get('rating', {}),
'language': lang,
'quality': quality,
'size': item.get('size'),
'provider': item.get('provider'),
'seeds': item.get('seed'),
'peers': item.get('peer'),
'url': item.get('url'),
}
for result in results
for (lang, items) in (result.get('torrents', {}) or {}).items()
if not language or language == lang
for (quality, item) in items.items()
if quality != '0'
],
key=lambda item: item.get('seeds', 0),
reverse=True,
)
@staticmethod
def _results_to_tv_response(results: List[dict]):
return sorted(
[
{
'imdb_id': result.get('imdb_id'),
'tvdb_id': result.get('tvdb_id'),
'type': 'tv',
'file': item.get('file'),
'series': result.get('title'),
'title': (
result.get('title', '[No Title]')
+ f'[S{episode.get("season", 0):02d}E{episode.get("episode", 0):02d}] '
+ f'{episode.get("title", "[No Title]")} [tv][{quality}]'
),
'duration': int(result.get('runtime') or 0) * 60,
'year': int(result.get('year') or 0),
'synopsis': result.get('synopsis'),
'overview': episode.get('overview'),
'season': episode.get('season'),
'episode': episode.get('episode'),
'num_seasons': result.get('num_seasons'),
'country': result.get('country'),
'network': result.get('network'),
'status': result.get('status'),
'genres': result.get('genres', []),
'image': result.get('images', {}).get('fanart'),
'rating': result.get('rating', {}),
'quality': quality,
'provider': item.get('provider'),
'seeds': item.get('seeds'),
'peers': item.get('peers'),
'url': item.get('url'),
}
for result in results
for episode in result.get('episodes', [])
for quality, item in (episode.get('torrents', {}) or {}).items()
if quality != '0'
],
key=lambda item: (
'.'.join(
[
item.get('series', ''),
item.get('quality', ''),
str(item.get('season', 0)).zfill(2),
str(item.get('episode', 0)).zfill(2),
]
)
),
)
def search_movies(self, query, language=None):
return self._results_to_movies_response(
self._search_torrents(query, 'movies'), language=language
)
def search_tv(self, query, **_):
return self._results_to_tv_response(self._search_torrents(query, 'tv'))
def _get_torrent_info(self, torrent, download_dir): def _get_torrent_info(self, torrent, download_dir):
import libtorrent as lt import libtorrent as lt
@ -619,10 +547,5 @@ class TorrentPlugin(Plugin):
name += hex(random.randint(0, 15))[2:].upper() name += hex(random.randint(0, 15))[2:].upper()
return name + '.torrent' return name + '.torrent'
categories = {
'movies': search_movies,
'tv': search_tv,
}
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,11 @@
from ._base import TorrentSearchProvider
from ._popcorntime import PopcornTimeSearchProvider
__all__ = [
'TorrentSearchProvider',
'PopcornTimeSearchProvider',
]
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,64 @@
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Iterable
from ._model import TorrentSearchResult
class TorrentSearchProvider(ABC):
"""
Base class for torrent search providers.
"""
def __init__(self, **kwargs):
if 'enabled' in kwargs:
self.enabled = bool(kwargs['enabled'])
elif 'disabled' in kwargs:
self.enabled = not bool(kwargs['disabled'])
else:
self.enabled = self.default_enabled()
self.logger = getLogger(self.__class__.__name__)
@abstractmethod
def _search(self, query: str, *args, **kwargs) -> Iterable[TorrentSearchResult]:
"""
Inner search method. This method should be implemented by subclasses.
:param query: Query string, torrent name or partial name.
"""
return []
@classmethod
@abstractmethod
def provider_name(cls) -> str:
"""
:return: Name of the provider, which can be used to identify it within
the :class:`platypush.plugins.torrent.TorrentPlugin`.
"""
def search(self, query: str, *args, **kwargs) -> Iterable[TorrentSearchResult]:
"""
Perform a search of torrents.
:param query: Query string, torrent name or partial name.
"""
if not self.enabled:
self.logger.debug(
'Provider %s is disabled, skipping search', self.__class__.__name__
)
return []
self.logger.debug('Searching for %r', query)
return self._search(query, *args, **kwargs)
@classmethod
def default_enabled(cls) -> bool:
"""
:return: True if the provider is enabled by default, False otherwise.
Default: False.
"""
return False
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,49 @@
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime
from platypush.schemas.torrent import TorrentResultSchema
@dataclass
class TorrentSearchResult:
"""
Data class for results returned by
:meth:`platypush.plugins.torrent.TorrentPlugin.search`.
"""
title: str
file: Optional[str] = None
url: Optional[str] = None
provider: Optional[str] = None
type: Optional[str] = None
size: int = 0
duration: float = 0
language: Optional[str] = None
category: Optional[str] = None
seeds: int = 0
peers: int = 0
image: Optional[str] = None
description: Optional[str] = None
imdb_id: Optional[str] = None
tvdb_id: Optional[str] = None
year: Optional[int] = None
created_at: Optional[datetime] = None
quality: Optional[str] = None
overview: Optional[str] = None
trailer: Optional[str] = None
genres: List[str] = field(default_factory=list)
rating: Optional[float] = None
critic_rating: Optional[float] = None
community_rating: Optional[float] = None
votes: Optional[int] = None
series: Optional[str] = None
season: Optional[int] = None
episode: Optional[int] = None
num_seasons: Optional[int] = None
country: Optional[str] = None
network: Optional[str] = None
series_status: Optional[str] = None
def to_dict(self) -> dict:
return dict(TorrentResultSchema().dump(self))

View file

@ -0,0 +1,259 @@
import queue
import threading
from urllib.parse import quote_plus
from typing import Iterable, List, Optional, Union
import requests
from ._base import TorrentSearchProvider
from ._model import TorrentSearchResult
class PopcornTimeSearchProvider(TorrentSearchProvider):
"""
Torrent search provider that uses the PopcornTime API to search for movies
and TV shows.
"""
_http_timeout = 20
def __init__(
self,
api_url: str,
**kwargs,
):
"""
:param api_url: PopcornTime API base URL.
"""
super().__init__(**kwargs)
self.api_url = api_url
self.torrent_base_urls = {
'movies': f'{api_url}/movie/{{}}',
'tv': f'{api_url}/show/{{}}',
}
@classmethod
def provider_name(cls) -> str:
return 'popcorntime'
def _search(
self,
query: str,
*args,
category: Optional[Union[str, Iterable[str]]] = None,
language: Optional[str] = None,
**kwargs,
) -> List[TorrentSearchResult]:
"""
Perform a search of video torrents using the PopcornTime API.
:param query: Query string, video name or partial name
:param category: Category to search. Supported types: `movies`, `tv`.
Default: None (search all categories)
:param language: Language code for the results - example: "en" (default: None, no filter)
"""
results = []
if isinstance(category, str):
category = [category]
def worker(cat):
if cat not in self.categories:
raise RuntimeError(
f'Unsupported category {cat}. Supported categories: '
f'{list(self.categories.keys())}'
)
self.logger.info('Searching %s torrents for "%s"', cat, query)
results.extend(
self.categories[cat](self, query, *args, language=language, **kwargs)
)
workers = [
threading.Thread(target=worker, kwargs={'cat': category})
for category in (category or self.categories.keys())
]
for wrk in workers:
wrk.start()
for wrk in workers:
wrk.join()
return results
def _imdb_query(self, query: str, category: str):
if not query:
return []
if category == 'movies':
imdb_category = 'movie'
elif category == 'tv':
imdb_category = 'tvSeries'
else:
raise RuntimeError(f'Unsupported category: {category}')
imdb_url = f'https://v3.sg.media-imdb.com/suggestion/x/{quote_plus(query)}.json?includeVideos=1'
response = requests.get(imdb_url, timeout=self._http_timeout)
response.raise_for_status()
response = response.json()
assert not response.get('errorMessage'), response['errorMessage']
return [
item for item in response.get('d', []) if item.get('qid') == imdb_category
]
def _torrent_search_worker(self, imdb_id: str, category: str, q: queue.Queue):
base_url = self.torrent_base_urls.get(category)
assert base_url, f'No such category: {category}'
try:
self.logger.debug('Searching torrents for %s', imdb_id)
response = requests.get(
base_url.format(imdb_id), timeout=self._http_timeout
)
self.logger.debug('Got response for %s: %s', imdb_id, response.text)
result = response.json()
q.put(
result if result.get('code') != 404 and result.get('imdb_id') else None
)
except Exception as e:
q.put(e)
def _search_torrents(self, query, category):
imdb_results = self._imdb_query(query, category)
result_queues = [queue.Queue()] * len(imdb_results)
workers = [
threading.Thread(
target=self._torrent_search_worker,
kwargs={
'imdb_id': imdb_results[i]['id'],
'category': category,
'q': result_queues[i],
},
)
for i in range(len(imdb_results))
]
results = []
errors = []
for worker in workers:
worker.start()
for q in result_queues:
res_ = q.get()
if isinstance(res_, Exception):
errors.append(res_)
else:
results.append(res_)
for worker in workers:
worker.join()
if errors:
self.logger.warning('Torrent search errors: %s', [str(e) for e in errors])
return results
@classmethod
def _results_to_movies_response(
cls, results: List[dict], language: Optional[str] = None
):
return sorted(
[
TorrentSearchResult(
provider=cls.provider_name(),
imdb_id=result.get('imdb_id'),
type='movies',
title=result.get('title', '[No Title]')
+ f' [movies][{lang}][{quality}]',
file=item.get('file'),
duration=int(result.get('runtime') or 0) * 60,
year=int(result.get('year') or 0),
description=result.get('synopsis'),
trailer=result.get('trailer'),
genres=result.get('genres', []),
image=result.get('images', {}).get('poster'),
rating=result.get('rating', {}).get('percentage'),
votes=result.get('rating', {}).get('votes'),
language=lang,
quality=quality,
size=item.get('size'),
seeds=item.get('seed'),
peers=item.get('peer'),
url=item.get('url'),
)
for result in results
for (lang, items) in ((result or {}).get('torrents', {}) or {}).items()
if not language or language == lang
for (quality, item) in items.items()
if result
],
key=lambda item: item.seeds,
reverse=True,
)
@classmethod
def _results_to_tv_response(cls, results: List[dict]):
return sorted(
[
TorrentSearchResult(
provider=cls.provider_name(),
imdb_id=result.get('imdb_id'),
tvdb_id=result.get('tvdb_id'),
type='tv',
file=item.get('file'),
series=result.get('title'),
title=(
result.get('title', '[No Title]')
+ f'[S{episode.get("season", 0):02d}E{episode.get("episode", 0):02d}] '
+ f'{episode.get("title", "[No Title]")} [tv][{quality}]'
),
duration=int(result.get('runtime') or 0) * 60,
year=int(result.get('year') or 0),
description=result.get('synopsis'),
overview=episode.get('overview'),
season=episode.get('season'),
episode=episode.get('episode'),
num_seasons=result.get('num_seasons'),
country=result.get('country'),
network=result.get('network'),
series_status=result.get('status'),
genres=result.get('genres', []),
image=result.get('images', {}).get('fanart'),
rating=result.get('rating', {}).get('percentage'),
votes=result.get('rating', {}).get('votes'),
quality=quality,
seeds=item.get('seeds'),
peers=item.get('peers'),
url=item.get('url'),
)
for result in results
for episode in (result or {}).get('episodes', [])
for quality, item in (episode.get('torrents', {}) or {}).items()
if quality != '0'
],
key=lambda item: ( # type: ignore
'.'.join(
[ # type: ignore
(item.series or ''),
(item.quality or ''),
str(item.season or 0).zfill(2),
str(item.episode or 0).zfill(2),
]
)
),
)
def search_movies(self, query, language=None):
return self._results_to_movies_response(
self._search_torrents(query, 'movies'), language=language
)
def search_tv(self, query, **_):
return self._results_to_tv_response(self._search_torrents(query, 'tv'))
categories = {
'movies': search_movies,
'tv': search_tv,
}
# vim:sw=4:ts=4:et:

View file

@ -14,7 +14,9 @@ class StrippedString(fields.Function): # lgtm [py/missing-call-to-init]
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]:
if obj.get(attr) is not None: if getattr(obj, attr, None) is not None:
return self._strip(getattr(obj, attr))
if getattr(obj, 'get', lambda _: None)(attr) is not None:
return self._strip(obj.get(attr)) return self._strip(obj.get(attr))
@staticmethod @staticmethod

View file

@ -0,0 +1,252 @@
from marshmallow import fields, EXCLUDE, INCLUDE
from marshmallow.schema import Schema
from platypush.schemas import DateTime, StrippedString
class TorrentResultSchema(Schema):
"""
Schema for results returned by
:meth:`platypush.plugins.torrent.TorrentPlugin.search`.
"""
# pylint: disable=too-few-public-methods
class Meta(Schema.Meta):
"""
Schema metadata.
"""
missing = EXCLUDE
ordered = True
unknown = INCLUDE
title = StrippedString(
required=True,
metadata={
'description': 'Title of the torrent',
'example': '2001: A Space Odyssey',
},
)
file = StrippedString(
metadata={
'description': (
'File name of the torrent, if the resource is a local .torrent file'
),
'example': '/home/user/downloads/2001_a_space_odyssey.torrent',
},
)
url = StrippedString(
metadata={
'description': 'URL of the torrent, if the resource is a remote torrent',
'example': 'magnet:?xt=urn:btih:...',
},
)
provider = StrippedString(
metadata={
'description': (
'Provider of the torrent - e.g. `popcorntime`, `yts` or `torrent-csv`'
),
'example': 'popcorntime',
},
)
type = StrippedString(
metadata={
'description': 'Type of the torrent',
'example': 'movies',
},
)
size = fields.Integer(
missing=0,
metadata={
'description': 'Size of the torrent in bytes',
'example': 123456789,
},
)
duration = fields.Float(
metadata={
'description': 'Duration of the torrent in seconds, if applicable',
'example': 123.45,
},
)
language = StrippedString(
metadata={
'description': 'Language of the torrent',
'example': 'en',
},
)
seeds = fields.Integer(
missing=0,
metadata={
'description': 'Number of seeders',
'example': 123,
},
)
peers = fields.Integer(
missing=0,
metadata={
'description': 'Number of peers',
'example': 123,
},
)
image = StrippedString(
metadata={
'description': 'URL of the image associated to the torrent',
'example': 'https://example.com/image.jpg',
},
)
description = StrippedString(
metadata={
'description': 'Description of the torrent',
'example': 'A description of the torrent',
},
)
imdb_id = StrippedString(
metadata={
'description': 'IMDb ID of the torrent, if applicable',
'example': 'tt0062622',
},
)
tvdb_id = StrippedString(
metadata={
'description': 'TVDB ID of the torrent, if applicable',
'example': '76283',
},
)
year = fields.Integer(
metadata={
'description': 'Year of the release of the underlying product, if applicable',
'example': 1968,
},
)
created_at = DateTime(
metadata={
'description': 'Creation date of the torrent',
'example': '2021-01-01T00:00:00',
},
)
quality = StrippedString(
metadata={
'description': 'Quality of the torrent, if video/audio',
'example': '1080p',
},
)
overview = StrippedString(
metadata={
'description': 'Overview of the torrent, if it is a TV show',
'example': 'Overview of the torrent, if it is a TV show',
},
)
trailer = fields.URL(
metadata={
'description': 'URL of the trailer of the torrent, if available',
'example': 'https://example.com/trailer.mp4',
},
)
genres = fields.List(
fields.String(),
metadata={
'description': 'List of genres associated to the torrent',
'example': ['Sci-Fi', 'Adventure'],
},
)
rating = fields.Float(
metadata={
'description': (
'Rating of the torrent or the underlying product, as a '
'percentage between 0 and 100',
),
'example': 86.0,
},
)
critic_rating = fields.Float(
metadata={
'description': 'Critic rating, if applicable, as a percentage between 0 and 100',
'example': 86.0,
},
)
community_rating = fields.Float(
metadata={
'description': (
'Community rating, if applicable, as a percentage between 0 and 100'
),
'example': 86.0,
},
)
votes = fields.Integer(
metadata={
'description': 'Number of votes, if applicable',
'example': 123,
},
)
series = StrippedString(
metadata={
'description': 'Title of the TV series, if applicable',
'example': 'Breaking Bad',
},
)
season = fields.Integer(
metadata={
'description': 'Season number of the TV series, if applicable',
'example': 1,
},
)
episode = fields.Integer(
metadata={
'description': 'Episode number of the TV series, if applicable',
'example': 1,
},
)
num_seasons = fields.Integer(
metadata={
'description': 'Number of seasons of the TV series, if applicable',
'example': 5,
},
)
country = StrippedString(
metadata={
'description': 'Country of origin of the torrent or the underlying product',
'example': 'USA',
},
)
network = StrippedString(
metadata={
'description': 'Network of the TV series, if applicable',
'example': 'AMC',
},
)
series_status = StrippedString(
metadata={
'description': 'Status of the TV series, if applicable',
'example': 'Ended',
},
)