diff --git a/platypush/backend/http/static/css/source/common/elements/slider.scss b/platypush/backend/http/static/css/source/common/elements/slider.scss
index efa4cd6d..d19c640d 100644
--- a/platypush/backend/http/static/css/source/common/elements/slider.scss
+++ b/platypush/backend/http/static/css/source/common/elements/slider.scss
@@ -46,11 +46,6 @@
@include appearance(none);
}
- &::-webkit-progress-value {
- background: $slider-progress-bg;
- height: 15px;
- }
-
&::-moz-range-progress {
background: $slider-progress-bg;
height: 15px;
diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss
index 58b64372..ce18c35a 100644
--- a/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss
+++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/index.scss
@@ -10,6 +10,7 @@
@import 'webpanel/plugins/media/controls';
@import 'webpanel/plugins/media/info';
@import 'webpanel/plugins/media/subs';
+@import 'webpanel/plugins/media/torrents';
.media-plugin {
display: flex;
@@ -50,5 +51,10 @@
color: $result-item-icon;
margin-right: .5em;
}
+
+ .top-buttons {
+ text-align: right;
+ float: right;
+ }
}
diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss
index 620d1841..384cfac6 100644
--- a/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss
+++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/info.scss
@@ -12,6 +12,9 @@
padding: .75em .5em;
border-bottom: $default-border-2;
+ &:nth-child(odd) { background: rgba(255,255,255,0.0); }
+ &:nth-child(even) { background: $default-bg-3; }
+
&:hover {
background: $hover-bg;
border-radius: 1em;
diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss
new file mode 100644
index 00000000..70387bfb
--- /dev/null
+++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/torrents.scss
@@ -0,0 +1,40 @@
+.media-plugin {
+ #media-torrents {
+ .modal {
+ width: 90%;
+
+ .body {
+ padding: 0;
+ text-align: center;
+
+ .head {
+ font-weight: bold;
+ padding: .5rem 0 2.5rem 0;
+ background: $torrents-head-bg;
+ border-bottom: $default-border-3;
+ }
+
+ .transfers-container {
+ margin: 0;
+ width: 100%;
+ }
+
+ .transfer {
+ display: flex;
+ padding: 1.5rem;
+ cursor: pointer;
+
+ &:nth-child(odd) { background: rgba(255,255,255,0.0); }
+ &:nth-child(even) { background: $default-bg-3; }
+ &.selected { background: $selected-bg; }
+
+ &:hover {
+ background: $hover-bg;
+ border-radius: .5rem;
+ }
+ }
+ }
+ }
+ }
+}
+
diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss b/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss
index b585011b..49ab5f2e 100644
--- a/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss
+++ b/platypush/backend/http/static/css/source/webpanel/plugins/media/vars.scss
@@ -17,3 +17,5 @@ $subs-control-height: 4rem;
$btn-default-shadow: 2px 2px 2px #ddd;
$btn-hover-default-shadow: 3px 3px 3px #ddd;
+$torrents-head-bg: #e8e8e8;
+
diff --git a/platypush/backend/http/static/js/api.js b/platypush/backend/http/static/js/api.js
index b8e08c3f..f1ca39e6 100644
--- a/platypush/backend/http/static/js/api.js
+++ b/platypush/backend/http/static/js/api.js
@@ -1,4 +1,4 @@
-function execute(request) {
+function execute(request, timeout=30000) {
var additionalPayload = {};
if (!('target' in request) || !request['target']) {
@@ -15,6 +15,10 @@ function execute(request) {
};
}
+ if (timeout) {
+ additionalPayload.timeout = timeout;
+ }
+
return new Promise((resolve, reject) => {
axios.post('/execute', request, additionalPayload)
.then((response) => {
@@ -42,7 +46,7 @@ function execute(request) {
});
}
-function request(action, args={}) {
+function request(action, args={}, timeout=30000) {
return execute({
type: 'request',
action: action,
diff --git a/platypush/backend/http/static/js/plugins/media/handlers/file.js b/platypush/backend/http/static/js/plugins/media/handlers/file.js
index 28aa1848..5cc272eb 100644
--- a/platypush/backend/http/static/js/plugins/media/handlers/file.js
+++ b/platypush/backend/http/static/js/plugins/media/handlers/file.js
@@ -103,10 +103,10 @@ MediaHandlers.generic = MediaHandlers.file.extend({
},
methods: {
- getMetadata: async function(url) {
+ getMetadata: async function(item) {
return {
- url: url,
- title: url,
+ url: item.url,
+ title: item.url,
};
},
},
diff --git a/platypush/backend/http/static/js/plugins/media/handlers/torrent.js b/platypush/backend/http/static/js/plugins/media/handlers/torrent.js
index 2df00409..7f586e09 100644
--- a/platypush/backend/http/static/js/plugins/media/handlers/torrent.js
+++ b/platypush/backend/http/static/js/plugins/media/handlers/torrent.js
@@ -1,4 +1,4 @@
-MediaHandlers.torrent = Vue.extend({
+MediaHandlers.torrent = MediaHandlers.base.extend({
props: {
bus: { type: Object },
iconClass: {
@@ -31,6 +31,12 @@ MediaHandlers.torrent = Vue.extend({
},
},
+ data: function() {
+ return {
+ torrentStatus: {},
+ };
+ },
+
methods: {
matchesUrl: function(url) {
return !!(
@@ -40,19 +46,207 @@ MediaHandlers.torrent = Vue.extend({
);
},
- getMetadata: function(url) {
- // TODO
- return {};
+ getMetadata: async function(item, onlyBase=false) {
+ let status = {};
+
+ if (!onlyBase)
+ status = await this.status({url: item.url});
+
+ let transferInfo = {};
+ if (item.url in this.torrentStatus)
+ transferInfo = this.torrentStatus[item.url];
+
+ return {...status, ...transferInfo};
},
- play: function(item) {
+ play: async function(item) {
+ let status = await this.download(item);
+ status.waitingPlay = true;
+
+ if (item.url in this.torrentStatus) {
+ this.firePlay(event);
+ }
},
- download: function(item) {
+ pause: async function(item) {
+ let status = await request('torrent.pause', {torrent: item.torrent});
+ this.mergeStatus(status);
},
- info: function(item) {
+ remove: async function(item) {
+ let status = await request('torrent.remove', {torrent: item.torrent});
+ this.mergeStatus(status);
},
+
+ status: async function(item) {
+ if (item) {
+ return await request('torrent.status', {
+ torrent: item.url,
+ });
+ }
+
+ return await request('torrent.status');
+ },
+
+ download: async function(item) {
+ let status = await this.status(item.url);
+ if (status && Object.keys(status).length) {
+ createNotification({
+ text: 'This torrent is already being downloaded, please play the downloading local media file instead',
+ image: {
+ icon: 'download',
+ },
+ });
+
+ return status;
+ }
+
+ status = await request(
+ 'torrent.download',
+ {
+ torrent: item.url,
+ _async: true,
+ is_media: true,
+ },
+ timeout=120000 // Wait up to two minutes while downloading enough torrent chunks
+ );
+
+ this.torrentStatus[item.url] = {
+ ...item, ...status,
+ scheduledPlay: false,
+ torrentState: status.state,
+ state: 'idle',
+ };
+
+ return this.torrentStatus[item.url];
+ },
+
+ onTorrentEvent: function(event) {
+ this.mergeStatus(event);
+ },
+
+ onTorrentQueued: function(event) {
+ if (!this.mergeStatus(event))
+ this.torrentStatus[event.url] = event;
+
+ createNotification({
+ text: 'Torrent download queued. Will start playing when enough chunks have been downloaded',
+ image: {
+ icon: 'clock',
+ },
+ });
+ },
+
+ onTorrentStart: function(event) {
+ if (!this.mergeStatus(event))
+ return;
+
+ createNotification({
+ text: 'Download of '.concat(event.name, ' started'),
+ image: {
+ icon: 'download',
+ },
+ });
+ },
+
+ onTorrentStop: function(event) {
+ if (!this.mergeStatus(event))
+ return;
+
+ delete this.torrentStatus[event.url];
+ this.bus.$emit('torrent-status-update', this.torrentStatus);
+ },
+
+ onTorrentProgress: function(event) {
+ if (!this.mergeStatus(event))
+ return;
+
+ if (this.torrentStatus[event.url].waitingPlay)
+ this.firePlay(event);
+ },
+
+ onTorrentCompleted: function(event) {
+ if (!this.mergeStatus(event))
+ return;
+
+ delete this.torrentStatus[event.url];
+ this.bus.$emit('torrent-status-update', this.torrentStatus);
+
+ createNotification({
+ text: 'Download of '.concat(event.name, ' completed'),
+ image: {
+ icon: 'check',
+ },
+ });
+ },
+
+ firePlay: function(item) {
+ if (!item.files || !item.files.length) {
+ console.warn('Torrent ' + item.url + ' has no media files available yet');
+ return;
+ }
+
+ if (event.progress < 5) {
+ console.warn('Please wait for enough chunks to be downloaded before playing');
+ return;
+ }
+
+ const url = 'file://' + item.files[0];
+ this.bus.$emit('play', {...item, type: 'file', url: url});
+
+ if (this.torrentStatus[item.url].waitingPlay)
+ this.torrentStatus[item.url].waitingPlay = false;
+
+ createNotification({
+ text: 'Playback of '.concat(item.name, ' started'),
+ image: {
+ icon: 'play',
+ },
+ });
+ },
+
+ mergeStatus: function(event) {
+ const torrentState = event.state;
+ delete event.state;
+
+ this.torrentStatus[event.url] = {
+ ...this.torrentStatus[event.url],
+ ...event,
+ torrentState: torrentState,
+ };
+
+ this.bus.$emit('torrent-status-update', this.torrentStatus);
+ return this.torrentStatus[event.url];
+ },
+ },
+
+ created: function() {
+ registerEventHandler(this.onTorrentStart, 'platypush.message.event.torrent.TorrentDownloadStartEvent');
+ registerEventHandler(this.onTorrentStop, 'platypush.message.event.torrent.TorrentDownloadStopEvent');
+ registerEventHandler(this.onTorrentProgress, 'platypush.message.event.torrent.TorrentDownloadProgressEvent');
+ registerEventHandler(this.onTorrentCompleted, 'platypush.message.event.torrent.TorrentDownloadCompletedEvent');
+ registerEventHandler(this.onTorrentQueued, 'platypush.message.event.torrent.TorrentQueuedEvent');
+ registerEventHandler(this.onTorrentEvent, 'platypush.message.event.torrent.TorrentPausedEvent',
+ 'platypush.message.event.torrent.TorrentResumedEvent',
+ 'platypush.message.event.torrent.TorrentDownloadStartEvent',
+ 'platypush.message.event.torrent.TorrentStateChangeEvent');
+
+ const self = this;
+ this.status().then((status) => {
+ if (!status)
+ return;
+
+ for (const [url, torrent] of Object.entries(status)) {
+ self.torrentStatus[event.url]
+ self.mergeStatus(torrent);
+ }
+ });
+
+ setTimeout(() => {
+ self.bus.$on('torrent-play', self.firePlay);
+ self.bus.$on('torrent-pause', self.pause);
+ self.bus.$on('torrent-remove', self.remove);
+ }, 100);
},
});
diff --git a/platypush/backend/http/static/js/plugins/media/handlers/youtube.js b/platypush/backend/http/static/js/plugins/media/handlers/youtube.js
index bbf0c0c5..07cd35a2 100644
--- a/platypush/backend/http/static/js/plugins/media/handlers/youtube.js
+++ b/platypush/backend/http/static/js/plugins/media/handlers/youtube.js
@@ -41,7 +41,7 @@ MediaHandlers.youtube = MediaHandlers.base.extend({
return !!(url.match('^https?://(www\.)?youtube.com/') || url.match('^https?://youtu.be/') || url.match('^https?://.*googlevideo.com/'));
},
- getMetadata: function(url) {
+ getMetadata: function(item) {
return {};
},
diff --git a/platypush/backend/http/static/js/plugins/media/index.js b/platypush/backend/http/static/js/plugins/media/index.js
index a4d79133..38ed7b48 100644
--- a/platypush/backend/http/static/js/plugins/media/index.js
+++ b/platypush/backend/http/static/js/plugins/media/index.js
@@ -61,6 +61,11 @@ Vue.component('media', {
item: {},
},
+ torrentModal: {
+ visible: false,
+ items: {},
+ },
+
subsModal: {
visible: false,
},
@@ -71,6 +76,10 @@ Vue.component('media', {
types: function() {
return MediaHandlers;
},
+
+ torrentsDownloading: function() {
+ return Object.entries(this.torrentModal.items).length > 0;
+ },
},
methods: {
@@ -154,10 +163,7 @@ Vue.component('media', {
},
info: function(item) {
- for (const [attr, value] of Object.entries(item)) {
- Vue.set(this.infoModal.item, attr, value);
- }
-
+ Vue.set(this.infoModal, 'item', item);
this.infoModal.loading = false;
this.infoModal.visible = true;
},
@@ -213,6 +219,14 @@ Vue.component('media', {
};
},
+ torrentStatusUpdate: function(torrents) {
+ Vue.set(this.torrentModal, 'items', {});
+
+ for (const [url, torrent] of Object.entries(torrents)) {
+ Vue.set(this.torrentModal.items, url, torrent);
+ }
+ },
+
onStatusUpdate: function(event) {
const dev = event.device;
const status = event.status;
@@ -307,6 +321,7 @@ Vue.component('media', {
this.bus.$on('status-update', this.onStatusUpdate);
this.bus.$on('start-streaming', this.startStreaming);
this.bus.$on('search-subs', this.searchSubs);
+ this.bus.$on('torrent-status-update', this.torrentStatusUpdate);
setInterval(this.timerFunc, 1000);
},
diff --git a/platypush/backend/http/static/js/plugins/media/torrents.js b/platypush/backend/http/static/js/plugins/media/torrents.js
new file mode 100644
index 00000000..cdeebef0
--- /dev/null
+++ b/platypush/backend/http/static/js/plugins/media/torrents.js
@@ -0,0 +1,68 @@
+Vue.component('media-torrents', {
+ template: '#tmpl-media-torrents',
+ mixins: [mediaUtils],
+ props: {
+ bus: { type: Object },
+ torrents: {
+ type: Object,
+ default: () => {},
+ },
+ },
+
+ data: function() {
+ return {
+ selectedItem: undefined,
+ };
+ },
+
+ computed: {
+ dropdownItems: function() {
+ const self = this;
+ return [
+ {
+ name: 'play',
+ text: 'Play',
+ iconClass: 'fa fa-play',
+ click: function() {
+ self.bus.$emit('torrent-play', self.selectedItem);
+ },
+ },
+
+ {
+ name: 'pause',
+ text: 'Pause/unpause transfer',
+ iconClass: 'fa fa-pause',
+ click: function() {
+ self.bus.$emit('torrent-pause', self.selectedItem);
+ },
+ },
+
+ {
+ name: 'cancel',
+ text: 'Cancel transfer',
+ iconClass: 'fa fa-trash',
+ click: function() {
+ self.bus.$emit('torrent-remove', self.selectedItem);
+ },
+ },
+
+ {
+ name: 'info',
+ text: 'View details',
+ iconClass: 'fa fa-info',
+ click: function() {
+ self.bus.$emit('info', self.selectedItem);
+ },
+ },
+ ];
+ },
+ },
+
+ methods: {
+ openDropdown: function(item) {
+ this.selectedItem = item;
+ openDropdown(this.$refs.menu);
+ },
+ },
+});
+
diff --git a/platypush/backend/http/templates/notifications.html b/platypush/backend/http/templates/notifications.html
index 3eeb46c7..0825ff5a 100644
--- a/platypush/backend/http/templates/notifications.html
+++ b/platypush/backend/http/templates/notifications.html
@@ -25,6 +25,8 @@
+
diff --git a/platypush/backend/http/templates/plugins/media/index.html b/platypush/backend/http/templates/plugins/media/index.html
index 51c759cd..5c22e009 100644
--- a/platypush/backend/http/templates/plugins/media/index.html
+++ b/platypush/backend/http/templates/plugins/media/index.html
@@ -4,6 +4,7 @@
{% include 'plugins/media/item.html' %}
{% include 'plugins/media/info.html' %}
{% include 'plugins/media/subs.html' %}
+{% include 'plugins/media/torrents.html' %}
@@ -18,13 +19,20 @@
diff --git a/platypush/backend/http/templates/plugins/media/info.html b/platypush/backend/http/templates/plugins/media/info.html
index f98b7a02..d59b5ef3 100644
--- a/platypush/backend/http/templates/plugins/media/info.html
+++ b/platypush/backend/http/templates/plugins/media/info.html
@@ -11,12 +11,17 @@
-
diff --git a/platypush/backend/http/templates/plugins/media/torrents.html b/platypush/backend/http/templates/plugins/media/torrents.html
new file mode 100644
index 00000000..90b1e669
--- /dev/null
+++ b/platypush/backend/http/templates/plugins/media/torrents.html
@@ -0,0 +1,31 @@
+
+
+
+
diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py
index b053d567..64269cf5 100644
--- a/platypush/bus/__init__.py
+++ b/platypush/bus/__init__.py
@@ -5,7 +5,7 @@ import time
from queue import Queue
from platypush.config import Config
-from platypush.message.event import Event, StopEvent
+from platypush.message.event import StopEvent
logger = logging.getLogger(__name__)
@@ -13,8 +13,7 @@ logger = logging.getLogger(__name__)
class Bus(object):
""" Main local bus where the daemon will listen for new messages """
- _MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired
- # after one minute without being picked up
+ _MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired after one minute without being picked up
def __init__(self, on_message=None):
self.bus = Queue()
@@ -56,19 +55,19 @@ class Bus(object):
logger.warning('No message handlers installed, cannot poll')
return
- stop=False
+ stop = False
while not stop:
msg = self.get()
if msg.timestamp and time.time() - msg.timestamp > self._MSG_EXPIRY_TIMEOUT:
- logger.debug('{} seconds old message on the bus expired, ignoring it: {}'
- .format(int(time.time()-msg.timestamp), msg))
+ logger.debug('{} seconds old message on the bus expired, ignoring it: {}'.
+ format(int(time.time()-msg.timestamp), msg))
continue
threading.Thread(target=self._msg_executor(msg)).start()
if isinstance(msg, StopEvent) and msg.targets_me():
logger.info('Received STOP event on the bus')
- stop=True
+ stop = True
# vim:sw=4:ts=4:et:
diff --git a/platypush/message/event/torrent.py b/platypush/message/event/torrent.py
index 43c4fea0..a962fa3b 100644
--- a/platypush/message/event/torrent.py
+++ b/platypush/message/event/torrent.py
@@ -5,72 +5,88 @@ class TorrentEvent(Event):
"""
Base class for torrent events
"""
-
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
-class TorrentDownloadingMetadataEvent(TorrentEvent):
+class TorrentQueuedEvent(TorrentEvent):
"""
- Event triggered upon torrent metadata download start
+ Event triggered upon when a new torrent transfer is queued
"""
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+
+class TorrentDownloadedMetadataEvent(TorrentEvent):
+ """
+ Event triggered upon torrent metadata download completed
+ """
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
class TorrentDownloadStartEvent(TorrentEvent):
"""
Event triggered upon torrent download start
"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
class TorrentSeedingStartEvent(TorrentEvent):
"""
Event triggered upon torrent seeding start
"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
class TorrentDownloadProgressEvent(TorrentEvent):
"""
Event triggered upon torrent download progress
"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
class TorrentStateChangeEvent(TorrentEvent):
"""
Event triggered upon torrent state change
"""
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+
+class TorrentPausedEvent(TorrentEvent):
+ """
+ Event triggered when a torrent transfer is paused
+ """
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
+
+
+class TorrentResumedEvent(TorrentEvent):
+ """
+ Event triggered when a torrent transfer is resumed
+ """
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
class TorrentDownloadCompletedEvent(TorrentEvent):
"""
Event triggered upon torrent state change
"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
class TorrentDownloadStopEvent(TorrentEvent):
"""
Event triggered when a torrent transfer is stopped
"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
+ def __init__(self, url, *args, **kwargs):
+ super().__init__(*args, url=url, **kwargs)
# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py
index b871107f..ea25e345 100644
--- a/platypush/plugins/media/__init__.py
+++ b/platypush/plugins/media/__init__.py
@@ -30,8 +30,6 @@ class MediaPlugin(Plugin):
Requires:
* A media player installed (supported so far: mplayer, vlc, mpv, omxplayer, chromecast)
- * The :class:`platypush.plugins.media.webtorrent` plugin for optional torrent support through webtorrent
- (recommended)
* **python-libtorrent** (``pip install python-libtorrent``), optional, for torrent support over native library
* **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support
* **requests** (``pip install requests``), optional, for local files over HTTP streaming supporting
@@ -138,9 +136,16 @@ class MediaPlugin(Plugin):
os.makedirs(self.download_dir, exist_ok=True)
self.media_dirs.add(self.download_dir)
- self._is_playing_torrent = False
self._videos_queue = []
+ @staticmethod
+ def _torrent_event_handler(evt_queue):
+ def handler(event):
+ # More than 5% of the torrent has been downloaded
+ if event.args.get('progress', 0) > 5 and event.args.get('files'):
+ evt_queue.put(event.args['files'])
+ return handler
+
def _get_resource(self, resource):
"""
:param resource: Resource to play/parse. Supported types:
@@ -159,33 +164,28 @@ class MediaPlugin(Plugin):
resource = self.get_youtube_url(resource).output
elif resource.startswith('magnet:?'):
- try:
- get_plugin('media.webtorrent')
- return resource # media.webtorrent will handle this
- except Exception:
- pass
-
- torrents = get_plugin('torrent')
self.logger.info('Downloading torrent {} to {}'.format(
resource, self.download_dir))
+ torrents = get_plugin('torrent')
+
+ evt_queue = queue.Queue()
+ torrents.download(resource, download_dir=self.download_dir, _async=True, is_media=True,
+ event_hndl=self._torrent_event_handler(evt_queue))
+
+ resources = [f for f in evt_queue.get()]
- response = torrents.download(resource, download_dir=self.download_dir)
- resources = [f for f in response.output if self._is_video_file(f)]
if resources:
self._videos_queue = sorted(resources)
resource = self._videos_queue.pop(0)
else:
- raise RuntimeError('Unable to download torrent {}'.format(resource))
+ raise RuntimeError('No media file found in torrent {}'.format(resource))
return resource
- def _stop_torrent(self):
- if self._is_playing_torrent:
- try:
- get_plugin('media.webtorrent').quit()
- except Exception as e:
- self.logger.warning('Cannot quit the webtorrent instance: {}'.
- format(str(e)))
+ @staticmethod
+ def _stop_torrent():
+ torrents = get_plugin('torrent')
+ torrents.quit()
@action
def play(self, resource, *args, **kwargs):
@@ -492,8 +492,7 @@ class MediaPlugin(Plugin):
[float(t) * pow(60, i) for (i, t) in enumerate(re.search(
'^Duration:\s*([^,]+)', [x.decode()
for x in result.stdout.readlines()
- if "Duration" in x.decode()]
- .pop().strip()
+ if "Duration" in x.decode()].pop().strip()
).group(1).split(':')[::-1])]
)
diff --git a/platypush/plugins/media/mplayer.py b/platypush/plugins/media/mplayer.py
index 04b1e695..149ac379 100644
--- a/platypush/plugins/media/mplayer.py
+++ b/platypush/plugins/media/mplayer.py
@@ -61,7 +61,6 @@ class MediaMplayerPlugin(MediaPlugin):
self._mplayer_timeout = mplayer_timeout
self._mplayer_stopped_event = threading.Event()
self._status_lock = threading.Lock()
- self._is_playing_torrent = False
def _init_mplayer_bin(self, mplayer_bin=None):
if not mplayer_bin:
@@ -259,11 +258,7 @@ class MediaMplayerPlugin(MediaPlugin):
resource = self._get_resource(resource)
if resource.startswith('file://'):
resource = resource[7:]
- elif resource.startswith('magnet:?'):
- self._is_playing_torrent = True
- return get_plugin('media.webtorrent').play(resource)
- self._is_playing_torrent = False
self._exec('loadfile', resource, mplayer_args=mplayer_args)
self._post_event(MediaPlayEvent, resource=resource)
return self.status()
diff --git a/platypush/plugins/media/mpv.py b/platypush/plugins/media/mpv.py
index 23f4b4d0..be82c1c1 100644
--- a/platypush/plugins/media/mpv.py
+++ b/platypush/plugins/media/mpv.py
@@ -2,7 +2,7 @@ import os
import re
import threading
-from platypush.context import get_bus, get_plugin
+from platypush.context import get_bus
from platypush.plugins.media import PlayerState, MediaPlugin
from platypush.message.event.media import MediaPlayEvent, MediaPlayRequestEvent, \
MediaPauseEvent, MediaStopEvent, NewPlayingMediaEvent, MediaSeekEvent
@@ -42,7 +42,6 @@ class MediaMpvPlugin(MediaPlugin):
self.args.update(args)
self._player = None
- self._is_playing_torrent = False
self._playback_rebounce_event = threading.Event()
self._on_stop_callbacks = []
@@ -53,7 +52,7 @@ class MediaMpvPlugin(MediaPlugin):
if args:
mpv_args.update(args)
- for k,v in self._env.items():
+ for k, v in self._env.items():
os.environ[k] = v
self._player = mpv.MPV(**mpv_args)
@@ -77,7 +76,8 @@ class MediaMpvPlugin(MediaPlugin):
if (evt == Event.FILE_LOADED or evt == Event.START_FILE) and self._get_current_resource():
self._playback_rebounce_event.set()
- self._post_event(NewPlayingMediaEvent, resource=self._get_current_resource(), title=self._player.filename)
+ self._post_event(NewPlayingMediaEvent, resource=self._get_current_resource(),
+ title=self._player.filename)
elif evt == Event.PLAYBACK_RESTART:
self._playback_rebounce_event.set()
elif evt == Event.PAUSE:
@@ -85,8 +85,8 @@ class MediaMpvPlugin(MediaPlugin):
elif evt == Event.UNPAUSE:
self._post_event(MediaPlayEvent, resource=self._get_current_resource(), title=self._player.filename)
elif evt == Event.SHUTDOWN or (
- evt == Event.END_FILE and event.get('event', {}).get('reason')
- in [EndFile.EOF_OR_INIT_FAILURE, EndFile.ABORTED, EndFile.QUIT]):
+ evt == Event.END_FILE and event.get('event', {}).get('reason') in
+ [EndFile.EOF_OR_INIT_FAILURE, EndFile.ABORTED, EndFile.QUIT]):
playback_rebounced = self._playback_rebounce_event.wait(timeout=0.5)
if playback_rebounced:
self._playback_rebounce_event.clear()
@@ -95,8 +95,8 @@ class MediaMpvPlugin(MediaPlugin):
self._player = None
self._post_event(MediaStopEvent)
- for callback in self._on_stop_callbacks:
- callback()
+ for cbk in self._on_stop_callbacks:
+ cbk()
elif evt == Event.SEEK:
self._post_event(MediaSeekEvent, position=self._player.playback_time)
@@ -111,7 +111,8 @@ class MediaMpvPlugin(MediaPlugin):
for regex in regexes:
m = re.search(regex, resource)
- if m: return base_url + m.group(2)
+ if m:
+ return base_url + m.group(2)
return None
@action
@@ -149,14 +150,11 @@ class MediaMpvPlugin(MediaPlugin):
if resource.startswith('file://'):
resource = resource[7:]
- elif resource.startswith('magnet:?'):
- self._is_playing_torrent = True
- return get_plugin('media.webtorrent').play(resource)
else:
yt_resource = self._get_youtube_link(resource)
- if yt_resource: resource = yt_resource
+ if yt_resource:
+ resource = yt_resource
- self._is_playing_torrent = False
self._player.play(resource)
return self.status()
@@ -283,7 +281,7 @@ class MediaMpvPlugin(MediaPlugin):
return self._player.sub_remove(sub_id)
@action
- def toggle_fullscreen(self, fullscreen=None):
+ def toggle_fullscreen(self):
""" Toggle the fullscreen mode """
return self.toggle_property('fullscreen')
@@ -296,14 +294,14 @@ class MediaMpvPlugin(MediaPlugin):
:param property: Property to toggle
"""
if not self._player:
- return (None, 'No mpv instance is running')
+ return None, 'No mpv instance is running'
if not hasattr(self._player, property):
self.logger.warning('No such mpv property: {}'.format(property))
value = not getattr(self._player, property)
setattr(self._player, property, value)
- return { property: value }
+ return {property: value}
@action
def get_property(self, property):
@@ -312,7 +310,7 @@ class MediaMpvPlugin(MediaPlugin):
``man mpv`` for a full list of the available properties
"""
if not self._player:
- return (None, 'No mpv instance is running')
+ return None, 'No mpv instance is running'
return getattr(self._player, property)
@action
@@ -325,7 +323,7 @@ class MediaMpvPlugin(MediaPlugin):
:type props: dict
"""
if not self._player:
- return (None, 'No mpv instance is running')
+ return None, 'No mpv instance is running'
for k,v in props.items():
setattr(self._player, k, v)
@@ -340,7 +338,7 @@ class MediaMpvPlugin(MediaPlugin):
def remove_subtitles(self):
""" Removes (hides) the subtitles """
if not self._player:
- return (None, 'No mpv instance is running')
+ return None, 'No mpv instance is running'
self._player.sub_visibility = False
@action
@@ -368,7 +366,7 @@ class MediaMpvPlugin(MediaPlugin):
return (None, 'No mpv instance is running')
mute = not self._player.mute
self._player.mute = mute
- return { 'muted': mute }
+ return {'muted': mute}
@action
def set_position(self, position):
diff --git a/platypush/plugins/media/vlc.py b/platypush/plugins/media/vlc.py
index 9043c2b7..cb04be28 100644
--- a/platypush/plugins/media/vlc.py
+++ b/platypush/plugins/media/vlc.py
@@ -159,9 +159,6 @@ class MediaVlcPlugin(MediaPlugin):
if resource.startswith('file://'):
resource = resource[len('file://'):]
- elif resource.startswith('magnet:?'):
- self._is_playing_torrent = True
- return get_plugin('media.webtorrent').play(resource)
self._init_vlc(resource)
if subtitles:
@@ -169,7 +166,6 @@ class MediaVlcPlugin(MediaPlugin):
subtitles = subtitles[len('file://'):]
self._player.video_set_subtitle_file(subtitles)
- self._is_playing_torrent = False
self._player.play()
if fullscreen or self._default_fullscreen:
diff --git a/platypush/plugins/media/webtorrent.py b/platypush/plugins/media/webtorrent.py
index 12243d77..561b91f3 100644
--- a/platypush/plugins/media/webtorrent.py
+++ b/platypush/plugins/media/webtorrent.py
@@ -10,7 +10,7 @@ from platypush.config import Config
from platypush.context import get_bus, get_plugin
from platypush.plugins.media import PlayerState, MediaPlugin
from platypush.message.event.torrent import TorrentDownloadStartEvent, \
- TorrentDownloadCompletedEvent, TorrentDownloadingMetadataEvent
+ TorrentDownloadCompletedEvent, TorrentDownloadedMetadataEvent
from platypush.plugins import action
from platypush.utils import find_bins_in_path, find_files_by_ext, \
@@ -142,7 +142,7 @@ class MediaWebtorrentPlugin(MediaPlugin):
and state == TorrentState.IDLE:
# IDLE -> DOWNLOADING_METADATA
state = TorrentState.DOWNLOADING_METADATA
- bus.post(TorrentDownloadingMetadataEvent(resource=resource))
+ bus.post(TorrentDownloadedMetadataEvent(resource=resource))
elif 'downloading: ' in line.lower() \
and media_file is None:
# Find video files in torrent directory
diff --git a/platypush/plugins/torrent.py b/platypush/plugins/torrent.py
index 03f9c761..f6474d41 100644
--- a/platypush/plugins/torrent.py
+++ b/platypush/plugins/torrent.py
@@ -10,9 +10,9 @@ import urllib.parse
from platypush.context import get_bus
from platypush.plugins import Plugin, action
from platypush.message.event.torrent import \
- TorrentDownloadStartEvent, TorrentSeedingStartEvent, \
- TorrentStateChangeEvent, TorrentDownloadProgressEvent, \
- TorrentDownloadCompletedEvent, TorrentDownloadStopEvent
+ TorrentDownloadStartEvent, TorrentDownloadedMetadataEvent, TorrentStateChangeEvent, \
+ TorrentDownloadProgressEvent, TorrentDownloadCompletedEvent, TorrentDownloadStopEvent, \
+ TorrentPausedEvent, TorrentResumedEvent, TorrentQueuedEvent
class TorrentPlugin(Plugin):
@@ -25,6 +25,9 @@ class TorrentPlugin(Plugin):
* **requests** (``pip install requests``) [optional] for torrent info URL download
"""
+ # Wait time in seconds between two torrent transfer checks
+ _MONITOR_CHECK_INTERVAL = 3
+
default_torrent_ports = [6881, 6891]
supported_categories = {
'movies': 'https://movies-v2.api-fetch.website/movies/1',
@@ -48,6 +51,7 @@ class TorrentPlugin(Plugin):
self.torrent_ports = torrent_ports if torrent_ports else self.default_torrent_ports
self.download_dir = None
+ self._session = None
if download_dir:
self.download_dir = os.path.abspath(os.path.expanduser(download_dir))
@@ -101,7 +105,6 @@ class TorrentPlugin(Plugin):
format(category, self.supported_categories.keys()))
self.logger.info('Searching {} torrents for "{}"'.format(category, query))
- url = 'https://{category}-v2.api-fetch.website/{category}/1'.format(category=category)
request = urllib.request.urlopen(urllib.request.Request(
self.supported_categories[category] + '?' + urllib.parse.urlencode({
'sort': 'relevance',
@@ -139,41 +142,18 @@ class TorrentPlugin(Plugin):
for (quality, item) in items.items()
], key=lambda _: _.get('seeds'), reverse=True)
- @action
- def download(self, torrent, download_dir=None):
- """
- Download a torrent.
-
- :param torrent: Torrent to download. Supported formats:
-
- * Magnet URLs
- * Torrent URLs
- * Local torrent file
-
- :type resource: str
-
- :param download_dir: Directory to download, overrides the default download_dir attribute (default: None)
- :type download_dir: str
- """
-
+ def _get_torrent_info(self, torrent, download_dir):
import libtorrent as lt
- if not download_dir:
- if self.download_dir:
- download_dir = self.download_dir
- else:
- raise RuntimeError('download_dir not specified')
-
- download_dir = os.path.abspath(os.path.expanduser(download_dir))
- os.makedirs(download_dir, exist_ok=True)
- info = {}
torrent_file = None
magnet = None
- bus = get_bus()
+ info = {}
+ file_info = {}
if torrent.startswith('magnet:?'):
magnet = torrent
info = lt.parse_magnet_uri(magnet)
+ info['magnet'] = magnet
elif torrent.startswith('http://') or torrent.startswith('https://'):
import requests
response = requests.get(torrent, allow_redirects=True)
@@ -196,11 +176,139 @@ class TorrentPlugin(Plugin):
'save_path': download_dir,
}
- ses = lt.session()
- ses.listen_on(*self.torrent_ports)
+ return info, file_info, torrent_file, magnet
- self.logger.info('Downloading "{}" to "{}" from [{}]'
- .format(info['name'], self.download_dir, torrent))
+ def _fire_event(self, event, event_hndl):
+ bus = get_bus()
+ bus.post(event)
+
+ try:
+ if event_hndl:
+ event_hndl(event)
+ except Exception as e:
+ self.logger.warning('Exception in torrent event handler: {}'.format(str(e)))
+ self.logger.exception(e)
+
+ def _torrent_monitor(self, torrent, transfer, download_dir, event_hndl, is_media):
+ def thread():
+ files = []
+ last_status = None
+ download_started = False
+ metadata_downloaded = False
+
+ while not transfer.is_finished():
+ if torrent not in self.transfers:
+ self.logger.info('Torrent {} has been stopped and removed'.format(torrent))
+ self._fire_event(TorrentDownloadStopEvent(url=torrent), event_hndl)
+ break
+
+ status = transfer.status()
+ torrent_file = transfer.torrent_file()
+
+ if torrent_file:
+ self.torrent_state[torrent]['size'] = torrent_file.total_size()
+ files = [os.path.join(
+ download_dir,
+ torrent_file.files().file_path(i))
+ for i in range(0, torrent_file.files().num_files())
+ ]
+
+ if is_media:
+ from platypush.plugins.media import MediaPlugin
+ files = [f for f in files if MediaPlugin._is_video_file(f)]
+
+ self.torrent_state[torrent]['download_rate'] = status.download_rate
+ self.torrent_state[torrent]['name'] = status.name
+ self.torrent_state[torrent]['num_peers'] = status.num_peers
+ self.torrent_state[torrent]['paused'] = status.paused
+ self.torrent_state[torrent]['progress'] = round(100 * status.progress, 2)
+ self.torrent_state[torrent]['state'] = status.state.name
+ self.torrent_state[torrent]['title'] = status.name
+ self.torrent_state[torrent]['torrent'] = torrent
+ self.torrent_state[torrent]['upload_rate'] = status.upload_rate
+ self.torrent_state[torrent]['url'] = torrent
+ self.torrent_state[torrent]['files'] = files
+
+ if transfer.has_metadata() and not metadata_downloaded:
+ self._fire_event(TorrentDownloadedMetadataEvent(**self.torrent_state[torrent]), event_hndl)
+ metadata_downloaded = True
+
+ if status.state == status.downloading and not download_started:
+ self._fire_event(TorrentDownloadStartEvent(**self.torrent_state[torrent]), event_hndl)
+ download_started = True
+
+ if last_status and status.progress != last_status.progress:
+ self._fire_event(TorrentDownloadProgressEvent(**self.torrent_state[torrent]), event_hndl)
+
+ if not last_status or status.state != last_status.state:
+ self._fire_event(TorrentStateChangeEvent(**self.torrent_state[torrent]), event_hndl)
+
+ if last_status and status.paused != last_status.paused:
+ if status.paused:
+ self._fire_event(TorrentPausedEvent(**self.torrent_state[torrent]), event_hndl)
+ else:
+ self._fire_event(TorrentResumedEvent(**self.torrent_state[torrent]), event_hndl)
+
+ last_status = status
+ time.sleep(self._MONITOR_CHECK_INTERVAL)
+
+ if transfer and transfer.is_finished():
+ self._fire_event(TorrentDownloadCompletedEvent(**self.torrent_state[torrent]), event_hndl)
+
+ self.remove(torrent)
+ return files
+
+ return thread
+
+ @action
+ def download(self, torrent, download_dir=None, _async=False, event_hndl=None, is_media=False):
+ """
+ Download a torrent.
+
+ :param torrent: Torrent to download. Supported formats:
+
+ * Magnet URLs
+ * Torrent URLs
+ * Local torrent file
+
+ :type torrent: str
+
+ :param download_dir: Directory to download, overrides the default download_dir attribute (default: None)
+ :type download_dir: str
+
+ :param _async: If true then the method will add the torrent to the transfer and then return. Updates on
+ the download status should be retrieved either by listening to torrent events or registering the
+ event handler. If false (default) then the method will exit only when the torrent download is complete.
+ :type _async: bool
+
+ :param event_hndl: A function that takes an event object as argument and is invoked upon a new torrent event
+ (download started, progressing, completed etc.)
+ :type event_hndl: function
+
+ :param is_media: Set it to true if you're downloading a media file that you'd like to stream as soon as the
+ first chunks are available. If so, then the events and the status method will only include media files
+ :type is_media: bool
+ """
+
+ if torrent in self.torrent_state and torrent in self.transfers:
+ return self.torrent_state[torrent]
+
+ import libtorrent as lt
+
+ if not download_dir:
+ if self.download_dir:
+ download_dir = self.download_dir
+ else:
+ raise RuntimeError('download_dir not specified')
+
+ download_dir = os.path.abspath(os.path.expanduser(download_dir))
+ os.makedirs(download_dir, exist_ok=True)
+ info, file_info, torrent_file, magnet = self._get_torrent_info(torrent, download_dir)
+
+ if not self._session:
+ self._session = lt.session()
+
+ self._session.listen_on(*self.torrent_ports)
params = {
'save_path': download_dir,
@@ -208,98 +316,64 @@ class TorrentPlugin(Plugin):
}
if magnet:
- transfer = lt.add_magnet_uri(ses, magnet, params)
- elif torrent_file:
+ transfer = lt.add_magnet_uri(self._session, magnet, params)
+ else:
params['ti'] = file_info
- transfer = ses.add_torrent(params)
-
- status = transfer.status()
- files = []
+ transfer = self._session.add_torrent(params)
self.transfers[torrent] = transfer
self.torrent_state[torrent] = {
'url': torrent,
- 'title': info['name'],
+ 'title': transfer.status().name,
'trackers': info['trackers'],
'save_path': download_dir,
}
- bus.post(TorrentDownloadStartEvent(**self.torrent_state[torrent]))
- last_status = None
+ self._fire_event(TorrentQueuedEvent(url=torrent), event_hndl)
+ self.logger.info('Downloading "{}" to "{}" from [{}]'.format(info['name'], download_dir, torrent))
+ monitor_thread = self._torrent_monitor(torrent=torrent, transfer=transfer, download_dir=download_dir,
+ event_hndl=event_hndl, is_media=is_media)
- while not status.is_seeding:
- if not torrent in self.transfers:
- self.logger.info('Torrent {} has been stopped and removed')
- bus.post(TorrentDownloadStopEvent(url=torrent))
- return
-
- if not last_status:
- bus.post(TorrentSeedingStartEvent(**self.torrent_state[torrent]))
-
- status = transfer.status()
- torrent_file = transfer.torrent_file()
- if torrent_file:
- files = [os.path.join(
- self.download_dir,
- torrent_file.files().file_path(i))
- for i in range(0, torrent_file.files().num_files())
- ]
-
- self.torrent_state[torrent]['progress'] = round(100 * status.progress, 2)
- self.torrent_state[torrent]['download_rate'] = status.download_rate
- self.torrent_state[torrent]['upload_rate'] = status.upload_rate
- self.torrent_state[torrent]['num_peers'] = status.num_peers
- self.torrent_state[torrent]['state'] = status.state
-
- if last_status and status.progress != last_status.progress:
- bus.post(TorrentDownloadProgressEvent(**self.torrent_state[torrent]))
-
- if not last_status or status.state != last_status.state:
- bus.post(TorrentStateChangeEvent(**self.torrent_state[torrent]))
-
- self.logger.info(('Torrent download: {:.2f}% complete (down: {:.1f} kb/s ' +
- 'up: {:.1f} kB/s peers: {} state: {})')
- .format(status.progress * 100,
- status.download_rate / 1000,
- status.upload_rate / 1000,
- status.num_peers, status.state))
-
- last_status = status
- time.sleep(5)
-
- if torrent_file:
- try: os.unlink(torrent_file)
- except: pass
-
- bus.post(TorrentDownloadCompletedEvent(**self.torrent_state[torrent], files=files))
- del self.torrent_state[torrent]
- del self.transfers[torrent]
- return files
+ if not _async:
+ return monitor_thread()
+ threading.Thread(target=monitor_thread).start()
+ return self.torrent_state[torrent]
@action
- def get_status(self):
+ def status(self, torrent=None):
"""
Get the status of the current transfers.
+ :param torrent: Torrent path, URL or magnet URI whose status will be retrieve (default: None, retrieve all
+ current transfers)
+ :type torrent: str
+
:returns: A dictionary in the format torrent_url -> status
"""
+ if torrent:
+ return self.torrent_state.get(torrent)
return self.torrent_state
@action
def pause(self, torrent):
"""
- Pause a torrent transfer.
+ Pause/resume a torrent transfer.
:param torrent: Torrent URL as returned from `get_status()`
:type torrent: str
"""
if torrent not in self.transfers:
- return (None, "No transfer in progress for {}".format(torrent))
+ return None, "No transfer in progress for {}".format(torrent)
- self.transfers[torrent].pause()
+ if self.torrent_state[torrent].get('paused', False):
+ self.transfers[torrent].resume()
+ else:
+ self.transfers[torrent].pause()
+
+ return self.torrent_state[torrent]
@action
def resume(self, torrent):
@@ -311,7 +385,7 @@ class TorrentPlugin(Plugin):
"""
if torrent not in self.transfers:
- return (None, "No transfer in progress for {}".format(torrent))
+ return None, "No transfer in progress for {}".format(torrent)
self.transfers[torrent].resume()
@@ -325,17 +399,35 @@ class TorrentPlugin(Plugin):
"""
if torrent not in self.transfers:
- return (None, "No transfer in progress for {}".format(torrent))
+ return None, "No transfer in progress for {}".format(torrent)
self.transfers[torrent].pause()
del self.torrent_state[torrent]
del self.transfers[torrent]
- def _generate_rand_filename(self, length=16):
+ if not len(self.transfers):
+ self.logger.info('No remaining active torrent transfers found, exiting session')
+ self._session = None
+
+ @action
+ def quit(self):
+ """
+ Quits all the transfers and the active session
+ """
+ if not self._session:
+ self.logger.info('No active sessions found')
+ return
+
+ transfers = self.transfers.copy()
+ for torrent in transfers:
+ self.remove(torrent)
+
+ @staticmethod
+ def _generate_rand_filename(length=16):
name = ''
for i in range(0, length):
name += hex(random.randint(0, 15))[2:].upper()
return name + '.torrent'
-# vim:sw=4:ts=4:et:
+# vim:sw=4:ts=4:et: