From 05b0a7f14d3603594e3ee45c0c2fd9b9c714f2d6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 29 Sep 2020 15:31:27 +0200 Subject: [PATCH] Added GStreamer media plugin [closes #151] --- .../platypush/plugins/media.gstreamer.rst | 5 + docs/source/plugins.rst | 1 + .../source/webpanel/plugins/media.gstreamer | 1 + .../js/plugins/media.gstreamer/index.js | 5 + .../plugins/media.gstreamer/index.html | 7 + platypush/common/__init__.py | 0 platypush/common/gstreamer/__init__.py | 191 +++++++++++++++ .../plugins/camera/gstreamer/__init__.py | 5 +- platypush/plugins/camera/gstreamer/model.py | 110 +-------- platypush/plugins/media/gstreamer/__init__.py | 221 ++++++++++++++++++ platypush/plugins/media/gstreamer/model.py | 55 +++++ 11 files changed, 490 insertions(+), 111 deletions(-) create mode 100644 docs/source/platypush/plugins/media.gstreamer.rst create mode 120000 platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer create mode 100644 platypush/backend/http/static/js/plugins/media.gstreamer/index.js create mode 100644 platypush/backend/http/templates/plugins/media.gstreamer/index.html create mode 100644 platypush/common/__init__.py create mode 100644 platypush/common/gstreamer/__init__.py create mode 100644 platypush/plugins/media/gstreamer/__init__.py create mode 100644 platypush/plugins/media/gstreamer/model.py diff --git a/docs/source/platypush/plugins/media.gstreamer.rst b/docs/source/platypush/plugins/media.gstreamer.rst new file mode 100644 index 0000000000..0789f056ac --- /dev/null +++ b/docs/source/platypush/plugins/media.gstreamer.rst @@ -0,0 +1,5 @@ +``platypush.plugins.media.gstreamer`` +===================================== + +.. automodule:: platypush.plugins.media.gstreamer + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index cd63a43e5f..84a65c8be3 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -82,6 +82,7 @@ Plugins platypush/plugins/media.rst platypush/plugins/media.chromecast.rst platypush/plugins/media.ctrl.rst + platypush/plugins/media.gstreamer.rst platypush/plugins/media.kodi.rst platypush/plugins/media.mplayer.rst platypush/plugins/media.mpv.rst diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer b/platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer new file mode 120000 index 0000000000..27949aaf30 --- /dev/null +++ b/platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer @@ -0,0 +1 @@ +media \ No newline at end of file diff --git a/platypush/backend/http/static/js/plugins/media.gstreamer/index.js b/platypush/backend/http/static/js/plugins/media.gstreamer/index.js new file mode 100644 index 0000000000..67682dc333 --- /dev/null +++ b/platypush/backend/http/static/js/plugins/media.gstreamer/index.js @@ -0,0 +1,5 @@ +Vue.component('media-gstreamer', { + template: '#tmpl-media-gstreamer', + props: ['config'], +}); + diff --git a/platypush/backend/http/templates/plugins/media.gstreamer/index.html b/platypush/backend/http/templates/plugins/media.gstreamer/index.html new file mode 100644 index 0000000000..dc35992f57 --- /dev/null +++ b/platypush/backend/http/templates/plugins/media.gstreamer/index.html @@ -0,0 +1,7 @@ + +{% include 'plugins/media/index.html' %} + + + diff --git a/platypush/common/__init__.py b/platypush/common/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/platypush/common/gstreamer/__init__.py b/platypush/common/gstreamer/__init__.py new file mode 100644 index 0000000000..39f4914cbc --- /dev/null +++ b/platypush/common/gstreamer/__init__.py @@ -0,0 +1,191 @@ +import logging +import threading + +from typing import Optional + +# noinspection PyPackageRequirements +import gi +gi.require_version('Gst', '1.0') +gi.require_version('GstApp', '1.0') + +# noinspection PyPackageRequirements,PyUnresolvedReferences +from gi.repository import GLib, Gst, GstApp + +Gst.init(None) + + +class Pipeline: + def __init__(self): + self.logger = logging.getLogger('gst-pipeline') + self.pipeline = Gst.Pipeline() + self.pipeline.set_state(Gst.State.NULL) + self.loop = Loop() + self.source = None + self.sink = None + + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect('message::eos', self.on_eos) + self.bus.connect('message::error', self.on_error) + self.data_ready = threading.Event() + self.data = None + + def add(self, element_name: str, *args, **props): + el = Gst.ElementFactory.make(element_name, *args) + for k, v in props.items(): + if k == 'caps': + v = Gst.caps_from_string(v) + el.set_property(k, v) + + self.pipeline.add(el) + return el + + def add_source(self, element_name: str, *args, **props): + assert not self.source, 'A source element is already set for this pipeline' + source = self.add(element_name, *args, **props) + self.source = source + return source + + def add_sink(self, element_name: str, *args, **props): + assert not self.sink, 'A sink element is already set for this pipeline' + sink = self.add(element_name, *args, **props) + sink.connect('new-sample', self.on_buffer) + sink.set_property('emit-signals', True) + self.sink = sink + return sink + + @staticmethod + def link(*elements): + for i, el in enumerate(elements): + if i == len(elements)-1: + break + el.link(elements[i+1]) + + def emit(self, signal, *args, **kwargs): + return self.pipeline.emit(signal, *args, **kwargs) + + def play(self): + self.pipeline.set_state(Gst.State.PLAYING) + self.loop.start() + + def pause(self): + state = self.get_state() + if state == Gst.State.PAUSED: + self.pipeline.set_state(Gst.State.PLAYING) + else: + self.pipeline.set_state(Gst.State.PAUSED) + + def stop(self): + self.pipeline.set_state(Gst.State.NULL) + self.loop.stop() + self.loop = None + + def get_volume(self) -> float: + assert self.source, 'No source initialized' + return self.source.get_property('volume') or 0 + + def set_volume(self, volume: float): + assert self.source, 'No source initialized' + self.source.set_property('volume', volume) + + def on_buffer(self, sink): + sample = GstApp.AppSink.pull_sample(sink) + buffer = sample.get_buffer() + size, offset, maxsize = buffer.get_sizes() + self.data = buffer.extract_dup(offset, size) + self.data_ready.set() + return False + + def on_eos(self, *_, **__): + self.logger.info('End of stream event received') + self.stop() + + # noinspection PyUnusedLocal + def on_error(self, bus, msg): + self.logger.warning('GStreamer pipeline error: {}'.format(msg.parse_error())) + self.stop() + + def get_source(self): + return self.source + + def get_sink(self): + return self.sink + + def get_state(self) -> Gst.State: + state = self.source.current_state + if not state: + self.logger.warning('Unable to get pipeline state') + return Gst.State.NULL + + return state + + def is_playing(self) -> bool: + return self.get_state() == Gst.State.PLAYING + + def is_paused(self) -> bool: + return self.get_state() == Gst.State.PAUSED + + def get_position(self) -> Optional[float]: + pos = self.source.query_position(Gst.Format(Gst.Format.TIME)) + if not pos[0]: + return None + + return pos[1] / 1e9 + + def get_duration(self) -> Optional[float]: + pos = self.source.query_duration(Gst.Format(Gst.Format.TIME)) + if not pos[0]: + return None + + return pos[1] / 1e9 + + def is_muted(self) -> bool: + if not self.source: + return False + return self.source.get_property('mute') + + def set_mute(self, mute: bool): + assert self.source, 'No source specified' + self.source.set_property('mute', mute) + + def mute(self): + self.set_mute(True) + + def unmute(self): + self.set_mute(False) + + def seek(self, position: float): + assert self.source, 'No source specified' + if position < 0: + position = 0 + + duration = self.get_duration() + if duration and position > duration: + position = duration + + seek_ns = int(position * 1e9) + self.source.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH, seek_ns) + + +class Loop(threading.Thread): + def __init__(self): + super().__init__() + self._loop = GLib.MainLoop() + + def run(self): + self._loop.run() + + def is_running(self) -> bool: + return self.is_alive() or self._loop is not None + + def stop(self): + if not self.is_running(): + return + if self._loop: + self._loop.quit() + if threading.get_ident() != self.ident: + self.join() + self._loop = None + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/camera/gstreamer/__init__.py b/platypush/plugins/camera/gstreamer/__init__.py index 7ef6bf62a3..50df0d8e12 100644 --- a/platypush/plugins/camera/gstreamer/__init__.py +++ b/platypush/plugins/camera/gstreamer/__init__.py @@ -4,7 +4,8 @@ from PIL import Image from PIL.Image import Image as ImageType from platypush.plugins.camera import CameraPlugin -from platypush.plugins.camera.gstreamer.model import GStreamerCamera, Pipeline, Loop +from platypush.plugins.camera.gstreamer.model import GStreamerCamera +from platypush.common.gstreamer import Pipeline class CameraGstreamerPlugin(CameraPlugin): @@ -28,7 +29,7 @@ class CameraGstreamerPlugin(CameraPlugin): def prepare_device(self, camera: GStreamerCamera) -> Pipeline: pipeline = Pipeline() - src = pipeline.add('v4l2src', device=camera.info.device) + src = pipeline.add_source('v4l2src', device=camera.info.device) convert = pipeline.add('videoconvert') video_filter = pipeline.add( 'capsfilter', caps='video/x-raw,format=RGB,width={width},height={height},framerate={fps}/1'.format( diff --git a/platypush/plugins/camera/gstreamer/model.py b/platypush/plugins/camera/gstreamer/model.py index 08572479d2..48082c6222 100644 --- a/platypush/plugins/camera/gstreamer/model.py +++ b/platypush/plugins/camera/gstreamer/model.py @@ -1,114 +1,6 @@ -import logging -import threading - +from platypush.common.gstreamer import Pipeline from platypush.plugins.camera import CameraInfo, Camera -# noinspection PyPackageRequirements -import gi -gi.require_version('Gst', '1.0') -gi.require_version('GstApp', '1.0') - -# noinspection PyPackageRequirements,PyUnresolvedReferences -from gi.repository import GLib, Gst, GstApp - -Gst.init(None) - - -class Pipeline: - def __init__(self): - self.pipeline = Gst.Pipeline() - self.logger = logging.getLogger('gst-pipeline') - self.loop = Loop() - self.sink = None - - self.bus = self.pipeline.get_bus() - self.bus.add_signal_watch() - self.bus.connect('message::eos', self.on_eos) - self.bus.connect('message::error', self.on_error) - self.data_ready = threading.Event() - self.data = None - - def add(self, element_name: str, *args, **props): - el = Gst.ElementFactory.make(element_name, *args) - for k, v in props.items(): - if k == 'caps': - v = Gst.caps_from_string(v) - el.set_property(k, v) - - self.pipeline.add(el) - return el - - def add_sink(self, element_name: str, *args, **props): - assert not self.sink, 'A sink element is already set for this pipeline' - sink = self.add(element_name, *args, **props) - sink.connect('new-sample', self.on_buffer) - sink.set_property('emit-signals', True) - self.sink = sink - return sink - - @staticmethod - def link(*elements): - for i, el in enumerate(elements): - if i == len(elements)-1: - break - el.link(elements[i+1]) - - def emit(self, signal, *args, **kwargs): - return self.pipeline.emit(signal, *args, **kwargs) - - def play(self): - assert self.sink, 'No sink element specified through add_sink()' - self.pipeline.set_state(Gst.State.PLAYING) - self.loop.start() - - def pause(self): - self.pipeline.set_state(Gst.State.PAUSED) - - def stop(self): - self.pipeline.set_state(Gst.State.NULL) - self.loop.stop() - - def on_buffer(self, sink): - sample = GstApp.AppSink.pull_sample(sink) - buffer = sample.get_buffer() - size, offset, maxsize = buffer.get_sizes() - self.data = buffer.extract_dup(offset, size) - self.data_ready.set() - return False - - def on_eos(self, *_, **__): - self.logger.info('End of stream event received') - self.stop() - - # noinspection PyUnusedLocal - def on_error(self, bus, msg): - self.logger.warning('GStreamer pipeline error: {}'.format(msg.parse_error())) - self.stop() - - def get_sink(self): - return self.sink - - -class Loop(threading.Thread): - def __init__(self): - super().__init__() - self._loop = GLib.MainLoop() - - def run(self): - self._loop.run() - - def is_running(self) -> bool: - return self.is_alive() or self._loop is not None - - def stop(self): - if not self.is_running(): - return - if self._loop: - self._loop.quit() - if threading.get_ident() != self.ident: - self.join() - self._loop = None - class GStreamerCamera(Camera): info: CameraInfo diff --git a/platypush/plugins/media/gstreamer/__init__.py b/platypush/plugins/media/gstreamer/__init__.py new file mode 100644 index 0000000000..aee692b4ee --- /dev/null +++ b/platypush/plugins/media/gstreamer/__init__.py @@ -0,0 +1,221 @@ +import os +from typing import Optional + +from platypush.plugins.media import PlayerState, MediaPlugin +from platypush.message.event.media import MediaPlayRequestEvent, MediaVolumeChangedEvent + +from platypush.plugins import action +from platypush.plugins.media.gstreamer.model import MediaPipeline + + +class MediaGstreamerPlugin(MediaPlugin): + """ + Plugin to play media over GStreamer. + + Requires: + + * **gst-python** (``pip install gst-python``) + + """ + + def __init__(self, sink: Optional[str] = None, *args, **kwargs): + """ + :param sink: GStreamer audio sink (default: ``None``, automatic). + """ + super().__init__(*args, **kwargs) + self.sink = sink + self._player: Optional[MediaPipeline] = None + self._resource: Optional[str] = None + + def _allocate_pipeline(self, resource: str) -> MediaPipeline: + pipeline = MediaPipeline(resource) + if self.sink: + sink = pipeline.add_sink(self.sink, sync=False) + pipeline.link(pipeline.get_source(), sink) + + self._player = pipeline + self._resource = resource + return pipeline + + @action + def play(self, resource: Optional[str] = None, **args): + """ + Play a resource. + + :param resource: Resource to play - can be a local file or a remote URL + """ + + if not resource: + if self._player: + self._player.play() + return + + resource = self._get_resource(resource) + path = os.path.abspath(os.path.expanduser(resource)) + if os.path.exists(path): + resource = 'file://' + path + + MediaPipeline.post_event(MediaPlayRequestEvent, resource=resource) + pipeline = self._allocate_pipeline(resource) + pipeline.play() + if self.volume: + pipeline.set_volume(self.volume / 100.) + + return self.status() + + @action + def pause(self): + """ Toggle the paused state """ + assert self._player, 'No instance is running' + self._player.pause() + return self.status() + + @action + def quit(self): + """ Stop and quit the player (alias for :meth:`.stop`) """ + self._stop_torrent() + assert self._player, 'No instance is running' + + self._player.stop() + self._player = None + return {'state': PlayerState.STOP.value} + + @action + def stop(self): + """ Stop and quit the player (alias for :meth:`.quit`) """ + return self.quit() + + @action + def voldown(self, step=10.0): + """ Volume down by (default: 10)% """ + # noinspection PyUnresolvedReferences + return self.set_volume(self.get_volume().output - step) + + @action + def volup(self, step=10.0): + """ Volume up by (default: 10)% """ + # noinspection PyUnresolvedReferences + return self.set_volume(self.get_volume().output + step) + + @action + def get_volume(self) -> float: + """ + Get the volume. + + :return: Volume value between 0 and 100. + """ + assert self._player, 'No instance is running' + return self._player.get_volume() * 100. + + @action + def set_volume(self, volume): + """ + Set the volume. + + :param volume: Volume value between 0 and 100. + """ + assert self._player, 'Player not running' + # noinspection PyTypeChecker + volume = max(0, min(1, volume / 100.)) + self._player.set_volume(volume) + MediaPipeline.post_event(MediaVolumeChangedEvent, volume=volume * 100) + return self.status() + + @action + def seek(self, position: float) -> dict: + """ + Seek backward/forward by the specified number of seconds. + + :param position: Number of seconds relative to the current cursor. + """ + assert self._player, 'No instance is running' + cur_pos = self._player.get_position() + return self.set_position(cur_pos + position) + + @action + def back(self, offset=60.0): + """ Back by (default: 60) seconds """ + return self.seek(-offset) + + @action + def forward(self, offset=60.0): + """ Forward by (default: 60) seconds """ + return self.seek(offset) + + @action + def is_playing(self): + """ + :returns: True if it's playing, False otherwise + """ + return self._player and self._player.is_playing() + + @action + def load(self, resource, **args): + """ + Load/queue a resource/video to the player (alias for :meth:`.play`). + """ + return self.play(resource) + + @action + def mute(self): + """ Toggle mute state """ + assert self._player, 'No instance is running' + muted = self._player.is_muted() + if muted: + self._player.unmute() + else: + self._player.mute() + + return {'muted': self._player.is_muted()} + + @action + def set_position(self, position: float) -> dict: + """ + Seek backward/forward to the specified absolute position. + + :param position: Stream position in seconds. + :return: Player state. + """ + assert self._player, 'No instance is running' + self._player.seek(position) + return self.status() + + @action + def status(self) -> dict: + """ + Get the current player state. + """ + if not self._player: + return {'state': PlayerState.STOP.value} + + pos = self._player.get_position() + length = self._player.get_duration() + + return { + 'duration': length, + 'filename': self._resource[7:] if self._resource.startswith('file://') else self._resource, + 'mute': self._player.is_muted(), + 'name': self._resource, + 'pause': self._player.is_paused(), + 'percent_pos': pos / length if pos is not None and length is not None and pos >= 0 and length > 0 else 0, + 'position': pos, + 'seekable': length is not None and length > 0, + 'state': self._gst_to_player_state(self._player.get_state()).value, + 'url': self._resource, + 'volume': self._player.get_volume() * 100, + } + + @staticmethod + def _gst_to_player_state(state) -> PlayerState: + # noinspection PyUnresolvedReferences,PyPackageRequirements + from gi.repository import Gst + if state == Gst.State.READY: + return PlayerState.STOP + if state == Gst.State.PAUSED: + return PlayerState.PAUSE + if state == Gst.State.PLAYING: + return PlayerState.PLAY + return PlayerState.IDLE + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/gstreamer/model.py b/platypush/plugins/media/gstreamer/model.py new file mode 100644 index 0000000000..9c1341e6c6 --- /dev/null +++ b/platypush/plugins/media/gstreamer/model.py @@ -0,0 +1,55 @@ +from typing import Type + +from platypush.common.gstreamer import Pipeline +from platypush.context import get_bus +from platypush.message.event.media import MediaEvent, MediaPlayEvent, MediaPauseEvent, MediaStopEvent, \ + NewPlayingMediaEvent, MediaMuteChangedEvent, MediaSeekEvent + + +class MediaPipeline(Pipeline): + def __init__(self, resource: str): + super().__init__() + self.resource = resource + self.add_source('playbin', uri=resource) + + @staticmethod + def post_event(evt_class: Type[MediaEvent], **kwargs): + kwargs['player'] = 'local' + kwargs['plugin'] = 'media.gstreamer' + evt = evt_class(**kwargs) + get_bus().post(evt) + + def play(self): + # noinspection PyUnresolvedReferences,PyPackageRequirements + from gi.repository import Gst + is_first_play = self.get_state() == Gst.State.NULL + + super().play() + if is_first_play: + self.post_event(NewPlayingMediaEvent, resource=self.resource) + self.post_event(MediaPlayEvent, resource=self.resource) + + def pause(self): + # noinspection PyUnresolvedReferences,PyPackageRequirements + from gi.repository import Gst + super().pause() + self.post_event(MediaPauseEvent if self.get_state() == Gst.State.PAUSED else MediaPlayEvent) + + def stop(self): + super().stop() + self.post_event(MediaStopEvent) + + def mute(self): + super().mute() + self.post_event(MediaMuteChangedEvent, mute=self.is_muted()) + + def unmute(self): + super().unmute() + self.post_event(MediaMuteChangedEvent, mute=self.is_muted()) + + def seek(self, position: float): + super().seek(position) + self.post_event(MediaSeekEvent, position=self.get_position()) + + +# vim:sw=4:ts=4:et: