forked from platypush/platypush
Added rtorrent integration [closes #120]
This commit is contained in:
parent
d7806757c5
commit
907bc0f75b
7 changed files with 607 additions and 28 deletions
|
@ -46,6 +46,19 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
);
|
||||
},
|
||||
|
||||
getTorrentPlugin: async function() {
|
||||
if (this.config && this.config.torrent_plugin) {
|
||||
return this.config.torrent_plugin;
|
||||
}
|
||||
|
||||
const config = await request('inspect.get_config');
|
||||
if ('rtorrent' in config)
|
||||
return 'rtorrent';
|
||||
if ('webtorrent' in config)
|
||||
return 'webtorrent';
|
||||
return 'torrent'
|
||||
},
|
||||
|
||||
getMetadata: async function(item, onlyBase=false) {
|
||||
let status = {};
|
||||
|
||||
|
@ -62,34 +75,62 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
play: async function(item) {
|
||||
let status = await this.download(item);
|
||||
status.waitingPlay = true;
|
||||
const torrentId = this.getTorrentUrlOrHash(item.url);
|
||||
|
||||
if (item.url in this.torrentStatus) {
|
||||
if (torrentId in this.torrentStatus) {
|
||||
this.firePlay(event);
|
||||
}
|
||||
},
|
||||
|
||||
pause: async function(item) {
|
||||
let status = await request('torrent.pause', {torrent: item.torrent});
|
||||
const torrentPlugin = await this.getTorrentPlugin();
|
||||
const torrentId = this.getTorrentUrlOrHash(item.url);
|
||||
let status = {};
|
||||
|
||||
if (item.paused) {
|
||||
status = await request(torrentPlugin + '.resume', {torrent: torrentId});
|
||||
} else {
|
||||
status = await request(torrentPlugin + '.pause', {torrent: torrentId});
|
||||
}
|
||||
|
||||
this.mergeStatus(status);
|
||||
},
|
||||
|
||||
remove: async function(item) {
|
||||
let status = await request('torrent.remove', {torrent: item.torrent});
|
||||
this.mergeStatus(status);
|
||||
const torrentPlugin = await this.getTorrentPlugin();
|
||||
const torrentId = this.getTorrentUrlOrHash(item.url);
|
||||
let status = await request(torrentPlugin + '.remove', {torrent: torrentId});
|
||||
if (torrentId in this.torrentStatus)
|
||||
delete this.torrentStatus[torrentId];
|
||||
},
|
||||
|
||||
status: async function(item) {
|
||||
const torrentPlugin = await this.getTorrentPlugin();
|
||||
if (item) {
|
||||
return await request('torrent.status', {
|
||||
torrent: item.url,
|
||||
const torrentId = this.getTorrentUrlOrHash(typeof item === 'string' ? item : item.url);
|
||||
return await request(torrentPlugin + '.status', {
|
||||
torrent: torrentId,
|
||||
});
|
||||
}
|
||||
|
||||
return await request('torrent.status');
|
||||
return await request(torrentPlugin + '.status');
|
||||
},
|
||||
|
||||
getTorrentUrlOrHash: function(torrent) {
|
||||
if (torrent.startsWith('magnet:?')) {
|
||||
m = torrent.match(/xt=urn:btih:([^&/]+)/, torrent)
|
||||
if (m) {
|
||||
return m[1]; // Torrent hash
|
||||
}
|
||||
}
|
||||
|
||||
return torrent;
|
||||
},
|
||||
|
||||
download: async function(item) {
|
||||
const torrentPlugin = await this.getTorrentPlugin();
|
||||
let status = await this.status(item.url);
|
||||
|
||||
if (status && Object.keys(status).length > 1) {
|
||||
createNotification({
|
||||
text: 'This torrent is already being downloaded, please play the downloading local media file instead',
|
||||
|
@ -102,7 +143,7 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
}
|
||||
|
||||
status = await request(
|
||||
'torrent.download',
|
||||
torrentPlugin + '.download',
|
||||
{
|
||||
torrent: item.url,
|
||||
_async: true,
|
||||
|
@ -111,14 +152,15 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
timeout=120000 // Wait up to two minutes while downloading enough torrent chunks
|
||||
);
|
||||
|
||||
this.torrentStatus[item.url] = {
|
||||
const torrentId = this.getTorrentUrlOrHash(item.url);
|
||||
this.torrentStatus[torrentId] = {
|
||||
...item, ...status,
|
||||
scheduledPlay: false,
|
||||
torrentState: status.state,
|
||||
state: 'idle',
|
||||
};
|
||||
|
||||
return this.torrentStatus[item.url];
|
||||
return this.torrentStatus[torrentId];
|
||||
},
|
||||
|
||||
onTorrentEvent: function(event) {
|
||||
|
@ -153,7 +195,10 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
if (!this.mergeStatus(event))
|
||||
return;
|
||||
|
||||
delete this.torrentStatus[event.url];
|
||||
const torrentId = this.getTorrentUrlOrHash(event.url);
|
||||
if (torrentId in this.torrentStatus)
|
||||
delete this.torrentStatus[torrentId];
|
||||
|
||||
this.bus.$emit('torrent-status-update', this.torrentStatus);
|
||||
},
|
||||
|
||||
|
@ -161,7 +206,8 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
if (!this.mergeStatus(event))
|
||||
return;
|
||||
|
||||
if (this.torrentStatus[event.url].waitingPlay)
|
||||
const torrentId = this.getTorrentUrlOrHash(event.url);
|
||||
if (this.torrentStatus[torrentId].waitingPlay)
|
||||
this.firePlay(event);
|
||||
},
|
||||
|
||||
|
@ -169,7 +215,10 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
if (!this.mergeStatus(event))
|
||||
return;
|
||||
|
||||
delete this.torrentStatus[event.url];
|
||||
const torrentId = this.getTorrentUrlOrHash(event.url);
|
||||
if (torrentId in this.torrentStatus)
|
||||
delete this.torrentStatus[event.url];
|
||||
|
||||
this.bus.$emit('torrent-status-update', this.torrentStatus);
|
||||
|
||||
createNotification({
|
||||
|
@ -206,26 +255,28 @@ MediaHandlers.torrent = MediaHandlers.base.extend({
|
|||
},
|
||||
|
||||
mergeStatus: function(event) {
|
||||
const torrentId = this.getTorrentUrlOrHash(event.url);
|
||||
const torrentState = event.state;
|
||||
delete event.state;
|
||||
|
||||
this.torrentStatus[event.url] = {
|
||||
...this.torrentStatus[event.url],
|
||||
this.torrentStatus[torrentId] = {
|
||||
...this.torrentStatus[torrentId],
|
||||
...event,
|
||||
torrentState: torrentState,
|
||||
};
|
||||
|
||||
this.bus.$emit('torrent-status-update', this.torrentStatus);
|
||||
return this.torrentStatus[event.url];
|
||||
return this.torrentStatus[torrentId];
|
||||
},
|
||||
},
|
||||
|
||||
created: function() {
|
||||
registerEventHandler(this.onTorrentStart, 'platypush.message.event.torrent.TorrentDownloadStartEvent');
|
||||
registerEventHandler(this.onTorrentStop, 'platypush.message.event.torrent.TorrentDownloadStopEvent');
|
||||
registerEventHandler(this.onTorrentProgress, 'platypush.message.event.torrent.TorrentDownloadProgressEvent');
|
||||
registerEventHandler(this.onTorrentCompleted, 'platypush.message.event.torrent.TorrentDownloadCompletedEvent');
|
||||
registerEventHandler(this.onTorrentQueued, 'platypush.message.event.torrent.TorrentQueuedEvent');
|
||||
registerEventHandler(this.onTorrentStop, 'platypush.message.event.torrent.TorrentDownloadStopEvent',
|
||||
'platypush.message.event.torrent.TorrentRemovedEvent');
|
||||
registerEventHandler(this.onTorrentEvent, 'platypush.message.event.torrent.TorrentPausedEvent',
|
||||
'platypush.message.event.torrent.TorrentResumedEvent',
|
||||
'platypush.message.event.torrent.TorrentDownloadStartEvent',
|
||||
|
|
|
@ -219,6 +219,19 @@ Vue.component('media', {
|
|||
};
|
||||
},
|
||||
|
||||
getTorrentPlugin: async function() {
|
||||
if (this.config && this.config.torrent_plugin) {
|
||||
return this.config.torrent_plugin;
|
||||
}
|
||||
|
||||
const config = await request('inspect.get_config');
|
||||
if ('rtorrent' in config)
|
||||
return 'rtorrent';
|
||||
if ('webtorrent' in config)
|
||||
return 'webtorrent';
|
||||
return 'torrent'
|
||||
},
|
||||
|
||||
torrentStatusUpdate: function(torrents) {
|
||||
Vue.set(this.torrentModal, 'items', {});
|
||||
|
||||
|
@ -227,6 +240,12 @@ Vue.component('media', {
|
|||
}
|
||||
},
|
||||
|
||||
refreshTorrents: async function() {
|
||||
const torrentPlugin = await this.getTorrentPlugin();
|
||||
const torrents = await request(torrentPlugin + '.status');
|
||||
this.torrentStatusUpdate(torrents);
|
||||
},
|
||||
|
||||
onStatusUpdate: function(event) {
|
||||
const dev = event.device;
|
||||
const status = event.status;
|
||||
|
@ -301,6 +320,7 @@ Vue.component('media', {
|
|||
MediaHandlers[type].bus = this.bus;
|
||||
}
|
||||
|
||||
this.refreshTorrents();
|
||||
registerEventHandler(this.onMediaEvent,
|
||||
'platypush.message.event.media.NewPlayingMediaEvent',
|
||||
'platypush.message.event.media.MediaPlayEvent',
|
||||
|
|
|
@ -59,6 +59,19 @@ Vue.component('media-torrents', {
|
|||
},
|
||||
|
||||
methods: {
|
||||
getTorrentPlugin: async function() {
|
||||
if (this.config && this.config.torrent_plugin) {
|
||||
return this.config.torrent_plugin;
|
||||
}
|
||||
|
||||
const config = await request('inspect.get_config');
|
||||
if ('rtorrent' in config)
|
||||
return 'rtorrent';
|
||||
if ('webtorrent' in config)
|
||||
return 'webtorrent';
|
||||
return 'torrent'
|
||||
},
|
||||
|
||||
openDropdown: function(item) {
|
||||
this.selectedItem = item;
|
||||
openDropdown(this.$refs.menu);
|
||||
|
@ -69,7 +82,8 @@ Vue.component('media-torrents', {
|
|||
if (!magnet.length)
|
||||
return;
|
||||
|
||||
const ret = await request('torrent.download', {
|
||||
const torrentPlugin = await this.getTorrentPlugin();
|
||||
await request(torrentPlugin + '.download', {
|
||||
torrent: magnet,
|
||||
_async: true,
|
||||
});
|
||||
|
|
|
@ -89,4 +89,12 @@ class TorrentDownloadStopEvent(TorrentEvent):
|
|||
super().__init__(*args, url=url, **kwargs)
|
||||
|
||||
|
||||
class TorrentRemovedEvent(TorrentEvent):
|
||||
"""
|
||||
Event triggered when a torrent transfer is removed.
|
||||
"""
|
||||
def __init__(self, url, *args, **kwargs):
|
||||
super().__init__(*args, url=url, **kwargs)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
|
@ -30,6 +30,7 @@ class MediaPlugin(Plugin):
|
|||
|
||||
* A media player installed (supported so far: mplayer, vlc, mpv, omxplayer, chromecast)
|
||||
* **python-libtorrent** (``pip install python-libtorrent``), optional, for torrent support over native library
|
||||
* *rtorrent* installed - optional, for torrent support over rtorrent
|
||||
* **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support
|
||||
* **requests** (``pip install requests``), optional, for local files over HTTP streaming supporting
|
||||
* **ffmpeg**,optional, to get media files metadata
|
||||
|
@ -42,9 +43,7 @@ class MediaPlugin(Plugin):
|
|||
# another device)
|
||||
_is_local = True
|
||||
_youtube_fifo = os.path.join(tempfile.gettempdir(), 'youtube_video.sock')
|
||||
|
||||
_NOT_IMPLEMENTED_ERR = NotImplementedError(
|
||||
'This method must be implemented in a derived class')
|
||||
_NOT_IMPLEMENTED_ERR = NotImplementedError('This method must be implemented in a derived class')
|
||||
|
||||
# Supported audio extensions
|
||||
audio_extensions = {
|
||||
|
@ -76,6 +75,7 @@ class MediaPlugin(Plugin):
|
|||
download_dir: Optional[str] = None,
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
volume: Optional[Union[float, int]] = None,
|
||||
torrent_plugin: str = 'torrent',
|
||||
*args, **kwargs):
|
||||
"""
|
||||
:param media_dirs: Directories that will be scanned for media files when
|
||||
|
@ -88,6 +88,13 @@ class MediaPlugin(Plugin):
|
|||
player executable (e.g. DISPLAY, XDG_VTNR, PULSE_SINK etc.)
|
||||
|
||||
:param volume: Default volume for the player (default: None, maximum volume).
|
||||
|
||||
:param torrent_plugin: Optional plugin to be used for torrent download. Possible values:
|
||||
|
||||
- ``torrent`` - native ``libtorrent``-based plugin (default)
|
||||
- ``rtorrent`` - torrent support over rtorrent RPC/XML interface (recommended)
|
||||
- ``webtorrent`` - torrent support over webtorrent (unstable)
|
||||
|
||||
"""
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
@ -142,6 +149,7 @@ class MediaPlugin(Plugin):
|
|||
self.volume = volume
|
||||
self._videos_queue = []
|
||||
self._youtube_proc = None
|
||||
self.torrent_plugin = torrent_plugin
|
||||
|
||||
@staticmethod
|
||||
def _torrent_event_handler(evt_queue):
|
||||
|
@ -179,7 +187,7 @@ class MediaPlugin(Plugin):
|
|||
elif resource.startswith('magnet:?'):
|
||||
self.logger.info('Downloading torrent {} to {}'.format(
|
||||
resource, self.download_dir))
|
||||
torrents = get_plugin('torrent')
|
||||
torrents = get_plugin(self.torrent_plugin)
|
||||
|
||||
evt_queue = queue.Queue()
|
||||
torrents.download(resource, download_dir=self.download_dir, _async=True, is_media=True,
|
||||
|
@ -195,9 +203,8 @@ class MediaPlugin(Plugin):
|
|||
|
||||
return resource
|
||||
|
||||
@staticmethod
|
||||
def _stop_torrent():
|
||||
torrents = get_plugin('torrent')
|
||||
def _stop_torrent(self):
|
||||
torrents = get_plugin(self.torrent_plugin)
|
||||
torrents.quit()
|
||||
|
||||
@action
|
||||
|
|
473
platypush/plugins/rtorrent.py
Normal file
473
platypush/plugins/rtorrent.py
Normal file
|
@ -0,0 +1,473 @@
|
|||
import datetime
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
import threading
|
||||
import xmlrpc.client
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from platypush.context import get_bus
|
||||
from platypush.plugins import action
|
||||
from platypush.plugins.torrent import TorrentPlugin
|
||||
from platypush.message.event.torrent import \
|
||||
TorrentDownloadStartEvent, TorrentDownloadedMetadataEvent, TorrentDownloadProgressEvent, \
|
||||
TorrentDownloadCompletedEvent, TorrentPausedEvent, TorrentResumedEvent, TorrentQueuedEvent, TorrentRemovedEvent, \
|
||||
TorrentEvent
|
||||
|
||||
|
||||
class RtorrentPlugin(TorrentPlugin):
|
||||
"""
|
||||
Plugin to interact search, download and manage torrents through RTorrent.
|
||||
The usage of this plugin is advised over :class:`platypush.plugins.torrent.TorrentPlugin`, as RTorrent is a more
|
||||
flexible and optimized solution for downloading and managing torrents compared to the Platypush native plugin.
|
||||
|
||||
Configuration:
|
||||
|
||||
- Install ``rtorrent`` on your system - on Debian/Ubuntu/Raspbian::
|
||||
|
||||
apt-get install rtorrent
|
||||
|
||||
- Configure the ``rtorrent`` XML/RPC interface, usually by adding the following lines to your
|
||||
``~/.rtorrent.rc``:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
# Enable XML/RPC
|
||||
scgi_local = /home/user/.rpc.socket
|
||||
|
||||
- Use a web server to bridge the RPC interface exposed by RTorrent over HTTP. Some configuration examples are
|
||||
available `here <https://github.com/rakshasa/rtorrent/wiki/RPC-Setup-XMLRPC>`_. I usually use ``lighttpd``
|
||||
because it's easy to configure and it comes with a built-in SCGI module. Install the server e.g. using
|
||||
``apt``::
|
||||
|
||||
apt-get install lighttpd
|
||||
|
||||
- Create a base configuration file like this under e.g. ``~/.config/rtorrent/lighttpd.conf``:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
### Base configuration
|
||||
server.modules = (
|
||||
"mod_indexfile",
|
||||
"mod_access",
|
||||
"mod_alias",
|
||||
"mod_redirect",
|
||||
)
|
||||
|
||||
# Make sure that all the directories exist.
|
||||
|
||||
# server.document-root isn't really needed, but lighttpd
|
||||
# won't start if it doesn't find a document root.
|
||||
server.document-root = "/home/user/.local/share/rtorrent/html"
|
||||
server.upload-dirs = ( "/home/user/.cache/uploads" )
|
||||
server.errorlog = "/home/user/.local/log/rtorrent/error.log"
|
||||
server.pid-file = "/home/user/.local/run/lighttpd.pid"
|
||||
server.username = "your-user"
|
||||
server.groupname = "your-group"
|
||||
server.port = 5000
|
||||
|
||||
index-file.names = ( "index.html" )
|
||||
|
||||
### Configure the RTorrent XML/RPC endpoint
|
||||
server.modules += ( "mod_scgi" )
|
||||
scgi.server = (
|
||||
# Bind an endpoint called /RPC2 to your local interface
|
||||
"/RPC2" =>
|
||||
( "127.0.0.1" =>
|
||||
(
|
||||
# Read from the RTorrent XML/RPC socket
|
||||
"socket" => "/home/user/.rpc.socket",
|
||||
"check-local" => "disable",
|
||||
"disable-time" => 0, # don't disable scgi if connection fails
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
- Start the HTTP service, and optionally enable it as a system/user service::
|
||||
|
||||
lighttpd -f ~/.config/rtorrent/lighttpd.conf
|
||||
|
||||
- Start RTorrent and check that the XML/RPC interface works::
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
$ xmlrpc localhost:8000 system.listMethods
|
||||
# Should return a list with all the methods exposed by RTorrent.
|
||||
$ xmlrpc localhost:5000 download_list
|
||||
Result:
|
||||
Array of 0 items:
|
||||
|
||||
- It is advised to let the RTorrent instance run in e.g. ``screen`` or ``tmux`` on the server machine - it is
|
||||
more reliable than letting the plugin start/stop the instance, and you have an easy CLI interface to attach
|
||||
to manage/monitor your torrents.
|
||||
|
||||
- In this example, the URL to configure in the plugin would be ``http://localhost:5000/RPC2``.
|
||||
|
||||
Triggers:
|
||||
|
||||
* :class:`platypush.message.event.torrent.TorrentQueuedEvent` when a new torrent transfer is queued.
|
||||
* :class:`platypush.message.event.torrent.TorrentRemovedEvent` when a torrent transfer is removed.
|
||||
* :class:`platypush.message.event.torrent.TorrentDownloadStartEvent` when a torrent transfer starts.
|
||||
* :class:`platypush.message.event.torrent.TorrentDownloadedMetadataEvent` when the metadata of a torrent
|
||||
transfer has been downloaded.
|
||||
* :class:`platypush.message.event.torrent.TorrentDownloadProgressEvent` when a transfer is progressing.
|
||||
* :class:`platypush.message.event.torrent.TorrentPausedEvent` when a transfer is paused.
|
||||
* :class:`platypush.message.event.torrent.TorrentResumedEvent` when a transfer is resumed.
|
||||
* :class:`platypush.message.event.torrent.TorrentDownloadCompletedEvent` when a transfer is completed.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, url: str, poll_seconds: float = 5.0, torrent_files_dir: str = '~/.rtorrent/watch', **kwargs):
|
||||
"""
|
||||
:param url: HTTP URL that exposes the XML/RPC interface of RTorrent (e.g. ``http://localhost:5000/RPC2``).
|
||||
:param poll_seconds: How often the plugin will monitor for changes in the torrent state (default: 5 seconds).
|
||||
:param torrent_files_dir: Directory where torrents and metadata files will be downloaded
|
||||
(default: ``~/.rtorrent/watch``).
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.torrent_files_dir = os.path.abspath(os.path.expanduser(torrent_files_dir))
|
||||
Path(self.torrent_files_dir).mkdir(parents=True, exist_ok=True, mode=0o755)
|
||||
|
||||
self._monitor_stop = threading.Event()
|
||||
self._monitor_thread: Optional[threading.Thread] = None
|
||||
self._last_status = {}
|
||||
self._torrent_urls = {}
|
||||
self._status_lock = threading.RLock()
|
||||
|
||||
self.poll_seconds = poll_seconds
|
||||
self.url = url
|
||||
self.client = xmlrpc.client.Server(self.url)
|
||||
self.methods = set(self._list_methods())
|
||||
self.start_monitor()
|
||||
|
||||
def _get_client(self) -> xmlrpc.client.Server:
|
||||
return xmlrpc.client.Server(self.url)
|
||||
|
||||
def _fire_event(self, event: TorrentEvent, *_, **__):
|
||||
bus = get_bus()
|
||||
bus.post(event)
|
||||
|
||||
def _process_events(self, status: dict, last_status: dict):
|
||||
if not status:
|
||||
self._fire_event(TorrentRemovedEvent(**last_status))
|
||||
return
|
||||
|
||||
if not last_status:
|
||||
self._fire_event(TorrentQueuedEvent(**status))
|
||||
|
||||
progress = status.get('progress', 0)
|
||||
name = status.get('name')
|
||||
start_date = status.get('start_date')
|
||||
finish_date = status.get('finish_date')
|
||||
is_active = status.get('is_active')
|
||||
|
||||
if name and not last_status.get('name'):
|
||||
self._fire_event(TorrentDownloadedMetadataEvent(**status))
|
||||
|
||||
if start_date and not last_status.get('start_date'):
|
||||
self._fire_event(TorrentDownloadStartEvent(**status))
|
||||
|
||||
if is_active and not last_status.get('is_active'):
|
||||
self._fire_event(TorrentResumedEvent(**status))
|
||||
elif not is_active and last_status.get('is_active'):
|
||||
self._fire_event(TorrentPausedEvent(**status))
|
||||
|
||||
if progress > 0:
|
||||
if progress > last_status.get('progress', 0):
|
||||
self._fire_event(TorrentDownloadProgressEvent(**status))
|
||||
|
||||
if finish_date and not last_status.get('finish_date'):
|
||||
self._fire_event(TorrentDownloadCompletedEvent(**status))
|
||||
|
||||
def _torrent_monitor(self, *_, **__):
|
||||
def thread():
|
||||
self.logger.info('Starting torrent monitoring')
|
||||
|
||||
while not self._monitor_stop.is_set():
|
||||
try:
|
||||
# noinspection PyUnresolvedReferences
|
||||
statuses = self.status().output
|
||||
last_statuses = self._last_status.copy()
|
||||
self._last_status = statuses
|
||||
torrent_hashes = set(statuses.keys()).union(last_statuses.keys())
|
||||
|
||||
for torrent_hash in torrent_hashes:
|
||||
self._process_events(statuses.get(torrent_hash, {}), last_statuses.get(torrent_hash, {}))
|
||||
except Exception as e:
|
||||
self.logger.warning('Error while monitoring torrent status')
|
||||
self.logger.exception(e)
|
||||
finally:
|
||||
self._monitor_stop.wait(timeout=self.poll_seconds)
|
||||
|
||||
self.logger.info('Stopped torrent monitoring')
|
||||
|
||||
return thread
|
||||
|
||||
def _multicall(self, *args) -> List[list]:
|
||||
if 'd.multicall2' in self.methods:
|
||||
return self.client.d.multicall2('', *args)
|
||||
if 'd.multicall' in self.methods:
|
||||
return self.client.d.multicall(*args)
|
||||
|
||||
raise AssertionError('No multicall method available on the rtorrent interface')
|
||||
|
||||
@action
|
||||
def start_monitor(self):
|
||||
"""
|
||||
Start monitoring the status of the RTorrent instance.
|
||||
"""
|
||||
if self._monitor_thread and self._monitor_thread.is_alive():
|
||||
self.logger.info('Torrent monitoring already running')
|
||||
return
|
||||
|
||||
self._monitor_stop.clear()
|
||||
self._monitor_thread = threading.Thread(target=self._torrent_monitor())
|
||||
self._monitor_thread.start()
|
||||
|
||||
@action
|
||||
def stop_monitor(self):
|
||||
"""
|
||||
Stop monitoring the status of the RTorrent instance.
|
||||
"""
|
||||
if not (self._monitor_thread and self._monitor_thread.is_alive()):
|
||||
self.logger.info('Torrent monitoring already stopped')
|
||||
else:
|
||||
self._monitor_stop.set()
|
||||
self._monitor_thread.join(timeout=60.0)
|
||||
|
||||
self._monitor_thread = None
|
||||
|
||||
@action
|
||||
def download_torrent_file(self, torrent: str) -> str:
|
||||
"""
|
||||
Download a torrent link to ``torrent_files_dir``.
|
||||
|
||||
:param torrent: Torrent URL, magnet link or local file.
|
||||
:return: Path to the locally downloaded .torrent file.
|
||||
"""
|
||||
if torrent.startswith('magnet:?'):
|
||||
# Magnet link: extract and download
|
||||
m = re.search(r'xt=urn:btih:([^&/]+)', torrent)
|
||||
assert m, 'Invalid magnet link: {}'.format(torrent)
|
||||
torrent_hash = m.group(1)
|
||||
torrent_file = os.path.join(self.torrent_files_dir, '{}.torrent'.format(torrent_hash))
|
||||
|
||||
with open(torrent_file, 'w') as f:
|
||||
f.write('d10:magnet-uri{length}:{info}e'.format(length=len(torrent), info=torrent))
|
||||
|
||||
self._torrent_urls[torrent_hash] = torrent
|
||||
return torrent_file
|
||||
|
||||
if torrent.startswith('http://') or torrent.startswith('https://'):
|
||||
# HTTP resource
|
||||
info = requests.get(torrent).text
|
||||
torrent_file = os.path.join(self.torrent_files_dir, torrent.split('/')[-1])
|
||||
if not torrent_file.endswith('.torrent'):
|
||||
torrent_file += '.torrent'
|
||||
|
||||
with open(torrent_file, 'w') as f:
|
||||
f.write(info)
|
||||
|
||||
self._torrent_urls[torrent_file.split('.')[0]] = torrent
|
||||
return torrent_file
|
||||
|
||||
# Local torrent file
|
||||
torrent_file = os.path.abspath(os.path.expanduser(torrent))
|
||||
assert os.path.isfile(torrent_file), 'No such torrent file: {}'.format(torrent)
|
||||
|
||||
self._torrent_urls[os.path.basename(torrent_file).split('.')[0]] = 'file://' + torrent
|
||||
return torrent_file
|
||||
|
||||
@action
|
||||
def download(self, torrent: str, is_media: bool = False, *_, **__):
|
||||
"""
|
||||
Download a torrent.
|
||||
|
||||
:param torrent: Torrent to download. Supported formats:
|
||||
|
||||
* Magnet URLs
|
||||
* Torrent URLs
|
||||
* Local torrent files
|
||||
|
||||
:param is_media: Set it to true if you're downloading a media file that you'd like to stream as soon as the
|
||||
first chunks are available. If so, then the events and the status method will only include media files
|
||||
:return: The status of the torrent.
|
||||
"""
|
||||
# noinspection PyUnresolvedReferences
|
||||
torrent_file = self.download_torrent_file(torrent).output
|
||||
client = self._get_client()
|
||||
client.load.start('', torrent_file)
|
||||
|
||||
def _list_methods(self) -> List[str]:
|
||||
return self.client.system.listMethods()
|
||||
|
||||
@action
|
||||
def list_methods(self) -> List[str]:
|
||||
"""
|
||||
:return: The list of methods exposed by the RTorrent instance
|
||||
"""
|
||||
return list(self.methods)
|
||||
|
||||
@action
|
||||
def status(self, torrent: str = None) -> dict:
|
||||
"""
|
||||
Get the status of the current transfers.
|
||||
|
||||
:param torrent: Torrent hash.
|
||||
:returns: A dictionary:
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
{
|
||||
"HASH1234567890": {
|
||||
"hash": "HASH1234567890",
|
||||
"name": "Your torrent name",
|
||||
"save_path": "/home/user/Downloads/Your torrent name",
|
||||
"is_active": true,
|
||||
"is_open": true,
|
||||
"completed_bytes": 666894336,
|
||||
"download_rate": 451345,
|
||||
"is_multi_file": true,
|
||||
"remaining_bytes": 1482827011,
|
||||
"size_bytes": 2149721347,
|
||||
"load_date": "2020-09-02T18:42:19",
|
||||
"peers": 0,
|
||||
"state": "paused",
|
||||
"start_date": "2020-09-02T18:42:19",
|
||||
"finish_date": null,
|
||||
"upload_rate": 143967,
|
||||
"progress": 31.0,
|
||||
"files": ["list", "of", "downloaded", "files"]
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
attrs = ['hash', 'name', 'save_path', 'is_active', 'is_open', 'completed_bytes', 'download_rate',
|
||||
'is_multi_file', 'remaining_bytes', 'size_bytes', 'load_date', 'peers', 'start_date',
|
||||
'finish_date', 'upload_rate']
|
||||
cmds = ['d.hash=', 'd.name=', 'd.directory=', 'd.is_active=', 'd.is_open=', 'd.completed_bytes=',
|
||||
'd.down.rate=', 'd.is_multi_file=', 'd.left_bytes=', 'd.size_bytes=', 'd.load_date=',
|
||||
'd.peers_connected=', 'd.timestamp.started=', 'd.timestamp.finished=', 'd.up.rate=']
|
||||
|
||||
mappers = {
|
||||
'is_active': lambda v: bool(v),
|
||||
'is_open': lambda v: bool(v),
|
||||
'is_multi_file': lambda v: bool(v),
|
||||
'load_date': lambda v: datetime.datetime.fromtimestamp(v) if v else None,
|
||||
'start_date': lambda v: datetime.datetime.fromtimestamp(v) if v else None,
|
||||
'finish_date': lambda v: datetime.datetime.fromtimestamp(v) if v else None,
|
||||
}
|
||||
|
||||
with self._status_lock:
|
||||
torrents = {
|
||||
info[0]: {
|
||||
attr: mappers[attr](info[i]) if attr in mappers else info[i]
|
||||
for i, attr in enumerate(attrs)
|
||||
}
|
||||
for info in self._multicall('', *cmds)
|
||||
}
|
||||
|
||||
for torrent_id, info in torrents.items():
|
||||
torrents[torrent_id]['progress'] = round(100. * (info['completed_bytes']/info['size_bytes']), 1)
|
||||
torrents[torrent_id]['url'] = self._torrent_urls.get(torrent_id, torrent_id)
|
||||
torrents[torrent_id]['is_paused'] = not info['is_active']
|
||||
torrents[torrent_id]['paused'] = not info['is_active'] # Back compatibility with TorrentPlugin
|
||||
torrents[torrent_id]['size'] = info['size_bytes'] # Back compatibility with TorrentPlugin
|
||||
torrents[torrent_id]['files'] = []
|
||||
|
||||
if not info['is_open']:
|
||||
torrents[torrent_id]['state'] = 'stopped'
|
||||
elif not info['is_active']:
|
||||
torrents[torrent_id]['state'] = 'paused'
|
||||
else:
|
||||
torrents[torrent_id]['state'] = 'downloading'
|
||||
|
||||
if info.get('save_path'):
|
||||
torrents[torrent_id]['files'] = list(str(f) for f in Path(info['save_path']).rglob('*')) \
|
||||
if info.get('is_multi_file') else info['save_path']
|
||||
|
||||
return torrents.get(torrent, {}) if torrent else torrents
|
||||
|
||||
@action
|
||||
def open(self, torrent: str) -> dict:
|
||||
"""
|
||||
Open a loaded torrent transfer.
|
||||
|
||||
:param torrent: Torrent hash.
|
||||
:return: The status of the torrent.
|
||||
"""
|
||||
self.client.d.open(torrent)
|
||||
return self.status(torrent).output
|
||||
|
||||
@action
|
||||
def pause(self, torrent: str) -> dict:
|
||||
"""
|
||||
Pause a torrent transfer.
|
||||
|
||||
:param torrent: Torrent hash.
|
||||
:return: The status of the torrent.
|
||||
"""
|
||||
self.client.d.pause(torrent)
|
||||
return self.status(torrent).output
|
||||
|
||||
@action
|
||||
def resume(self, torrent) -> dict:
|
||||
"""
|
||||
Resume a torrent transfer.
|
||||
|
||||
:param torrent: Torrent hash.
|
||||
:return: The status of the torrent.
|
||||
"""
|
||||
self.client.d.resume(torrent)
|
||||
return self.status(torrent).output
|
||||
|
||||
@action
|
||||
def stop(self, torrent) -> dict:
|
||||
"""
|
||||
Stop a torrent transfer.
|
||||
|
||||
:param torrent: Torrent hash.
|
||||
:return: The status of the torrent.
|
||||
"""
|
||||
self.client.d.stop(torrent)
|
||||
return self.status(torrent).output
|
||||
|
||||
@action
|
||||
def remove(self, torrent):
|
||||
"""
|
||||
Stop and remove a torrent transfer (without removing the downloaded files).
|
||||
|
||||
:param torrent: Torrent hash.
|
||||
"""
|
||||
self.client.d.stop(torrent)
|
||||
self.client.d.erase(torrent)
|
||||
|
||||
@action
|
||||
def quit(self):
|
||||
"""
|
||||
Terminate all the active transfers and quit the monitor.
|
||||
"""
|
||||
# noinspection PyUnresolvedReferences
|
||||
torrents = self.status().output.keys().copy()
|
||||
for torrent in torrents:
|
||||
self.remove(torrent)
|
||||
|
||||
self.stop_monitor()
|
||||
|
||||
@action
|
||||
def execute(self, method: str, *args, **kwargs):
|
||||
"""
|
||||
Execute a raw command over the RTorrent RPC interface.
|
||||
|
||||
:param method: Method name.
|
||||
:param args: Method arguments.
|
||||
:param kwargs: Method keyword-arguments.
|
||||
:return: Anything returned by the RPC method.
|
||||
"""
|
||||
method = getattr(self.client, method)
|
||||
return method(*args, **kwargs)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
|
@ -29,7 +29,7 @@ class TorrentPlugin(Plugin):
|
|||
categories = {
|
||||
'movies': None,
|
||||
'tv': None,
|
||||
'anime': None,
|
||||
# 'anime': None,
|
||||
}
|
||||
|
||||
torrent_state = {}
|
||||
|
@ -240,8 +240,14 @@ class TorrentPlugin(Plugin):
|
|||
|
||||
if torrent.startswith('magnet:?'):
|
||||
magnet = torrent
|
||||
info = lt.parse_magnet_uri(magnet)
|
||||
info['magnet'] = magnet
|
||||
magnet_info = lt.parse_magnet_uri(magnet)
|
||||
info = {
|
||||
'name': magnet_info.name,
|
||||
'url': magnet,
|
||||
'magnet': magnet,
|
||||
'trackers': magnet_info.trackers,
|
||||
'save_path': download_dir,
|
||||
}
|
||||
elif torrent.startswith('http://') or torrent.startswith('https://'):
|
||||
response = requests.get(torrent, headers=self.headers, allow_redirects=True)
|
||||
torrent_file = os.path.join(download_dir, self._generate_rand_filename())
|
||||
|
|
Loading…
Reference in a new issue