Support for media streaming over internal HTTP server

Dropped the additional node.js dependency of localstream and relied
instead on a built-in solution to stream files
This commit is contained in:
Fabio Manganiello 2019-02-07 14:26:10 +01:00
parent 077bb0686b
commit 9ec3365413
9 changed files with 350 additions and 178 deletions

View File

@ -1,13 +1,14 @@
import asyncio
import datetime
import dateutil.parser
import hashlib
import inspect
import json
import os
import re
import threading
import time
from threading import Thread, get_ident
from multiprocessing import Process
from flask import Flask, Response, abort, jsonify, request as http_request, \
render_template, send_from_directory
@ -20,9 +21,11 @@ from platypush.message import Message
from platypush.message.event import Event, StopEvent
from platypush.message.event.web.widget import WidgetUpdateEvent
from platypush.message.request import Request
from platypush.utils import get_ssl_server_context, set_thread_name
from platypush.utils import get_ssl_server_context, set_thread_name, \
get_ip_or_hostname
from .. import Backend
from .media.handlers import MediaHandler
class HttpBackend(Backend):
@ -55,15 +58,25 @@ class HttpBackend(Backend):
* **redis** (``pip install redis``)
* **websockets** (``pip install websockets``)
* **python-dateutil** (``pip install python-dateutil``)
* **magic** (``pip install python-magic``), optional, for MIME type
support if you want to enable media streaming
"""
hidden_plugins = {
'assistant.google'
}
# Default size for the bytes chunk sent over the media streaming infra
_DEFAULT_STREAMING_CHUNK_SIZE = 4096
# Maximum range size to be sent through the media streamer if Range header
# is not set
_DEFAULT_STREAMING_BLOCK_SIZE = 3145728
def __init__(self, port=8008, websocket_port=8009, disable_websocket=False,
redis_queue='platypush/http', dashboard={}, resource_dirs={},
ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None,
streaming_chunk_size=_DEFAULT_STREAMING_CHUNK_SIZE,
maps={}, **kwargs):
"""
:param port: Listen port for the web server (default: 8008)
@ -125,12 +138,17 @@ class HttpBackend(Backend):
db: "sqlite:////home/blacklight/.local/share/platypush/feeds/rss.db"
:type dashboard: dict
:param streaming_chunk_size: Size for the chunks of bytes sent over the
media streaming infrastructure (default: 4096 bytes)
:type streaming_chunk_size: int
"""
super().__init__(**kwargs)
self.port = port
self.websocket_port = websocket_port
self.app = None
self.redis_queue = redis_queue
self.dashboard = dashboard
self.maps = maps
@ -148,6 +166,15 @@ class HttpBackend(Backend):
ssl_capath=ssl_capath) \
if ssl_cert else None
self.remote_base_url = '{proto}://{host}:{port}'.format(
proto=('https' if self.ssl_context else 'http'),
host=get_ip_or_hostname(), port=self.port)
self.local_base_url = '{proto}://localhost:{port}'.format(
proto=('https' if self.ssl_context else 'http'), port=self.port)
self._media_map_lock = threading.RLock()
def send_message(self, msg):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
@ -241,8 +268,10 @@ class HttpBackend(Backend):
basedir = os.path.dirname(inspect.getfile(self.__class__))
template_dir = os.path.join(basedir, 'templates')
media_map = {}
app = Flask(__name__, template_folder=template_dir)
self.redis_thread = Thread(target=self.redis_poll)
self.redis_thread = threading.Thread(target=self.redis_poll)
self.redis_thread.start()
@app.route('/execute', methods=['POST'])
@ -335,6 +364,159 @@ class HttpBackend(Backend):
return send_from_directory(real_path, file_path)
def get_media_url(media_id):
return '{url}/media/{media_id}'.format(
url=self.remote_base_url, media_id=media_id)
def get_media_id(source):
return hashlib.sha1(source.encode()).hexdigest()
def register_media(source):
media_id = get_media_id(source)
media_url = get_media_url(media_id)
with self._media_map_lock:
if media_id in media_map:
raise FileExistsError('"{}" is already registered on {}'.
format(source, media_map[media_id].url))
media_hndl = MediaHandler.build(source, url=media_url)
media_map[media_id] = media_hndl
self.logger.info('Streaming "{}" on {}'.format(source, media_url))
return media_hndl
def unregister_media(source):
if source is None:
raise KeyError('No media_id specified')
media_id = get_media_id(source)
media_info = {}
with self._media_map_lock:
if media_id not in media_map:
raise FileNotFoundError('{} is not a registered media_id'.
format(source))
media_info = media_map.pop(media_id)
self.logger.info('Unregistered {} from {}'.format(
source, media_info.get('url')))
return media_info
def stream_media(media_id, request):
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.
format(media_id))
from_bytes = None
to_bytes = None
range_hdr = request.headers.get('range')
content_length = media_hndl.content_length
status_code = 200
headers = {
'Accept-Ranges': 'bytes',
'Content-Type': media_hndl.mime_type,
}
if 'download' in request.args:
headers['Content-Disposition'] = 'attachment' + \
('; filename="{}"'.format(media_hndl.filename) if
media_hndl.filename else '')
if range_hdr:
headers['Accept-Ranges'] = 'bytes'
from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-')
from_bytes = int(from_bytes)
if not to_bytes:
to_bytes = content_length-1
# to_bytes = from_bytes + self._DEFAULT_STREAMING_BLOCK_SIZE
content_length -= from_bytes
else:
to_bytes = int(to_bytes)
content_length = to_bytes - from_bytes
status_code = 206
headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format(
start=from_bytes, end=to_bytes,
size=media_hndl.content_length)
else:
from_bytes = 0
to_bytes = self._DEFAULT_STREAMING_BLOCK_SIZE
headers['Content-Length'] = content_length
return Response(media_hndl.get_data(
from_bytes=from_bytes, to_bytes=to_bytes,
chunk_size=self._DEFAULT_STREAMING_CHUNK_SIZE),
status_code, headers=headers, mimetype=headers['Content-Type'],
direct_passthrough=True)
@app.route('/media', methods=['GET', 'PUT'])
def add_or_get_media():
"""
This route can be used by the `media` plugin to add streaming
content over HTTP or to get the list of registered streams
"""
if http_request.method == 'GET':
return jsonify([dict(media) for media in media_map.values()])
args = {}
try:
args = json.loads(http_request.data.decode('utf-8'))
except:
abort(400, 'Invalid JSON request')
source = args.get('source')
if not source:
abort(400, 'The request does not contain any source')
try:
media_hndl = register_media(source)
return jsonify(dict(media_hndl))
except FileExistsError as e:
abort(409, str(e))
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except Exception as e:
self.logger.exception(e)
abort(500, str(e))
@app.route('/media/<media_id>', methods=['GET', 'DELETE'])
def stream_or_delete_media(media_id):
"""
This route can be used to stream active media points or unregister
a mounted media stream
"""
# Remove the extension
media_id = '.'.join(media_id.split('.')[:-1])
try:
if http_request.method == 'GET':
if media_id is None:
return jsonify(media_map)
else:
return stream_media(media_id, http_request)
else:
media_info = unregister_media(media_id)
return jsonify(media_info)
except (AttributeError, FileNotFoundError) as e:
abort(404, str(e))
except KeyError as e:
abort(400, str(e))
except Exception as e:
self.logger.exception(e)
abort(500, str(e))
@app.route('/dashboard', methods=['GET'])
def dashboard():
""" Route for the fullscreen dashboard """
@ -448,7 +630,6 @@ class HttpBackend(Backend):
**websocket_args))
loop.run_forever()
def run(self):
super().run()
os.putenv('FLASK_APP', 'platypush')
@ -462,14 +643,14 @@ class HttpBackend(Backend):
self.logger.info('Initialized HTTP backend on port {}'.format(self.port))
webserver = self.webserver()
self.server_proc = Process(target=webserver.run,
self.app = self.webserver()
self.server_proc = Process(target=self.app.run,
name='WebServer',
kwargs=kwargs)
self.server_proc.start()
if not self.disable_websocket:
self.websocket_thread = Thread(target=self.websocket)
self.websocket_thread = threading.Thread(target=self.websocket)
self.websocket_thread.start()
self.server_proc.join()

View File

View File

@ -0,0 +1,61 @@
class MediaHandler:
"""
Abstract class to manage media handlers that can be streamed over the HTTP
server through the `/media` endpoint.
"""
prefix_handlers = []
def __init__(self, source, filename=None,
mime_type='application/octet-stream', name=None, url=None):
matched_handlers = [hndl for hndl in self.prefix_handlers
if source.startswith(hndl)]
if not matched_handlers:
raise AttributeError(('No matched handlers found for source "{}" ' +
'through {}. Supported handlers: {}').format(
source, self.__class__.__name__,
self.prefix_handlers))
self.name = name
self.filename = name
self.source = source
self.url = url
self.mime_type = mime_type
self.content_length = 0
self._matched_handler = matched_handlers[0]
@classmethod
def build(cls, source, *args, **kwargs):
errors = {}
for hndl_class in supported_handlers:
try:
return hndl_class(source, *args, **kwargs)
except Exception as e:
errors[hndl_class.__name__] = str(e)
raise AttributeError(('The source {} has no handlers associated. ' +
'Errors: {}').format(source, errors))
def get_data(self, from_bytes=None, to_bytes=None, chunk_size=None):
raise NotImplementedError()
def __iter__(self):
for attr in ['name', 'source', 'mime_type', 'url', 'prefix_handlers']:
yield (attr, getattr(self, attr))
from .file import FileHandler
__all__ = ['MediaHandler', 'FileHandler']
supported_handlers = [eval(hndl) for hndl in __all__
if hndl != MediaHandler.__name__]
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,49 @@
import functools
import mimetypes
import os
from platypush.utils import get_mime_type
from . import MediaHandler
class FileHandler(MediaHandler):
prefix_handlers = ['file://']
def __init__(self, source, *args, **kwargs):
super().__init__(source, *args, **kwargs)
self.path = os.path.abspath(os.path.expanduser(
self.source[len(self._matched_handler):]))
self.filename = self.path.split('/')[-1]
if not os.path.isfile(self.path):
raise FileNotFoundError('{} is not a valid file'.
format(self.path))
self.mime_type = get_mime_type(source)
if self.mime_type[:5] not in ['video', 'audio', 'image']:
raise AttributeError('{} is not a valid media file'.format(source))
self.extension = mimetypes.guess_extension(self.mime_type)
if self.url:
self.url += self.extension
self.content_length = os.path.getsize(self.path)
def get_data(self, from_bytes=None, to_bytes=None, chunk_size=None):
if from_bytes is None:
from_bytes = 0
if to_bytes is None:
to_bytes = os.path.getsize(self.path)
if chunk_size is None:
chunk_size = os.path.getsize(self.path) - from_bytes
with open(self.path, 'rb') as f:
f.seek(from_bytes)
for chunk in iter(functools.partial(
f.read, min(to_bytes-from_bytes, chunk_size)), b''):
yield chunk
# vim:sw=4:ts=4:et:

View File

@ -8,9 +8,9 @@ import urllib.request
import urllib.parse
from platypush.config import Config
from platypush.context import get_plugin
from platypush.context import get_plugin, get_backend
from platypush.plugins import Plugin, action
from platypush.utils import get_ip_or_hostname, is_process_alive
class PlayerState(enum.Enum):
STOP = 'stop'
@ -25,26 +25,19 @@ class MediaPlugin(Plugin):
Requires:
* A media player installed (supported so far: mplayer, omxplayer, chromecast)
* **python-libtorrent** (``pip install python-libtorrent``), optional for Torrent support
* The :class:`platypush.plugins.media.webtorrent` plugin for optional torrent support through webtorrent (recommented)
* **python-libtorrent** (``pip install python-libtorrent``), optional, for torrent support through the native Python plugin
* **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support
* **requests** (``pip install requests``), optional, for local files over HTTP streaming supporting
To start the local media stream service over HTTP:
* **nodejs** installed on your system
* **express** module (``npm install express``)
* **mime-types** module (``npm install mime-types``)
To start the local media stream service over HTTP you will also need the
:class:`platypush.backend.http.HttpBackend` backend enabled.
"""
# A media plugin can either be local or remote (e.g. control media on
# another device)
_is_local = True
# Default port for the local resources HTTP streaming service
_default_streaming_port = 8989
# setup.py install will place localstream in PATH
_local_stream_bin = 'localstream'
_NOT_IMPLEMENTED_ERR = NotImplementedError(
'This method must be implemented in a derived class')
@ -71,7 +64,7 @@ class MediaPlugin(Plugin):
'media.chromecast'}
def __init__(self, media_dirs=[], download_dir=None, env=None,
streaming_port=_default_streaming_port, *args, **kwargs):
*args, **kwargs):
"""
:param media_dirs: Directories that will be scanned for media files when
a search is performed (default: none)
@ -84,10 +77,6 @@ class MediaPlugin(Plugin):
:param env: Environment variables key-values to pass to the
player executable (e.g. DISPLAY, XDG_VTNR, PULSE_SINK etc.)
:type env: dict
:param streaming_port: Port to be used for streaming local resources
over HTTP (default: 8989)
:type streaming_port: int
"""
super().__init__(*args, **kwargs)
@ -139,10 +128,6 @@ class MediaPlugin(Plugin):
self.media_dirs.add(self.download_dir)
self._videos_queue = []
self._streaming_port = streaming_port
self._streaming_proc = None
self._streaming_started = threading.Event()
self._streaming_ended = threading.Event()
def _get_resource(self, resource):
"""
@ -354,78 +339,62 @@ class MediaPlugin(Plugin):
@action
def start_streaming(self, media, port=None):
def start_streaming(self, media):
"""
Starts streaming local media over the specified HTTP port.
The stream will be available to HTTP clients on
`http://{this-ip}:{port}/media
`http://{this-ip}:{http_backend_port}/media/<media_id>`
:param media: Media to stream
:type media: str
:returns: dict containing the streaming URL.Example::
{
"id": "0123456abcdef.mp4",
"source": "file:///mnt/media/movies/movie.mp4",
"mime_type": "video/mp4",
"url": "http://192.168.1.2:8008/media/0123456abcdef.mp4"
}
"""
if self._streaming_proc:
self.logger.info('A streaming process is already running, ' +
'terminating it first')
self.stop_streaming()
import requests
if port is None:
port = self._streaming_port
self._streaming_started.clear()
self._streaming_ended.clear()
self._streaming_proc = subprocess.Popen(
[self._local_stream_bin, media, str(port)],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
threading.Thread(target=self._streaming_process_monitor(media)).start()
url = 'http://{}:{}/media'.format(get_ip_or_hostname(),
self._streaming_port)
self.logger.info('Starting streaming {} on {}'.format(media, url))
self._streaming_started.wait()
self.logger.info('Started streaming {} on {}'.format(media, url))
return { 'url': url }
@action
def stop_streaming(self):
if not self._streaming_proc:
self.logger.info('No streaming process found')
http = get_backend('http')
if not http:
self.logger.warning('Unable to stream {}: HTTP backend unavailable'.
format(media))
return
self._streaming_proc.terminate()
self._streaming_proc.wait()
try: self._streaming_proc.kill()
except: pass
self._streaming_proc = None
self.logger.info('Starting streaming {}'.format(media))
response = requests.put('{url}/media'.format(url=http.local_base_url),
json = { 'source': media })
if not response.ok:
self.logger.warning('Unable to start streaming: {}'.
format(response.text or response.reason))
return
def _streaming_process_monitor(self, media):
def _thread():
if not self._streaming_proc:
return
return response.json()
while True:
if not self._streaming_proc or not \
is_process_alive(self._streaming_proc.pid):
break
@action
def stop_streaming(self, media_id):
import requests
line = self._streaming_proc.stdout.readline().decode().strip()
if not line:
continue
http = get_backend('http')
if not http:
self.logger.warning('Cannot unregister {}: HTTP backend unavailable'.
format(media_id))
return
if line.startswith('Listening on'):
self._streaming_started.set()
break
response = requests.delete('{url}/media/{id}'.
format(url=http.local_base_url, id=media_id))
self.logger.info('Message from streaming service: {}'.format(line))
if not response.ok:
self.logger.warning('Unable to unregister media_id {}: {}'.format(
media_id, response.reason))
return
self._streaming_proc.wait()
try: self.stop_streaming()
except: pass
self._streaming_ended.set()
self.logger.info('Streaming service terminated')
return _thread
return response.json()
def _youtube_search_api(self, query):

View File

@ -1,14 +0,0 @@
#!/bin/bash
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
if [ -d "$HOME/node_modules" ]; then
export NODE_PATH=$HOME/node_modules:$NODE_PATH
fi
file=$1
port=
[ ! -z "$2" ] && port=$2
node $DIR/localstream.js "$file" $port

View File

@ -1,71 +0,0 @@
#!/usr/bin/env node
// Requires:
// - express (`npm install express`)
// - mime-types (`npm install mime-types`)
const express = require('express')
const fs = require('fs')
const path = require('path')
const process = require('process')
const mime = require('mime-types')
const app = express()
function parseArgv() {
let file = undefined
let port = 8989
if (process.argv.length < 3) {
throw Error(`Usage: ${process.argv[0]} ${process.argv[1]} <media_file> [port=${port}]`)
}
file = process.argv[2]
if (process.argv.length > 3) {
port = parseInt(process.argv[3])
}
return { file: file, port: port }
}
let args = parseArgv()
app.get('/media', function(req, res) {
const path = args.file
const ext = args.file.split('.').pop()
const stat = fs.statSync(path)
const fileSize = stat.size
const range = req.headers.range
const mimeType = mime.lookup(ext)
if (range) {
const parts = range.replace(/bytes=/, "").split("-")
const start = parseInt(parts[0], 10)
const end = parts[1]
? parseInt(parts[1], 10)
: fileSize-1
const chunksize = (end-start)+1
const file = fs.createReadStream(path, {start, end})
const head = {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunksize,
'Content-Type': mimeType,
}
res.writeHead(206, head)
file.pipe(res)
} else {
const head = {
'Content-Length': fileSize,
'Content-Type': mimeType,
}
res.writeHead(200, head)
fs.createReadStream(path).pipe(res)
}
})
app.listen(args.port, function () {
console.log(`Listening on port ${args.port}`)
})

View File

@ -1,5 +1,4 @@
import datetime
import os
import re
import pychromecast
@ -181,9 +180,6 @@ class MediaChromecastPlugin(MediaPlugin):
player='chromecast',
**player_args)
if resource.startswith('file://'):
resource = resource[len('file://'):]
if not content_type:
content_type = get_mime_type(resource)
@ -191,8 +187,10 @@ class MediaChromecastPlugin(MediaPlugin):
raise RuntimeError('content_type required to process media {}'.
format(resource))
if os.path.isfile(resource):
if not resource.startswith('http://') and \
not resource.startswith('https://'):
resource = self.start_streaming(resource).output['url']
self.logger.info('HTTP media stream started on {}'.format(resource))
self.logger.info('Playing {} on {}'.format(resource, chromecast))

View File

@ -49,8 +49,7 @@ setup(
'platydock=platypush.platydock:main',
],
},
scripts = ['bin/platyvenv', 'platypush/plugins/media/bin/localstream.js',
'platypush/plugins/media/bin/localstream'],
scripts = ['bin/platyvenv'],
# data_files = [
# ('/etc/platypush', ['platypush/config.example.yaml'])
# ],