diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index cc0eb4a8..a0ffe4ec 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -1,13 +1,14 @@ import asyncio import datetime import dateutil.parser +import hashlib import inspect import json import os import re +import threading import time -from threading import Thread, get_ident from multiprocessing import Process from flask import Flask, Response, abort, jsonify, request as http_request, \ render_template, send_from_directory @@ -20,9 +21,11 @@ from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.request import Request -from platypush.utils import get_ssl_server_context, set_thread_name +from platypush.utils import get_ssl_server_context, set_thread_name, \ + get_ip_or_hostname from .. import Backend +from .media.handlers import MediaHandler class HttpBackend(Backend): @@ -55,15 +58,25 @@ class HttpBackend(Backend): * **redis** (``pip install redis``) * **websockets** (``pip install websockets``) * **python-dateutil** (``pip install python-dateutil``) + * **magic** (``pip install python-magic``), optional, for MIME type + support if you want to enable media streaming """ hidden_plugins = { 'assistant.google' } + # Default size for the bytes chunk sent over the media streaming infra + _DEFAULT_STREAMING_CHUNK_SIZE = 4096 + + # Maximum range size to be sent through the media streamer if Range header + # is not set + _DEFAULT_STREAMING_BLOCK_SIZE = 3145728 + def __init__(self, port=8008, websocket_port=8009, disable_websocket=False, redis_queue='platypush/http', dashboard={}, resource_dirs={}, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None, + streaming_chunk_size=_DEFAULT_STREAMING_CHUNK_SIZE, maps={}, **kwargs): """ :param port: Listen port for the web server (default: 8008) @@ -125,12 +138,17 @@ class HttpBackend(Backend): db: "sqlite:////home/blacklight/.local/share/platypush/feeds/rss.db" :type dashboard: dict + + :param streaming_chunk_size: Size for the chunks of bytes sent over the + media streaming infrastructure (default: 4096 bytes) + :type streaming_chunk_size: int """ super().__init__(**kwargs) self.port = port self.websocket_port = websocket_port + self.app = None self.redis_queue = redis_queue self.dashboard = dashboard self.maps = maps @@ -148,6 +166,15 @@ class HttpBackend(Backend): ssl_capath=ssl_capath) \ if ssl_cert else None + self.remote_base_url = '{proto}://{host}:{port}'.format( + proto=('https' if self.ssl_context else 'http'), + host=get_ip_or_hostname(), port=self.port) + + self.local_base_url = '{proto}://localhost:{port}'.format( + proto=('https' if self.ssl_context else 'http'), port=self.port) + + self._media_map_lock = threading.RLock() + def send_message(self, msg): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') @@ -241,8 +268,10 @@ class HttpBackend(Backend): basedir = os.path.dirname(inspect.getfile(self.__class__)) template_dir = os.path.join(basedir, 'templates') + media_map = {} app = Flask(__name__, template_folder=template_dir) - self.redis_thread = Thread(target=self.redis_poll) + + self.redis_thread = threading.Thread(target=self.redis_poll) self.redis_thread.start() @app.route('/execute', methods=['POST']) @@ -335,6 +364,159 @@ class HttpBackend(Backend): return send_from_directory(real_path, file_path) + def get_media_url(media_id): + return '{url}/media/{media_id}'.format( + url=self.remote_base_url, media_id=media_id) + + def get_media_id(source): + return hashlib.sha1(source.encode()).hexdigest() + + def register_media(source): + media_id = get_media_id(source) + media_url = get_media_url(media_id) + + with self._media_map_lock: + if media_id in media_map: + raise FileExistsError('"{}" is already registered on {}'. + format(source, media_map[media_id].url)) + + media_hndl = MediaHandler.build(source, url=media_url) + media_map[media_id] = media_hndl + + self.logger.info('Streaming "{}" on {}'.format(source, media_url)) + return media_hndl + + + def unregister_media(source): + if source is None: + raise KeyError('No media_id specified') + + media_id = get_media_id(source) + media_info = {} + + with self._media_map_lock: + if media_id not in media_map: + raise FileNotFoundError('{} is not a registered media_id'. + format(source)) + media_info = media_map.pop(media_id) + + self.logger.info('Unregistered {} from {}'.format( + source, media_info.get('url'))) + + return media_info + + + def stream_media(media_id, request): + media_hndl = media_map.get(media_id) + if not media_hndl: + raise FileNotFoundError('{} is not a registered media_id'. + format(media_id)) + + from_bytes = None + to_bytes = None + range_hdr = request.headers.get('range') + content_length = media_hndl.content_length + status_code = 200 + + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Type': media_hndl.mime_type, + } + + if 'download' in request.args: + headers['Content-Disposition'] = 'attachment' + \ + ('; filename="{}"'.format(media_hndl.filename) if + media_hndl.filename else '') + + if range_hdr: + headers['Accept-Ranges'] = 'bytes' + from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-') + from_bytes = int(from_bytes) + + if not to_bytes: + to_bytes = content_length-1 + # to_bytes = from_bytes + self._DEFAULT_STREAMING_BLOCK_SIZE + content_length -= from_bytes + else: + to_bytes = int(to_bytes) + content_length = to_bytes - from_bytes + + status_code = 206 + headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format( + start=from_bytes, end=to_bytes, + size=media_hndl.content_length) + else: + from_bytes = 0 + to_bytes = self._DEFAULT_STREAMING_BLOCK_SIZE + + headers['Content-Length'] = content_length + + return Response(media_hndl.get_data( + from_bytes=from_bytes, to_bytes=to_bytes, + chunk_size=self._DEFAULT_STREAMING_CHUNK_SIZE), + status_code, headers=headers, mimetype=headers['Content-Type'], + direct_passthrough=True) + + @app.route('/media', methods=['GET', 'PUT']) + def add_or_get_media(): + """ + This route can be used by the `media` plugin to add streaming + content over HTTP or to get the list of registered streams + """ + + if http_request.method == 'GET': + return jsonify([dict(media) for media in media_map.values()]) + + args = {} + try: + args = json.loads(http_request.data.decode('utf-8')) + except: + abort(400, 'Invalid JSON request') + + source = args.get('source') + if not source: + abort(400, 'The request does not contain any source') + + try: + media_hndl = register_media(source) + return jsonify(dict(media_hndl)) + except FileExistsError as e: + abort(409, str(e)) + except FileNotFoundError as e: + abort(404, str(e)) + except AttributeError as e: + abort(400, str(e)) + except Exception as e: + self.logger.exception(e) + abort(500, str(e)) + + @app.route('/media/', methods=['GET', 'DELETE']) + def stream_or_delete_media(media_id): + """ + This route can be used to stream active media points or unregister + a mounted media stream + """ + + # Remove the extension + media_id = '.'.join(media_id.split('.')[:-1]) + + try: + if http_request.method == 'GET': + if media_id is None: + return jsonify(media_map) + else: + return stream_media(media_id, http_request) + else: + media_info = unregister_media(media_id) + return jsonify(media_info) + except (AttributeError, FileNotFoundError) as e: + abort(404, str(e)) + except KeyError as e: + abort(400, str(e)) + except Exception as e: + self.logger.exception(e) + abort(500, str(e)) + @app.route('/dashboard', methods=['GET']) def dashboard(): """ Route for the fullscreen dashboard """ @@ -448,7 +630,6 @@ class HttpBackend(Backend): **websocket_args)) loop.run_forever() - def run(self): super().run() os.putenv('FLASK_APP', 'platypush') @@ -462,14 +643,14 @@ class HttpBackend(Backend): self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) - webserver = self.webserver() - self.server_proc = Process(target=webserver.run, + self.app = self.webserver() + self.server_proc = Process(target=self.app.run, name='WebServer', kwargs=kwargs) self.server_proc.start() if not self.disable_websocket: - self.websocket_thread = Thread(target=self.websocket) + self.websocket_thread = threading.Thread(target=self.websocket) self.websocket_thread.start() self.server_proc.join() diff --git a/platypush/backend/http/media/__init__.py b/platypush/backend/http/media/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/platypush/backend/http/media/handlers/__init__.py b/platypush/backend/http/media/handlers/__init__.py new file mode 100644 index 00000000..ae1d04a5 --- /dev/null +++ b/platypush/backend/http/media/handlers/__init__.py @@ -0,0 +1,61 @@ +class MediaHandler: + """ + Abstract class to manage media handlers that can be streamed over the HTTP + server through the `/media` endpoint. + """ + + prefix_handlers = [] + + def __init__(self, source, filename=None, + mime_type='application/octet-stream', name=None, url=None): + matched_handlers = [hndl for hndl in self.prefix_handlers + if source.startswith(hndl)] + + if not matched_handlers: + raise AttributeError(('No matched handlers found for source "{}" ' + + 'through {}. Supported handlers: {}').format( + source, self.__class__.__name__, + self.prefix_handlers)) + + self.name = name + self.filename = name + self.source = source + self.url = url + self.mime_type = mime_type + self.content_length = 0 + self._matched_handler = matched_handlers[0] + + + @classmethod + def build(cls, source, *args, **kwargs): + errors = {} + + for hndl_class in supported_handlers: + try: + return hndl_class(source, *args, **kwargs) + except Exception as e: + errors[hndl_class.__name__] = str(e) + + raise AttributeError(('The source {} has no handlers associated. ' + + 'Errors: {}').format(source, errors)) + + def get_data(self, from_bytes=None, to_bytes=None, chunk_size=None): + raise NotImplementedError() + + def __iter__(self): + for attr in ['name', 'source', 'mime_type', 'url', 'prefix_handlers']: + yield (attr, getattr(self, attr)) + + + +from .file import FileHandler + + +__all__ = ['MediaHandler', 'FileHandler'] + + +supported_handlers = [eval(hndl) for hndl in __all__ + if hndl != MediaHandler.__name__] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/media/handlers/file.py b/platypush/backend/http/media/handlers/file.py new file mode 100644 index 00000000..159066a0 --- /dev/null +++ b/platypush/backend/http/media/handlers/file.py @@ -0,0 +1,49 @@ +import functools +import mimetypes +import os + +from platypush.utils import get_mime_type + +from . import MediaHandler + + +class FileHandler(MediaHandler): + prefix_handlers = ['file://'] + + def __init__(self, source, *args, **kwargs): + super().__init__(source, *args, **kwargs) + + self.path = os.path.abspath(os.path.expanduser( + self.source[len(self._matched_handler):])) + self.filename = self.path.split('/')[-1] + + if not os.path.isfile(self.path): + raise FileNotFoundError('{} is not a valid file'. + format(self.path)) + + self.mime_type = get_mime_type(source) + if self.mime_type[:5] not in ['video', 'audio', 'image']: + raise AttributeError('{} is not a valid media file'.format(source)) + + self.extension = mimetypes.guess_extension(self.mime_type) + if self.url: + self.url += self.extension + self.content_length = os.path.getsize(self.path) + + + def get_data(self, from_bytes=None, to_bytes=None, chunk_size=None): + if from_bytes is None: + from_bytes = 0 + if to_bytes is None: + to_bytes = os.path.getsize(self.path) + if chunk_size is None: + chunk_size = os.path.getsize(self.path) - from_bytes + + with open(self.path, 'rb') as f: + f.seek(from_bytes) + for chunk in iter(functools.partial( + f.read, min(to_bytes-from_bytes, chunk_size)), b''): + yield chunk + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index b6a56817..815d1151 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -8,9 +8,9 @@ import urllib.request import urllib.parse from platypush.config import Config -from platypush.context import get_plugin +from platypush.context import get_plugin, get_backend from platypush.plugins import Plugin, action -from platypush.utils import get_ip_or_hostname, is_process_alive + class PlayerState(enum.Enum): STOP = 'stop' @@ -25,26 +25,19 @@ class MediaPlugin(Plugin): Requires: * A media player installed (supported so far: mplayer, omxplayer, chromecast) - * **python-libtorrent** (``pip install python-libtorrent``), optional for Torrent support + * The :class:`platypush.plugins.media.webtorrent` plugin for optional torrent support through webtorrent (recommented) + * **python-libtorrent** (``pip install python-libtorrent``), optional, for torrent support through the native Python plugin * **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support + * **requests** (``pip install requests``), optional, for local files over HTTP streaming supporting - To start the local media stream service over HTTP: - - * **nodejs** installed on your system - * **express** module (``npm install express``) - * **mime-types** module (``npm install mime-types``) + To start the local media stream service over HTTP you will also need the + :class:`platypush.backend.http.HttpBackend` backend enabled. """ # A media plugin can either be local or remote (e.g. control media on # another device) _is_local = True - # Default port for the local resources HTTP streaming service - _default_streaming_port = 8989 - - # setup.py install will place localstream in PATH - _local_stream_bin = 'localstream' - _NOT_IMPLEMENTED_ERR = NotImplementedError( 'This method must be implemented in a derived class') @@ -71,7 +64,7 @@ class MediaPlugin(Plugin): 'media.chromecast'} def __init__(self, media_dirs=[], download_dir=None, env=None, - streaming_port=_default_streaming_port, *args, **kwargs): + *args, **kwargs): """ :param media_dirs: Directories that will be scanned for media files when a search is performed (default: none) @@ -84,10 +77,6 @@ class MediaPlugin(Plugin): :param env: Environment variables key-values to pass to the player executable (e.g. DISPLAY, XDG_VTNR, PULSE_SINK etc.) :type env: dict - - :param streaming_port: Port to be used for streaming local resources - over HTTP (default: 8989) - :type streaming_port: int """ super().__init__(*args, **kwargs) @@ -139,10 +128,6 @@ class MediaPlugin(Plugin): self.media_dirs.add(self.download_dir) self._videos_queue = [] - self._streaming_port = streaming_port - self._streaming_proc = None - self._streaming_started = threading.Event() - self._streaming_ended = threading.Event() def _get_resource(self, resource): """ @@ -354,78 +339,62 @@ class MediaPlugin(Plugin): @action - def start_streaming(self, media, port=None): + def start_streaming(self, media): """ Starts streaming local media over the specified HTTP port. The stream will be available to HTTP clients on - `http://{this-ip}:{port}/media + `http://{this-ip}:{http_backend_port}/media/` :param media: Media to stream + :type media: str + + :returns: dict containing the streaming URL.Example:: + + { + "id": "0123456abcdef.mp4", + "source": "file:///mnt/media/movies/movie.mp4", + "mime_type": "video/mp4", + "url": "http://192.168.1.2:8008/media/0123456abcdef.mp4" + } """ - if self._streaming_proc: - self.logger.info('A streaming process is already running, ' + - 'terminating it first') - self.stop_streaming() + import requests - if port is None: - port = self._streaming_port - - self._streaming_started.clear() - self._streaming_ended.clear() - self._streaming_proc = subprocess.Popen( - [self._local_stream_bin, media, str(port)], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT - ) - - threading.Thread(target=self._streaming_process_monitor(media)).start() - url = 'http://{}:{}/media'.format(get_ip_or_hostname(), - self._streaming_port) - - self.logger.info('Starting streaming {} on {}'.format(media, url)) - self._streaming_started.wait() - self.logger.info('Started streaming {} on {}'.format(media, url)) - return { 'url': url } - - @action - def stop_streaming(self): - if not self._streaming_proc: - self.logger.info('No streaming process found') + http = get_backend('http') + if not http: + self.logger.warning('Unable to stream {}: HTTP backend unavailable'. + format(media)) return - self._streaming_proc.terminate() - self._streaming_proc.wait() - try: self._streaming_proc.kill() - except: pass - self._streaming_proc = None + self.logger.info('Starting streaming {}'.format(media)) + response = requests.put('{url}/media'.format(url=http.local_base_url), + json = { 'source': media }) + if not response.ok: + self.logger.warning('Unable to start streaming: {}'. + format(response.text or response.reason)) + return - def _streaming_process_monitor(self, media): - def _thread(): - if not self._streaming_proc: - return + return response.json() - while True: - if not self._streaming_proc or not \ - is_process_alive(self._streaming_proc.pid): - break + @action + def stop_streaming(self, media_id): + import requests - line = self._streaming_proc.stdout.readline().decode().strip() - if not line: - continue + http = get_backend('http') + if not http: + self.logger.warning('Cannot unregister {}: HTTP backend unavailable'. + format(media_id)) + return - if line.startswith('Listening on'): - self._streaming_started.set() - break + response = requests.delete('{url}/media/{id}'. + format(url=http.local_base_url, id=media_id)) - self.logger.info('Message from streaming service: {}'.format(line)) + if not response.ok: + self.logger.warning('Unable to unregister media_id {}: {}'.format( + media_id, response.reason)) + return - self._streaming_proc.wait() - try: self.stop_streaming() - except: pass - self._streaming_ended.set() - self.logger.info('Streaming service terminated') - - return _thread + return response.json() def _youtube_search_api(self, query): diff --git a/platypush/plugins/media/bin/localstream b/platypush/plugins/media/bin/localstream deleted file mode 100755 index 8703bb1d..00000000 --- a/platypush/plugins/media/bin/localstream +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - -if [ -d "$HOME/node_modules" ]; then - export NODE_PATH=$HOME/node_modules:$NODE_PATH -fi - -file=$1 -port= -[ ! -z "$2" ] && port=$2 - -node $DIR/localstream.js "$file" $port - diff --git a/platypush/plugins/media/bin/localstream.js b/platypush/plugins/media/bin/localstream.js deleted file mode 100755 index 595c6860..00000000 --- a/platypush/plugins/media/bin/localstream.js +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env node - -// Requires: -// - express (`npm install express`) -// - mime-types (`npm install mime-types`) - -const express = require('express') -const fs = require('fs') -const path = require('path') -const process = require('process') -const mime = require('mime-types') -const app = express() - -function parseArgv() { - let file = undefined - let port = 8989 - - if (process.argv.length < 3) { - throw Error(`Usage: ${process.argv[0]} ${process.argv[1]} [port=${port}]`) - } - - file = process.argv[2] - - if (process.argv.length > 3) { - port = parseInt(process.argv[3]) - } - - return { file: file, port: port } -} - -let args = parseArgv() - -app.get('/media', function(req, res) { - const path = args.file - const ext = args.file.split('.').pop() - const stat = fs.statSync(path) - const fileSize = stat.size - const range = req.headers.range - const mimeType = mime.lookup(ext) - - if (range) { - const parts = range.replace(/bytes=/, "").split("-") - const start = parseInt(parts[0], 10) - const end = parts[1] - ? parseInt(parts[1], 10) - : fileSize-1 - - const chunksize = (end-start)+1 - const file = fs.createReadStream(path, {start, end}) - const head = { - 'Content-Range': `bytes ${start}-${end}/${fileSize}`, - 'Accept-Ranges': 'bytes', - 'Content-Length': chunksize, - 'Content-Type': mimeType, - } - - res.writeHead(206, head) - file.pipe(res) - } else { - const head = { - 'Content-Length': fileSize, - 'Content-Type': mimeType, - } - res.writeHead(200, head) - fs.createReadStream(path).pipe(res) - } -}) - -app.listen(args.port, function () { - console.log(`Listening on port ${args.port}`) -}) diff --git a/platypush/plugins/media/chromecast.py b/platypush/plugins/media/chromecast.py index 9db1b16f..f5c22f06 100644 --- a/platypush/plugins/media/chromecast.py +++ b/platypush/plugins/media/chromecast.py @@ -1,5 +1,4 @@ import datetime -import os import re import pychromecast @@ -181,9 +180,6 @@ class MediaChromecastPlugin(MediaPlugin): player='chromecast', **player_args) - if resource.startswith('file://'): - resource = resource[len('file://'):] - if not content_type: content_type = get_mime_type(resource) @@ -191,8 +187,10 @@ class MediaChromecastPlugin(MediaPlugin): raise RuntimeError('content_type required to process media {}'. format(resource)) - if os.path.isfile(resource): + if not resource.startswith('http://') and \ + not resource.startswith('https://'): resource = self.start_streaming(resource).output['url'] + self.logger.info('HTTP media stream started on {}'.format(resource)) self.logger.info('Playing {} on {}'.format(resource, chromecast)) diff --git a/setup.py b/setup.py index f1188716..b4e2e2ad 100755 --- a/setup.py +++ b/setup.py @@ -49,8 +49,7 @@ setup( 'platydock=platypush.platydock:main', ], }, - scripts = ['bin/platyvenv', 'platypush/plugins/media/bin/localstream.js', - 'platypush/plugins/media/bin/localstream'], + scripts = ['bin/platyvenv'], # data_files = [ # ('/etc/platypush', ['platypush/config.example.yaml']) # ],