From 277d6ec271757ff87eefca046db94bf4a4c601a3 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 1 Jul 2019 19:32:22 +0200 Subject: [PATCH] Added torrent support in new webplayer --- .../css/source/common/elements/slider.scss | 5 - .../source/webpanel/plugins/media/index.scss | 6 + .../source/webpanel/plugins/media/info.scss | 3 + .../webpanel/plugins/media/torrents.scss | 40 +++ .../source/webpanel/plugins/media/vars.scss | 2 + platypush/backend/http/static/js/api.js | 8 +- .../static/js/plugins/media/handlers/file.js | 6 +- .../js/plugins/media/handlers/torrent.js | 208 ++++++++++++- .../js/plugins/media/handlers/youtube.js | 2 +- .../http/static/js/plugins/media/index.js | 23 +- .../http/static/js/plugins/media/torrents.js | 68 ++++ .../backend/http/templates/notifications.html | 2 + .../http/templates/plugins/media/index.html | 19 +- .../http/templates/plugins/media/info.html | 21 +- .../templates/plugins/media/torrents.html | 31 ++ platypush/bus/__init__.py | 13 +- platypush/message/event/torrent.py | 60 ++-- platypush/plugins/media/__init__.py | 43 ++- platypush/plugins/media/mplayer.py | 5 - platypush/plugins/media/mpv.py | 40 ++- platypush/plugins/media/vlc.py | 4 - platypush/plugins/media/webtorrent.py | 4 +- platypush/plugins/torrent.py | 292 ++++++++++++------ 23 files changed, 695 insertions(+), 210 deletions(-) create mode 100644 platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss create mode 100644 platypush/backend/http/static/js/plugins/media/torrents.js create mode 100644 platypush/backend/http/templates/plugins/media/torrents.html diff --git a/platypush/backend/http/static/css/source/common/elements/slider.scss b/platypush/backend/http/static/css/source/common/elements/slider.scss index efa4cd6d7..d19c640dc 100644 --- a/platypush/backend/http/static/css/source/common/elements/slider.scss +++ b/platypush/backend/http/static/css/source/common/elements/slider.scss @@ -46,11 +46,6 @@ @include appearance(none); } - &::-webkit-progress-value { - background: $slider-progress-bg; - height: 15px; - } - &::-moz-range-progress { background: $slider-progress-bg; height: 15px; diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss index 58b643721..ce18c35ae 100644 --- a/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss +++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss @@ -10,6 +10,7 @@ @import 'webpanel/plugins/media/controls'; @import 'webpanel/plugins/media/info'; @import 'webpanel/plugins/media/subs'; +@import 'webpanel/plugins/media/torrents'; .media-plugin { display: flex; @@ -50,5 +51,10 @@ color: $result-item-icon; margin-right: .5em; } + + .top-buttons { + text-align: right; + float: right; + } } diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss index 620d18418..384cfac67 100644 --- a/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss +++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss @@ -12,6 +12,9 @@ padding: .75em .5em; border-bottom: $default-border-2; + &:nth-child(odd) { background: rgba(255,255,255,0.0); } + &:nth-child(even) { background: $default-bg-3; } + &:hover { background: $hover-bg; border-radius: 1em; diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss new file mode 100644 index 000000000..70387bfb7 --- /dev/null +++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss @@ -0,0 +1,40 @@ +.media-plugin { + #media-torrents { + .modal { + width: 90%; + + .body { + padding: 0; + text-align: center; + + .head { + font-weight: bold; + padding: .5rem 0 2.5rem 0; + background: $torrents-head-bg; + border-bottom: $default-border-3; + } + + .transfers-container { + margin: 0; + width: 100%; + } + + .transfer { + display: flex; + padding: 1.5rem; + cursor: pointer; + + &:nth-child(odd) { background: rgba(255,255,255,0.0); } + &:nth-child(even) { background: $default-bg-3; } + &.selected { background: $selected-bg; } + + &:hover { + background: $hover-bg; + border-radius: .5rem; + } + } + } + } + } +} + diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss index b585011b7..49ab5f2ec 100644 --- a/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss +++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss @@ -17,3 +17,5 @@ $subs-control-height: 4rem; $btn-default-shadow: 2px 2px 2px #ddd; $btn-hover-default-shadow: 3px 3px 3px #ddd; +$torrents-head-bg: #e8e8e8; + diff --git a/platypush/backend/http/static/js/api.js b/platypush/backend/http/static/js/api.js index b8e08c3fd..f1ca39e6e 100644 --- a/platypush/backend/http/static/js/api.js +++ b/platypush/backend/http/static/js/api.js @@ -1,4 +1,4 @@ -function execute(request) { +function execute(request, timeout=30000) { var additionalPayload = {}; if (!('target' in request) || !request['target']) { @@ -15,6 +15,10 @@ function execute(request) { }; } + if (timeout) { + additionalPayload.timeout = timeout; + } + return new Promise((resolve, reject) => { axios.post('/execute', request, additionalPayload) .then((response) => { @@ -42,7 +46,7 @@ function execute(request) { }); } -function request(action, args={}) { +function request(action, args={}, timeout=30000) { return execute({ type: 'request', action: action, diff --git a/platypush/backend/http/static/js/plugins/media/handlers/file.js b/platypush/backend/http/static/js/plugins/media/handlers/file.js index 28aa18486..5cc272eba 100644 --- a/platypush/backend/http/static/js/plugins/media/handlers/file.js +++ b/platypush/backend/http/static/js/plugins/media/handlers/file.js @@ -103,10 +103,10 @@ MediaHandlers.generic = MediaHandlers.file.extend({ }, methods: { - getMetadata: async function(url) { + getMetadata: async function(item) { return { - url: url, - title: url, + url: item.url, + title: item.url, }; }, }, diff --git a/platypush/backend/http/static/js/plugins/media/handlers/torrent.js b/platypush/backend/http/static/js/plugins/media/handlers/torrent.js index 2df004098..7f586e09a 100644 --- a/platypush/backend/http/static/js/plugins/media/handlers/torrent.js +++ b/platypush/backend/http/static/js/plugins/media/handlers/torrent.js @@ -1,4 +1,4 @@ -MediaHandlers.torrent = Vue.extend({ +MediaHandlers.torrent = MediaHandlers.base.extend({ props: { bus: { type: Object }, iconClass: { @@ -31,6 +31,12 @@ MediaHandlers.torrent = Vue.extend({ }, }, + data: function() { + return { + torrentStatus: {}, + }; + }, + methods: { matchesUrl: function(url) { return !!( @@ -40,19 +46,207 @@ MediaHandlers.torrent = Vue.extend({ ); }, - getMetadata: function(url) { - // TODO - return {}; + getMetadata: async function(item, onlyBase=false) { + let status = {}; + + if (!onlyBase) + status = await this.status({url: item.url}); + + let transferInfo = {}; + if (item.url in this.torrentStatus) + transferInfo = this.torrentStatus[item.url]; + + return {...status, ...transferInfo}; }, - play: function(item) { + play: async function(item) { + let status = await this.download(item); + status.waitingPlay = true; + + if (item.url in this.torrentStatus) { + this.firePlay(event); + } }, - download: function(item) { + pause: async function(item) { + let status = await request('torrent.pause', {torrent: item.torrent}); + this.mergeStatus(status); }, - info: function(item) { + remove: async function(item) { + let status = await request('torrent.remove', {torrent: item.torrent}); + this.mergeStatus(status); }, + + status: async function(item) { + if (item) { + return await request('torrent.status', { + torrent: item.url, + }); + } + + return await request('torrent.status'); + }, + + download: async function(item) { + let status = await this.status(item.url); + if (status && Object.keys(status).length) { + createNotification({ + text: 'This torrent is already being downloaded, please play the downloading local media file instead', + image: { + icon: 'download', + }, + }); + + return status; + } + + status = await request( + 'torrent.download', + { + torrent: item.url, + _async: true, + is_media: true, + }, + timeout=120000 // Wait up to two minutes while downloading enough torrent chunks + ); + + this.torrentStatus[item.url] = { + ...item, ...status, + scheduledPlay: false, + torrentState: status.state, + state: 'idle', + }; + + return this.torrentStatus[item.url]; + }, + + onTorrentEvent: function(event) { + this.mergeStatus(event); + }, + + onTorrentQueued: function(event) { + if (!this.mergeStatus(event)) + this.torrentStatus[event.url] = event; + + createNotification({ + text: 'Torrent download queued. Will start playing when enough chunks have been downloaded', + image: { + icon: 'clock', + }, + }); + }, + + onTorrentStart: function(event) { + if (!this.mergeStatus(event)) + return; + + createNotification({ + text: 'Download of '.concat(event.name, ' started'), + image: { + icon: 'download', + }, + }); + }, + + onTorrentStop: function(event) { + if (!this.mergeStatus(event)) + return; + + delete this.torrentStatus[event.url]; + this.bus.$emit('torrent-status-update', this.torrentStatus); + }, + + onTorrentProgress: function(event) { + if (!this.mergeStatus(event)) + return; + + if (this.torrentStatus[event.url].waitingPlay) + this.firePlay(event); + }, + + onTorrentCompleted: function(event) { + if (!this.mergeStatus(event)) + return; + + delete this.torrentStatus[event.url]; + this.bus.$emit('torrent-status-update', this.torrentStatus); + + createNotification({ + text: 'Download of '.concat(event.name, ' completed'), + image: { + icon: 'check', + }, + }); + }, + + firePlay: function(item) { + if (!item.files || !item.files.length) { + console.warn('Torrent ' + item.url + ' has no media files available yet'); + return; + } + + if (event.progress < 5) { + console.warn('Please wait for enough chunks to be downloaded before playing'); + return; + } + + const url = 'file://' + item.files[0]; + this.bus.$emit('play', {...item, type: 'file', url: url}); + + if (this.torrentStatus[item.url].waitingPlay) + this.torrentStatus[item.url].waitingPlay = false; + + createNotification({ + text: 'Playback of '.concat(item.name, ' started'), + image: { + icon: 'play', + }, + }); + }, + + mergeStatus: function(event) { + const torrentState = event.state; + delete event.state; + + this.torrentStatus[event.url] = { + ...this.torrentStatus[event.url], + ...event, + torrentState: torrentState, + }; + + this.bus.$emit('torrent-status-update', this.torrentStatus); + return this.torrentStatus[event.url]; + }, + }, + + created: function() { + registerEventHandler(this.onTorrentStart, 'platypush.message.event.torrent.TorrentDownloadStartEvent'); + registerEventHandler(this.onTorrentStop, 'platypush.message.event.torrent.TorrentDownloadStopEvent'); + registerEventHandler(this.onTorrentProgress, 'platypush.message.event.torrent.TorrentDownloadProgressEvent'); + registerEventHandler(this.onTorrentCompleted, 'platypush.message.event.torrent.TorrentDownloadCompletedEvent'); + registerEventHandler(this.onTorrentQueued, 'platypush.message.event.torrent.TorrentQueuedEvent'); + registerEventHandler(this.onTorrentEvent, 'platypush.message.event.torrent.TorrentPausedEvent', + 'platypush.message.event.torrent.TorrentResumedEvent', + 'platypush.message.event.torrent.TorrentDownloadStartEvent', + 'platypush.message.event.torrent.TorrentStateChangeEvent'); + + const self = this; + this.status().then((status) => { + if (!status) + return; + + for (const [url, torrent] of Object.entries(status)) { + self.torrentStatus[event.url] + self.mergeStatus(torrent); + } + }); + + setTimeout(() => { + self.bus.$on('torrent-play', self.firePlay); + self.bus.$on('torrent-pause', self.pause); + self.bus.$on('torrent-remove', self.remove); + }, 100); }, }); diff --git a/platypush/backend/http/static/js/plugins/media/handlers/youtube.js b/platypush/backend/http/static/js/plugins/media/handlers/youtube.js index bbf0c0c57..07cd35a22 100644 --- a/platypush/backend/http/static/js/plugins/media/handlers/youtube.js +++ b/platypush/backend/http/static/js/plugins/media/handlers/youtube.js @@ -41,7 +41,7 @@ MediaHandlers.youtube = MediaHandlers.base.extend({ return !!(url.match('^https?://(www\.)?youtube.com/') || url.match('^https?://youtu.be/') || url.match('^https?://.*googlevideo.com/')); }, - getMetadata: function(url) { + getMetadata: function(item) { return {}; }, diff --git a/platypush/backend/http/static/js/plugins/media/index.js b/platypush/backend/http/static/js/plugins/media/index.js index a4d791335..38ed7b48c 100644 --- a/platypush/backend/http/static/js/plugins/media/index.js +++ b/platypush/backend/http/static/js/plugins/media/index.js @@ -61,6 +61,11 @@ Vue.component('media', { item: {}, }, + torrentModal: { + visible: false, + items: {}, + }, + subsModal: { visible: false, }, @@ -71,6 +76,10 @@ Vue.component('media', { types: function() { return MediaHandlers; }, + + torrentsDownloading: function() { + return Object.entries(this.torrentModal.items).length > 0; + }, }, methods: { @@ -154,10 +163,7 @@ Vue.component('media', { }, info: function(item) { - for (const [attr, value] of Object.entries(item)) { - Vue.set(this.infoModal.item, attr, value); - } - + Vue.set(this.infoModal, 'item', item); this.infoModal.loading = false; this.infoModal.visible = true; }, @@ -213,6 +219,14 @@ Vue.component('media', { }; }, + torrentStatusUpdate: function(torrents) { + Vue.set(this.torrentModal, 'items', {}); + + for (const [url, torrent] of Object.entries(torrents)) { + Vue.set(this.torrentModal.items, url, torrent); + } + }, + onStatusUpdate: function(event) { const dev = event.device; const status = event.status; @@ -307,6 +321,7 @@ Vue.component('media', { this.bus.$on('status-update', this.onStatusUpdate); this.bus.$on('start-streaming', this.startStreaming); this.bus.$on('search-subs', this.searchSubs); + this.bus.$on('torrent-status-update', this.torrentStatusUpdate); setInterval(this.timerFunc, 1000); }, diff --git a/platypush/backend/http/static/js/plugins/media/torrents.js b/platypush/backend/http/static/js/plugins/media/torrents.js new file mode 100644 index 000000000..cdeebef00 --- /dev/null +++ b/platypush/backend/http/static/js/plugins/media/torrents.js @@ -0,0 +1,68 @@ +Vue.component('media-torrents', { + template: '#tmpl-media-torrents', + mixins: [mediaUtils], + props: { + bus: { type: Object }, + torrents: { + type: Object, + default: () => {}, + }, + }, + + data: function() { + return { + selectedItem: undefined, + }; + }, + + computed: { + dropdownItems: function() { + const self = this; + return [ + { + name: 'play', + text: 'Play', + iconClass: 'fa fa-play', + click: function() { + self.bus.$emit('torrent-play', self.selectedItem); + }, + }, + + { + name: 'pause', + text: 'Pause/unpause transfer', + iconClass: 'fa fa-pause', + click: function() { + self.bus.$emit('torrent-pause', self.selectedItem); + }, + }, + + { + name: 'cancel', + text: 'Cancel transfer', + iconClass: 'fa fa-trash', + click: function() { + self.bus.$emit('torrent-remove', self.selectedItem); + }, + }, + + { + name: 'info', + text: 'View details', + iconClass: 'fa fa-info', + click: function() { + self.bus.$emit('info', self.selectedItem); + }, + }, + ]; + }, + }, + + methods: { + openDropdown: function(item) { + this.selectedItem = item; + openDropdown(this.$refs.menu); + }, + }, +}); + diff --git a/platypush/backend/http/templates/notifications.html b/platypush/backend/http/templates/notifications.html index 3eeb46c7b..0825ff5ad 100644 --- a/platypush/backend/http/templates/notifications.html +++ b/platypush/backend/http/templates/notifications.html @@ -25,6 +25,8 @@ + diff --git a/platypush/backend/http/templates/plugins/media/index.html b/platypush/backend/http/templates/plugins/media/index.html index 51c759cd3..5c22e0097 100644 --- a/platypush/backend/http/templates/plugins/media/index.html +++ b/platypush/backend/http/templates/plugins/media/index.html @@ -4,6 +4,7 @@ {% include 'plugins/media/item.html' %} {% include 'plugins/media/info.html' %} {% include 'plugins/media/subs.html' %} +{% include 'plugins/media/torrents.html' %} @@ -18,13 +19,20 @@ diff --git a/platypush/backend/http/templates/plugins/media/info.html b/platypush/backend/http/templates/plugins/media/info.html index f98b7a02a..d59b5ef32 100644 --- a/platypush/backend/http/templates/plugins/media/info.html +++ b/platypush/backend/http/templates/plugins/media/info.html @@ -11,12 +11,17 @@
Title
-
+
-
+
Description
-
+
@@ -43,6 +48,16 @@
Size
+ +
+
Peers
+
+
+ +
+
Seeds
+
+
diff --git a/platypush/backend/http/templates/plugins/media/torrents.html b/platypush/backend/http/templates/plugins/media/torrents.html new file mode 100644 index 000000000..90b1e6694 --- /dev/null +++ b/platypush/backend/http/templates/plugins/media/torrents.html @@ -0,0 +1,31 @@ + + + + diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index b053d5674..64269cf55 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -5,7 +5,7 @@ import time from queue import Queue from platypush.config import Config -from platypush.message.event import Event, StopEvent +from platypush.message.event import StopEvent logger = logging.getLogger(__name__) @@ -13,8 +13,7 @@ logger = logging.getLogger(__name__) class Bus(object): """ Main local bus where the daemon will listen for new messages """ - _MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired - # after one minute without being picked up + _MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired after one minute without being picked up def __init__(self, on_message=None): self.bus = Queue() @@ -56,19 +55,19 @@ class Bus(object): logger.warning('No message handlers installed, cannot poll') return - stop=False + stop = False while not stop: msg = self.get() if msg.timestamp and time.time() - msg.timestamp > self._MSG_EXPIRY_TIMEOUT: - logger.debug('{} seconds old message on the bus expired, ignoring it: {}' - .format(int(time.time()-msg.timestamp), msg)) + logger.debug('{} seconds old message on the bus expired, ignoring it: {}'. + format(int(time.time()-msg.timestamp), msg)) continue threading.Thread(target=self._msg_executor(msg)).start() if isinstance(msg, StopEvent) and msg.targets_me(): logger.info('Received STOP event on the bus') - stop=True + stop = True # vim:sw=4:ts=4:et: diff --git a/platypush/message/event/torrent.py b/platypush/message/event/torrent.py index 43c4fea09..a962fa3b2 100644 --- a/platypush/message/event/torrent.py +++ b/platypush/message/event/torrent.py @@ -5,72 +5,88 @@ class TorrentEvent(Event): """ Base class for torrent events """ - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -class TorrentDownloadingMetadataEvent(TorrentEvent): +class TorrentQueuedEvent(TorrentEvent): """ - Event triggered upon torrent metadata download start + Event triggered upon when a new torrent transfer is queued """ + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + +class TorrentDownloadedMetadataEvent(TorrentEvent): + """ + Event triggered upon torrent metadata download completed + """ + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) class TorrentDownloadStartEvent(TorrentEvent): """ Event triggered upon torrent download start """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) class TorrentSeedingStartEvent(TorrentEvent): """ Event triggered upon torrent seeding start """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) class TorrentDownloadProgressEvent(TorrentEvent): """ Event triggered upon torrent download progress """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) class TorrentStateChangeEvent(TorrentEvent): """ Event triggered upon torrent state change """ + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + +class TorrentPausedEvent(TorrentEvent): + """ + Event triggered when a torrent transfer is paused + """ + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) + + +class TorrentResumedEvent(TorrentEvent): + """ + Event triggered when a torrent transfer is resumed + """ + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) class TorrentDownloadCompletedEvent(TorrentEvent): """ Event triggered upon torrent state change """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) class TorrentDownloadStopEvent(TorrentEvent): """ Event triggered when a torrent transfer is stopped """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, url, *args, **kwargs): + super().__init__(*args, url=url, **kwargs) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index b871107f0..ea25e345a 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -30,8 +30,6 @@ class MediaPlugin(Plugin): Requires: * A media player installed (supported so far: mplayer, vlc, mpv, omxplayer, chromecast) - * The :class:`platypush.plugins.media.webtorrent` plugin for optional torrent support through webtorrent - (recommended) * **python-libtorrent** (``pip install python-libtorrent``), optional, for torrent support over native library * **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support * **requests** (``pip install requests``), optional, for local files over HTTP streaming supporting @@ -138,9 +136,16 @@ class MediaPlugin(Plugin): os.makedirs(self.download_dir, exist_ok=True) self.media_dirs.add(self.download_dir) - self._is_playing_torrent = False self._videos_queue = [] + @staticmethod + def _torrent_event_handler(evt_queue): + def handler(event): + # More than 5% of the torrent has been downloaded + if event.args.get('progress', 0) > 5 and event.args.get('files'): + evt_queue.put(event.args['files']) + return handler + def _get_resource(self, resource): """ :param resource: Resource to play/parse. Supported types: @@ -159,33 +164,28 @@ class MediaPlugin(Plugin): resource = self.get_youtube_url(resource).output elif resource.startswith('magnet:?'): - try: - get_plugin('media.webtorrent') - return resource # media.webtorrent will handle this - except Exception: - pass - - torrents = get_plugin('torrent') self.logger.info('Downloading torrent {} to {}'.format( resource, self.download_dir)) + torrents = get_plugin('torrent') + + evt_queue = queue.Queue() + torrents.download(resource, download_dir=self.download_dir, _async=True, is_media=True, + event_hndl=self._torrent_event_handler(evt_queue)) + + resources = [f for f in evt_queue.get()] - response = torrents.download(resource, download_dir=self.download_dir) - resources = [f for f in response.output if self._is_video_file(f)] if resources: self._videos_queue = sorted(resources) resource = self._videos_queue.pop(0) else: - raise RuntimeError('Unable to download torrent {}'.format(resource)) + raise RuntimeError('No media file found in torrent {}'.format(resource)) return resource - def _stop_torrent(self): - if self._is_playing_torrent: - try: - get_plugin('media.webtorrent').quit() - except Exception as e: - self.logger.warning('Cannot quit the webtorrent instance: {}'. - format(str(e))) + @staticmethod + def _stop_torrent(): + torrents = get_plugin('torrent') + torrents.quit() @action def play(self, resource, *args, **kwargs): @@ -492,8 +492,7 @@ class MediaPlugin(Plugin): [float(t) * pow(60, i) for (i, t) in enumerate(re.search( '^Duration:\s*([^,]+)', [x.decode() for x in result.stdout.readlines() - if "Duration" in x.decode()] - .pop().strip() + if "Duration" in x.decode()].pop().strip() ).group(1).split(':')[::-1])] ) diff --git a/platypush/plugins/media/mplayer.py b/platypush/plugins/media/mplayer.py index 04b1e695f..149ac3799 100644 --- a/platypush/plugins/media/mplayer.py +++ b/platypush/plugins/media/mplayer.py @@ -61,7 +61,6 @@ class MediaMplayerPlugin(MediaPlugin): self._mplayer_timeout = mplayer_timeout self._mplayer_stopped_event = threading.Event() self._status_lock = threading.Lock() - self._is_playing_torrent = False def _init_mplayer_bin(self, mplayer_bin=None): if not mplayer_bin: @@ -259,11 +258,7 @@ class MediaMplayerPlugin(MediaPlugin): resource = self._get_resource(resource) if resource.startswith('file://'): resource = resource[7:] - elif resource.startswith('magnet:?'): - self._is_playing_torrent = True - return get_plugin('media.webtorrent').play(resource) - self._is_playing_torrent = False self._exec('loadfile', resource, mplayer_args=mplayer_args) self._post_event(MediaPlayEvent, resource=resource) return self.status() diff --git a/platypush/plugins/media/mpv.py b/platypush/plugins/media/mpv.py index 23f4b4d08..be82c1c1b 100644 --- a/platypush/plugins/media/mpv.py +++ b/platypush/plugins/media/mpv.py @@ -2,7 +2,7 @@ import os import re import threading -from platypush.context import get_bus, get_plugin +from platypush.context import get_bus from platypush.plugins.media import PlayerState, MediaPlugin from platypush.message.event.media import MediaPlayEvent, MediaPlayRequestEvent, \ MediaPauseEvent, MediaStopEvent, NewPlayingMediaEvent, MediaSeekEvent @@ -42,7 +42,6 @@ class MediaMpvPlugin(MediaPlugin): self.args.update(args) self._player = None - self._is_playing_torrent = False self._playback_rebounce_event = threading.Event() self._on_stop_callbacks = [] @@ -53,7 +52,7 @@ class MediaMpvPlugin(MediaPlugin): if args: mpv_args.update(args) - for k,v in self._env.items(): + for k, v in self._env.items(): os.environ[k] = v self._player = mpv.MPV(**mpv_args) @@ -77,7 +76,8 @@ class MediaMpvPlugin(MediaPlugin): if (evt == Event.FILE_LOADED or evt == Event.START_FILE) and self._get_current_resource(): self._playback_rebounce_event.set() - self._post_event(NewPlayingMediaEvent, resource=self._get_current_resource(), title=self._player.filename) + self._post_event(NewPlayingMediaEvent, resource=self._get_current_resource(), + title=self._player.filename) elif evt == Event.PLAYBACK_RESTART: self._playback_rebounce_event.set() elif evt == Event.PAUSE: @@ -85,8 +85,8 @@ class MediaMpvPlugin(MediaPlugin): elif evt == Event.UNPAUSE: self._post_event(MediaPlayEvent, resource=self._get_current_resource(), title=self._player.filename) elif evt == Event.SHUTDOWN or ( - evt == Event.END_FILE and event.get('event', {}).get('reason') - in [EndFile.EOF_OR_INIT_FAILURE, EndFile.ABORTED, EndFile.QUIT]): + evt == Event.END_FILE and event.get('event', {}).get('reason') in + [EndFile.EOF_OR_INIT_FAILURE, EndFile.ABORTED, EndFile.QUIT]): playback_rebounced = self._playback_rebounce_event.wait(timeout=0.5) if playback_rebounced: self._playback_rebounce_event.clear() @@ -95,8 +95,8 @@ class MediaMpvPlugin(MediaPlugin): self._player = None self._post_event(MediaStopEvent) - for callback in self._on_stop_callbacks: - callback() + for cbk in self._on_stop_callbacks: + cbk() elif evt == Event.SEEK: self._post_event(MediaSeekEvent, position=self._player.playback_time) @@ -111,7 +111,8 @@ class MediaMpvPlugin(MediaPlugin): for regex in regexes: m = re.search(regex, resource) - if m: return base_url + m.group(2) + if m: + return base_url + m.group(2) return None @action @@ -149,14 +150,11 @@ class MediaMpvPlugin(MediaPlugin): if resource.startswith('file://'): resource = resource[7:] - elif resource.startswith('magnet:?'): - self._is_playing_torrent = True - return get_plugin('media.webtorrent').play(resource) else: yt_resource = self._get_youtube_link(resource) - if yt_resource: resource = yt_resource + if yt_resource: + resource = yt_resource - self._is_playing_torrent = False self._player.play(resource) return self.status() @@ -283,7 +281,7 @@ class MediaMpvPlugin(MediaPlugin): return self._player.sub_remove(sub_id) @action - def toggle_fullscreen(self, fullscreen=None): + def toggle_fullscreen(self): """ Toggle the fullscreen mode """ return self.toggle_property('fullscreen') @@ -296,14 +294,14 @@ class MediaMpvPlugin(MediaPlugin): :param property: Property to toggle """ if not self._player: - return (None, 'No mpv instance is running') + return None, 'No mpv instance is running' if not hasattr(self._player, property): self.logger.warning('No such mpv property: {}'.format(property)) value = not getattr(self._player, property) setattr(self._player, property, value) - return { property: value } + return {property: value} @action def get_property(self, property): @@ -312,7 +310,7 @@ class MediaMpvPlugin(MediaPlugin): ``man mpv`` for a full list of the available properties """ if not self._player: - return (None, 'No mpv instance is running') + return None, 'No mpv instance is running' return getattr(self._player, property) @action @@ -325,7 +323,7 @@ class MediaMpvPlugin(MediaPlugin): :type props: dict """ if not self._player: - return (None, 'No mpv instance is running') + return None, 'No mpv instance is running' for k,v in props.items(): setattr(self._player, k, v) @@ -340,7 +338,7 @@ class MediaMpvPlugin(MediaPlugin): def remove_subtitles(self): """ Removes (hides) the subtitles """ if not self._player: - return (None, 'No mpv instance is running') + return None, 'No mpv instance is running' self._player.sub_visibility = False @action @@ -368,7 +366,7 @@ class MediaMpvPlugin(MediaPlugin): return (None, 'No mpv instance is running') mute = not self._player.mute self._player.mute = mute - return { 'muted': mute } + return {'muted': mute} @action def set_position(self, position): diff --git a/platypush/plugins/media/vlc.py b/platypush/plugins/media/vlc.py index 9043c2b76..cb04be287 100644 --- a/platypush/plugins/media/vlc.py +++ b/platypush/plugins/media/vlc.py @@ -159,9 +159,6 @@ class MediaVlcPlugin(MediaPlugin): if resource.startswith('file://'): resource = resource[len('file://'):] - elif resource.startswith('magnet:?'): - self._is_playing_torrent = True - return get_plugin('media.webtorrent').play(resource) self._init_vlc(resource) if subtitles: @@ -169,7 +166,6 @@ class MediaVlcPlugin(MediaPlugin): subtitles = subtitles[len('file://'):] self._player.video_set_subtitle_file(subtitles) - self._is_playing_torrent = False self._player.play() if fullscreen or self._default_fullscreen: diff --git a/platypush/plugins/media/webtorrent.py b/platypush/plugins/media/webtorrent.py index 12243d779..561b91f38 100644 --- a/platypush/plugins/media/webtorrent.py +++ b/platypush/plugins/media/webtorrent.py @@ -10,7 +10,7 @@ from platypush.config import Config from platypush.context import get_bus, get_plugin from platypush.plugins.media import PlayerState, MediaPlugin from platypush.message.event.torrent import TorrentDownloadStartEvent, \ - TorrentDownloadCompletedEvent, TorrentDownloadingMetadataEvent + TorrentDownloadCompletedEvent, TorrentDownloadedMetadataEvent from platypush.plugins import action from platypush.utils import find_bins_in_path, find_files_by_ext, \ @@ -142,7 +142,7 @@ class MediaWebtorrentPlugin(MediaPlugin): and state == TorrentState.IDLE: # IDLE -> DOWNLOADING_METADATA state = TorrentState.DOWNLOADING_METADATA - bus.post(TorrentDownloadingMetadataEvent(resource=resource)) + bus.post(TorrentDownloadedMetadataEvent(resource=resource)) elif 'downloading: ' in line.lower() \ and media_file is None: # Find video files in torrent directory diff --git a/platypush/plugins/torrent.py b/platypush/plugins/torrent.py index 03f9c7612..f6474d41e 100644 --- a/platypush/plugins/torrent.py +++ b/platypush/plugins/torrent.py @@ -10,9 +10,9 @@ import urllib.parse from platypush.context import get_bus from platypush.plugins import Plugin, action from platypush.message.event.torrent import \ - TorrentDownloadStartEvent, TorrentSeedingStartEvent, \ - TorrentStateChangeEvent, TorrentDownloadProgressEvent, \ - TorrentDownloadCompletedEvent, TorrentDownloadStopEvent + TorrentDownloadStartEvent, TorrentDownloadedMetadataEvent, TorrentStateChangeEvent, \ + TorrentDownloadProgressEvent, TorrentDownloadCompletedEvent, TorrentDownloadStopEvent, \ + TorrentPausedEvent, TorrentResumedEvent, TorrentQueuedEvent class TorrentPlugin(Plugin): @@ -25,6 +25,9 @@ class TorrentPlugin(Plugin): * **requests** (``pip install requests``) [optional] for torrent info URL download """ + # Wait time in seconds between two torrent transfer checks + _MONITOR_CHECK_INTERVAL = 3 + default_torrent_ports = [6881, 6891] supported_categories = { 'movies': 'https://movies-v2.api-fetch.website/movies/1', @@ -48,6 +51,7 @@ class TorrentPlugin(Plugin): self.torrent_ports = torrent_ports if torrent_ports else self.default_torrent_ports self.download_dir = None + self._session = None if download_dir: self.download_dir = os.path.abspath(os.path.expanduser(download_dir)) @@ -101,7 +105,6 @@ class TorrentPlugin(Plugin): format(category, self.supported_categories.keys())) self.logger.info('Searching {} torrents for "{}"'.format(category, query)) - url = 'https://{category}-v2.api-fetch.website/{category}/1'.format(category=category) request = urllib.request.urlopen(urllib.request.Request( self.supported_categories[category] + '?' + urllib.parse.urlencode({ 'sort': 'relevance', @@ -139,41 +142,18 @@ class TorrentPlugin(Plugin): for (quality, item) in items.items() ], key=lambda _: _.get('seeds'), reverse=True) - @action - def download(self, torrent, download_dir=None): - """ - Download a torrent. - - :param torrent: Torrent to download. Supported formats: - - * Magnet URLs - * Torrent URLs - * Local torrent file - - :type resource: str - - :param download_dir: Directory to download, overrides the default download_dir attribute (default: None) - :type download_dir: str - """ - + def _get_torrent_info(self, torrent, download_dir): import libtorrent as lt - if not download_dir: - if self.download_dir: - download_dir = self.download_dir - else: - raise RuntimeError('download_dir not specified') - - download_dir = os.path.abspath(os.path.expanduser(download_dir)) - os.makedirs(download_dir, exist_ok=True) - info = {} torrent_file = None magnet = None - bus = get_bus() + info = {} + file_info = {} if torrent.startswith('magnet:?'): magnet = torrent info = lt.parse_magnet_uri(magnet) + info['magnet'] = magnet elif torrent.startswith('http://') or torrent.startswith('https://'): import requests response = requests.get(torrent, allow_redirects=True) @@ -196,11 +176,139 @@ class TorrentPlugin(Plugin): 'save_path': download_dir, } - ses = lt.session() - ses.listen_on(*self.torrent_ports) + return info, file_info, torrent_file, magnet - self.logger.info('Downloading "{}" to "{}" from [{}]' - .format(info['name'], self.download_dir, torrent)) + def _fire_event(self, event, event_hndl): + bus = get_bus() + bus.post(event) + + try: + if event_hndl: + event_hndl(event) + except Exception as e: + self.logger.warning('Exception in torrent event handler: {}'.format(str(e))) + self.logger.exception(e) + + def _torrent_monitor(self, torrent, transfer, download_dir, event_hndl, is_media): + def thread(): + files = [] + last_status = None + download_started = False + metadata_downloaded = False + + while not transfer.is_finished(): + if torrent not in self.transfers: + self.logger.info('Torrent {} has been stopped and removed'.format(torrent)) + self._fire_event(TorrentDownloadStopEvent(url=torrent), event_hndl) + break + + status = transfer.status() + torrent_file = transfer.torrent_file() + + if torrent_file: + self.torrent_state[torrent]['size'] = torrent_file.total_size() + files = [os.path.join( + download_dir, + torrent_file.files().file_path(i)) + for i in range(0, torrent_file.files().num_files()) + ] + + if is_media: + from platypush.plugins.media import MediaPlugin + files = [f for f in files if MediaPlugin._is_video_file(f)] + + self.torrent_state[torrent]['download_rate'] = status.download_rate + self.torrent_state[torrent]['name'] = status.name + self.torrent_state[torrent]['num_peers'] = status.num_peers + self.torrent_state[torrent]['paused'] = status.paused + self.torrent_state[torrent]['progress'] = round(100 * status.progress, 2) + self.torrent_state[torrent]['state'] = status.state.name + self.torrent_state[torrent]['title'] = status.name + self.torrent_state[torrent]['torrent'] = torrent + self.torrent_state[torrent]['upload_rate'] = status.upload_rate + self.torrent_state[torrent]['url'] = torrent + self.torrent_state[torrent]['files'] = files + + if transfer.has_metadata() and not metadata_downloaded: + self._fire_event(TorrentDownloadedMetadataEvent(**self.torrent_state[torrent]), event_hndl) + metadata_downloaded = True + + if status.state == status.downloading and not download_started: + self._fire_event(TorrentDownloadStartEvent(**self.torrent_state[torrent]), event_hndl) + download_started = True + + if last_status and status.progress != last_status.progress: + self._fire_event(TorrentDownloadProgressEvent(**self.torrent_state[torrent]), event_hndl) + + if not last_status or status.state != last_status.state: + self._fire_event(TorrentStateChangeEvent(**self.torrent_state[torrent]), event_hndl) + + if last_status and status.paused != last_status.paused: + if status.paused: + self._fire_event(TorrentPausedEvent(**self.torrent_state[torrent]), event_hndl) + else: + self._fire_event(TorrentResumedEvent(**self.torrent_state[torrent]), event_hndl) + + last_status = status + time.sleep(self._MONITOR_CHECK_INTERVAL) + + if transfer and transfer.is_finished(): + self._fire_event(TorrentDownloadCompletedEvent(**self.torrent_state[torrent]), event_hndl) + + self.remove(torrent) + return files + + return thread + + @action + def download(self, torrent, download_dir=None, _async=False, event_hndl=None, is_media=False): + """ + Download a torrent. + + :param torrent: Torrent to download. Supported formats: + + * Magnet URLs + * Torrent URLs + * Local torrent file + + :type torrent: str + + :param download_dir: Directory to download, overrides the default download_dir attribute (default: None) + :type download_dir: str + + :param _async: If true then the method will add the torrent to the transfer and then return. Updates on + the download status should be retrieved either by listening to torrent events or registering the + event handler. If false (default) then the method will exit only when the torrent download is complete. + :type _async: bool + + :param event_hndl: A function that takes an event object as argument and is invoked upon a new torrent event + (download started, progressing, completed etc.) + :type event_hndl: function + + :param is_media: Set it to true if you're downloading a media file that you'd like to stream as soon as the + first chunks are available. If so, then the events and the status method will only include media files + :type is_media: bool + """ + + if torrent in self.torrent_state and torrent in self.transfers: + return self.torrent_state[torrent] + + import libtorrent as lt + + if not download_dir: + if self.download_dir: + download_dir = self.download_dir + else: + raise RuntimeError('download_dir not specified') + + download_dir = os.path.abspath(os.path.expanduser(download_dir)) + os.makedirs(download_dir, exist_ok=True) + info, file_info, torrent_file, magnet = self._get_torrent_info(torrent, download_dir) + + if not self._session: + self._session = lt.session() + + self._session.listen_on(*self.torrent_ports) params = { 'save_path': download_dir, @@ -208,98 +316,64 @@ class TorrentPlugin(Plugin): } if magnet: - transfer = lt.add_magnet_uri(ses, magnet, params) - elif torrent_file: + transfer = lt.add_magnet_uri(self._session, magnet, params) + else: params['ti'] = file_info - transfer = ses.add_torrent(params) - - status = transfer.status() - files = [] + transfer = self._session.add_torrent(params) self.transfers[torrent] = transfer self.torrent_state[torrent] = { 'url': torrent, - 'title': info['name'], + 'title': transfer.status().name, 'trackers': info['trackers'], 'save_path': download_dir, } - bus.post(TorrentDownloadStartEvent(**self.torrent_state[torrent])) - last_status = None + self._fire_event(TorrentQueuedEvent(url=torrent), event_hndl) + self.logger.info('Downloading "{}" to "{}" from [{}]'.format(info['name'], download_dir, torrent)) + monitor_thread = self._torrent_monitor(torrent=torrent, transfer=transfer, download_dir=download_dir, + event_hndl=event_hndl, is_media=is_media) - while not status.is_seeding: - if not torrent in self.transfers: - self.logger.info('Torrent {} has been stopped and removed') - bus.post(TorrentDownloadStopEvent(url=torrent)) - return - - if not last_status: - bus.post(TorrentSeedingStartEvent(**self.torrent_state[torrent])) - - status = transfer.status() - torrent_file = transfer.torrent_file() - if torrent_file: - files = [os.path.join( - self.download_dir, - torrent_file.files().file_path(i)) - for i in range(0, torrent_file.files().num_files()) - ] - - self.torrent_state[torrent]['progress'] = round(100 * status.progress, 2) - self.torrent_state[torrent]['download_rate'] = status.download_rate - self.torrent_state[torrent]['upload_rate'] = status.upload_rate - self.torrent_state[torrent]['num_peers'] = status.num_peers - self.torrent_state[torrent]['state'] = status.state - - if last_status and status.progress != last_status.progress: - bus.post(TorrentDownloadProgressEvent(**self.torrent_state[torrent])) - - if not last_status or status.state != last_status.state: - bus.post(TorrentStateChangeEvent(**self.torrent_state[torrent])) - - self.logger.info(('Torrent download: {:.2f}% complete (down: {:.1f} kb/s ' + - 'up: {:.1f} kB/s peers: {} state: {})') - .format(status.progress * 100, - status.download_rate / 1000, - status.upload_rate / 1000, - status.num_peers, status.state)) - - last_status = status - time.sleep(5) - - if torrent_file: - try: os.unlink(torrent_file) - except: pass - - bus.post(TorrentDownloadCompletedEvent(**self.torrent_state[torrent], files=files)) - del self.torrent_state[torrent] - del self.transfers[torrent] - return files + if not _async: + return monitor_thread() + threading.Thread(target=monitor_thread).start() + return self.torrent_state[torrent] @action - def get_status(self): + def status(self, torrent=None): """ Get the status of the current transfers. + :param torrent: Torrent path, URL or magnet URI whose status will be retrieve (default: None, retrieve all + current transfers) + :type torrent: str + :returns: A dictionary in the format torrent_url -> status """ + if torrent: + return self.torrent_state.get(torrent) return self.torrent_state @action def pause(self, torrent): """ - Pause a torrent transfer. + Pause/resume a torrent transfer. :param torrent: Torrent URL as returned from `get_status()` :type torrent: str """ if torrent not in self.transfers: - return (None, "No transfer in progress for {}".format(torrent)) + return None, "No transfer in progress for {}".format(torrent) - self.transfers[torrent].pause() + if self.torrent_state[torrent].get('paused', False): + self.transfers[torrent].resume() + else: + self.transfers[torrent].pause() + + return self.torrent_state[torrent] @action def resume(self, torrent): @@ -311,7 +385,7 @@ class TorrentPlugin(Plugin): """ if torrent not in self.transfers: - return (None, "No transfer in progress for {}".format(torrent)) + return None, "No transfer in progress for {}".format(torrent) self.transfers[torrent].resume() @@ -325,17 +399,35 @@ class TorrentPlugin(Plugin): """ if torrent not in self.transfers: - return (None, "No transfer in progress for {}".format(torrent)) + return None, "No transfer in progress for {}".format(torrent) self.transfers[torrent].pause() del self.torrent_state[torrent] del self.transfers[torrent] - def _generate_rand_filename(self, length=16): + if not len(self.transfers): + self.logger.info('No remaining active torrent transfers found, exiting session') + self._session = None + + @action + def quit(self): + """ + Quits all the transfers and the active session + """ + if not self._session: + self.logger.info('No active sessions found') + return + + transfers = self.transfers.copy() + for torrent in transfers: + self.remove(torrent) + + @staticmethod + def _generate_rand_filename(length=16): name = '' for i in range(0, length): name += hex(random.randint(0, 15))[2:].upper() return name + '.torrent' -# vim:sw=4:ts=4:et: +# vim:sw=4:ts=4:et: