Refactored webtorrent plugin and synchronization with the player

The WebTorrent plugin now uses the `download -o <download_dir> -p <port>` options.
Improved interaction both with the webtorrent executable and the player
executable, and triggering the right events upon state changes, as well
as free support for torrent streaming through the webtorrent executable.
This commit is contained in:
Fabio Manganiello 2019-02-05 00:15:36 +01:00
parent 9506813dc2
commit 52d0ba442b
6 changed files with 241 additions and 54 deletions

View File

@ -10,6 +10,15 @@ class TorrentEvent(Event):
super().__init__(*args, **kwargs)
class TorrentDownloadingMetadataEvent(TorrentEvent):
"""
Event triggered upon torrent metadata download start
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class TorrentDownloadStartEvent(TorrentEvent):
"""
Event triggered upon torrent download start

View File

@ -27,6 +27,10 @@ class MediaPlugin(Plugin):
* **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support
"""
# A media plugin can either be local or remote (e.g. control media on
# another device)
_is_local = True
_NOT_IMPLEMENTED_ERR = NotImplementedError(
'This method must be implemented in a derived class')
@ -360,5 +364,8 @@ class MediaPlugin(Plugin):
return proc.stdout.read().decode("utf-8", "strict")[:-1]
def is_local(self):
return self._is_local
# vim:sw=4:ts=4:et:

View File

@ -72,8 +72,9 @@ class MediaMplayerPlugin(MediaPlugin):
self.args = args or []
self._init_mplayer_bin()
self._build_actions()
self._mplayer = None
self._player = None
self._mplayer_timeout = mplayer_timeout
self._mplayer_stopped_event = threading.Event()
self._is_playing_torrent = False
@ -98,9 +99,9 @@ class MediaMplayerPlugin(MediaPlugin):
self.mplayer_bin = mplayer_bin
def _init_mplayer(self, mplayer_args=None):
if self._mplayer:
if self._player:
try:
self._mplayer.quit()
self._player.quit()
except:
self.logger.debug('Failed to quit mplayer before _exec: {}'.
format(str))
@ -119,7 +120,7 @@ class MediaMplayerPlugin(MediaPlugin):
if self._env:
popen_args['env'] = self._env
self._mplayer = subprocess.Popen(args, **popen_args)
self._player = subprocess.Popen(args, **popen_args)
threading.Thread(target=self._process_monitor()).start()
def _build_actions(self):
@ -155,7 +156,7 @@ class MediaMplayerPlugin(MediaPlugin):
if cmd_name == 'loadfile' or cmd_name == 'loadlist':
self._init_mplayer(mplayer_args)
else:
if not self._mplayer:
if not self._player:
self.logger.warning('MPlayer is not running')
cmd = '{}{}{}{}\n'.format(
@ -163,8 +164,8 @@ class MediaMplayerPlugin(MediaPlugin):
cmd_name, ' ' if args else '',
' '.join(repr(a) for a in args)).encode()
self._mplayer.stdin.write(cmd)
self._mplayer.stdin.flush()
self._player.stdin.write(cmd)
self._player.stdin.flush()
bus = get_bus()
if cmd_name == 'loadfile' or cmd_name == 'loadlist':
@ -173,23 +174,23 @@ class MediaMplayerPlugin(MediaPlugin):
bus.post(MediaPauseEvent())
elif cmd_name == 'quit' or cmd_name == 'stop':
if cmd_name == 'quit':
self._mplayer.terminate()
self._mplayer.wait()
try: self._mplayer.kill()
self._player.terminate()
self._player.wait()
try: self._player.kill()
except: pass
self._mplayer = None
self._player = None
if not wait_for_response:
return
poll = select.poll()
poll.register(self._mplayer.stdout, select.POLLIN)
poll.register(self._player.stdout, select.POLLIN)
last_read_time = time.time()
while time.time() - last_read_time < self._mplayer_timeout:
result = poll.poll(0)
if result:
line = self._mplayer.stdout.readline().decode()
line = self._player.stdout.readline().decode()
last_read_time = time.time()
if line.startswith('ANS_'):
@ -222,15 +223,17 @@ class MediaMplayerPlugin(MediaPlugin):
def _process_monitor(self):
def _thread():
if not self._mplayer:
if not self._player:
return
self._mplayer.wait()
self._mplayer_stopped_event.clear()
self._player.wait()
try: self.quit()
except: pass
get_bus().post(MediaStopEvent())
self._mplayer = None
self._mplayer_stopped_event.set()
self._player = None
return _thread

View File

@ -1,3 +1,4 @@
import enum
import os
import re
import subprocess
@ -13,6 +14,12 @@ from platypush.message.event.video import VideoPlayEvent, VideoPauseEvent, \
from platypush.plugins import action
class PlayerEvent(enum.Enum):
STOP = 'stop'
PLAY = 'play'
PAUSE = 'pause'
class MediaOmxplayerPlugin(MediaPlugin):
"""
Plugin to control video and media playback using OMXPlayer.
@ -36,6 +43,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
self.args = args
self._player = None
self._handlers = { e.value: [] for e in PlayerEvent }
@action
def play(self, resource):
@ -278,19 +286,36 @@ class MediaOmxplayerPlugin(MediaPlugin):
redis.send_message(msg)
def add_handler(self, event_type, callback):
if event_type not in self._handlers.keys():
raise AttributeError('{} is not a valid PlayerEvent type'.
format(event_type))
self._handlers[event_type].append(callback)
def on_play(self):
def _f(player):
self.send_message(VideoPlayEvent(video=self._player.get_source()))
video = self._player.get_source()
self.send_message(VideoPlayEvent(video=video))
for callback in self._handlers[PlayerEvent.PLAY.value]:
callback(video)
return _f
def on_pause(self):
def _f(player):
self.send_message(VideoPauseEvent(video=self._player.get_source()))
video = self._player.get_source()
self.send_message(VideoPauseEvent(video=video))
for callback in self._handlers[PlayerEvent.PAUSE.value]:
callback(video)
return _f
def on_stop(self):
def _f(player):
self.send_message(VideoStopEvent())
for callback in self._handlers[PlayerEvent.STOP.value]:
callback()
return _f

View File

@ -1,8 +1,9 @@
import datetime
import enum
import os
import re
import select
import subprocess
import tempfile
import threading
import time
@ -11,12 +12,20 @@ from platypush.context import get_bus, get_plugin
from platypush.message.response import Response
from platypush.plugins.media import PlayerState, MediaPlugin
from platypush.message.event.torrent import TorrentDownloadStartEvent, \
TorrentDownloadCompletedEvent
TorrentDownloadCompletedEvent, TorrentDownloadProgressEvent, \
TorrentDownloadingMetadataEvent
from platypush.plugins import action
from platypush.utils import find_bins_in_path
from platypush.utils import find_bins_in_path, find_files_by_ext, \
is_process_alive
class TorrentState(enum.Enum):
IDLE = 1
DOWNLOADING_METADATA = 2
DOWNLOADING = 3
DOWNLOADED = 4
class MediaWebtorrentPlugin(MediaPlugin):
"""
Plugin to download and stream videos using webtorrent
@ -38,7 +47,8 @@ class MediaWebtorrentPlugin(MediaPlugin):
# Download at least 10 MBs before starting streaming
_download_size_before_streaming = 10 * 2**20
def __init__(self, webtorrent_bin=None, *args, **kwargs):
def __init__(self, webtorrent_bin=None, webtorrent_port=None, *args,
**kwargs):
"""
media.webtorrent will use the default media player plugin you have
configured (e.g. mplayer, omxplayer) to stream the torrent.
@ -46,10 +56,15 @@ class MediaWebtorrentPlugin(MediaPlugin):
:param webtorrent_bin: Path to your webtorrent executable. If not set,
then Platypush will search for the right executable in your PATH
:type webtorrent_bin: str
:param webtorrent_port: Port where the webtorrent will be running
streaming server will be running (default: 8000)
:type webtorrent_port: int
"""
super().__init__(*args, **kwargs)
self.webtorrent_port = webtorrent_port
self._init_webtorrent_bin(webtorrent_bin=webtorrent_bin)
self._init_media_player()
@ -97,38 +112,142 @@ class MediaWebtorrentPlugin(MediaPlugin):
self._supported_media_plugins))
def _process_monitor(self, resource, output_file):
def _read_process_line(self):
line = self._webtorrent_process.stdout.readline().decode().strip()
# Strip output of the colors
return re.sub('\x1b\[((\d+m)|(.{1,2}))', '', line).strip()
def _process_monitor(self, resource, download_dir):
def _thread():
if not self._webtorrent_process:
return
######
state = TorrentState.IDLE
bus = get_bus()
bus.post(TorrentDownloadStartEvent(resource=resource))
webtorrent_url = None
media_file = None
poll = select.poll()
poll.register(self._webtorrent_process.stdout, select.POLLIN)
# First wait for the metadata to be ready and the streaming started
while True:
result = poll.poll(0)
if not result:
continue
if not self._is_process_alive():
break
line = self._read_process_line()
if 'fetching torrent metadata from' in line.lower() \
and state == TorrentState.IDLE:
# IDLE -> DOWNLOADING_METADATA
state = TorrentState.DOWNLOADING_METADATA
bus.post(TorrentDownloadingMetadataEvent(resource=resource))
elif 'downloading: ' in line.lower() \
and media_file is None:
# Find video files in torrent directory
output_dir = os.path.join(
download_dir, re.search(
'downloading: (.+?)$', line, flags=re.IGNORECASE
).group(1))
media_files = sorted(find_files_by_ext(
output_dir, *self._media_plugin.video_extensions))
if not media_files:
raise RuntimeError('No video files found in {}'.
format(output_dir))
media_file = os.path.join(output_dir, media_files[0])
elif 'server running at: ' in line.lower() \
and webtorrent_url is None:
# Streaming started
webtorrent_url = re.search('server running at: (.+?)$',
line, flags=re.IGNORECASE).group(1)
self.logger.info('Torrent stream started on {}'.format(
webtorrent_url))
if state.value <= TorrentState.DOWNLOADING_METADATA.value \
and media_file and webtorrent_url:
# DOWNLOADING_METADATA -> DOWNLOADING
state = TorrentState.DOWNLOADING
bus.post(TorrentDownloadStartEvent(
resource=resource, media_file=media_file,
stream_url=webtorrent_url))
break
if not media_file or not webtorrent_url:
raise RuntimeError('The webtorrent process did not ' +
'provide the required data')
# Then wait until we have enough chunks to start the player
while True:
result = poll.poll(0)
if not result:
continue
if not self._is_process_alive():
break
try:
if os.path.getsize(output_file) > \
if os.path.getsize(media_file) > \
self._download_size_before_streaming:
break
except FileNotFoundError:
pass
continue
self._media_plugin.play(output_file)
self._webtorrent_process.wait()
self.logger.info(
'Starting playback of {} to {} through {}'.format(
media_file, self._media_plugin.__class__.__name__,
webtorrent_url))
media = media_file if self._media_plugin.is_local() \
else webtorrent_url
self._media_plugin.play(media)
self.logger.info('Waiting for player to terminate')
self._wait_for_player()
self.logger.info('Torrent player terminated')
bus.post(TorrentDownloadCompletedEvent(resource=resource))
self._webtorrent_process = None
try: self.quit()
except: pass
self.logger.info('WebTorrent process terminated')
return _thread
def _wait_for_player(self):
media_cls = self._media_plugin.__class__.__name__
stop_evt = None
def _get_torrent_download_path(self):
if self._media_plugin.download_dir:
# TODO set proper file name based on the torrent metadata
return os.path.join(self._media_plugin.download_dir,
'torrent_media_' + datetime.datetime.
today().strftime('%Y-%m-%d_%H-%M-%S-%f'))
if media_cls == 'MediaMplayerPlugin':
stop_evt = self._media_plugin._mplayer_stopped_event
elif media_cls == 'MediaOmxplayerPlugin':
stop_evt = threading.Event()
def stop_callback():
stop_evt.set()
self._media_plugin.add_handler('stop', stop_callback)
if stop_evt:
stop_evt.wait()
else:
return tempfile.NamedTemporaryFile(delete=False).name
# Fallback: wait for the webtorrent process to terminate
self._webtorrent_process.wait()
def _get_torrent_download_dir(self):
if self._media_plugin.download_dir:
return self._media_plugin.download_dir
else:
d = os.path.join(os.environ['HOME'], 'Downloads')
os.makedirs(d, exist_ok=True)
return d
@action
@ -148,16 +267,18 @@ class MediaWebtorrentPlugin(MediaPlugin):
self.logger.debug('Failed to quit the previous instance: {}'.
format(str))
output_file = self._get_torrent_download_path()
webtorrent_args = [self.webtorrent_bin, '--stdout', resource]
download_dir = self._get_torrent_download_dir()
webtorrent_args = [self.webtorrent_bin, 'download', '-o', download_dir]
with open(output_file, 'w') as f:
self._webtorrent_process = subprocess.Popen(webtorrent_args,
stdout=f)
if self.webtorrent_port:
webtorrent_args += ['-p', self.webtorrent_port]
threading.Thread(target=self._process_monitor(
resource=resource, output_file=output_file)).start()
webtorrent_args += [resource]
self._webtorrent_process = subprocess.Popen(webtorrent_args,
stdout=subprocess.PIPE)
threading.Thread(target=self._process_monitor(
resource=resource, download_dir=download_dir)).start()
return { 'resource': resource }
@ -169,8 +290,7 @@ class MediaWebtorrentPlugin(MediaPlugin):
@action
def quit(self):
""" Quit the player """
if self._webtorrent_process and self._is_process_alive(
self._webtorrent_process.pid):
if self._is_process_alive():
self._webtorrent_process.terminate()
self._webtorrent_process.wait()
try: self._webtorrent_process.kill()
@ -186,15 +306,8 @@ class MediaWebtorrentPlugin(MediaPlugin):
return self.play(resource)
def _is_process_alive(self):
if not self._webtorrent_process:
return False
try:
os.kill(self._webtorrent_process.pid, 0)
return True
except OSError:
self._webtorrent_process = None
return False
return is_process_alive(self._webtorrent_process.pid) \
if self._webtorrent_process else False
@action
def status(self):

View File

@ -180,4 +180,34 @@ def find_bins_in_path(bin_name):
os.access(os.path.join(p, bin_name), os.X_OK))]
def find_files_by_ext(directory, *exts):
"""
Finds all the files in the given directory with the provided extensions
"""
if not exts:
raise AttributeError('No extensions provided')
if not os.path.isdir(directory):
raise AttributeError('{} is not a valid directory'.format(directory))
min_len = len(min(exts, key=len))
max_len = len(max(exts, key=len))
result = []
for root, dirs, files in os.walk(directory):
for i in range(min_len, max_len+1):
result += [f for f in files if f[-i:] in exts]
return result
def is_process_alive(pid):
try:
os.kill(pid, 0)
return True
except OSError:
return False
# vim:sw=4:ts=4:et: