diff --git a/docs/source/platypush/plugins/media.gstreamer.rst b/docs/source/platypush/plugins/media.gstreamer.rst
new file mode 100644
index 0000000000..0789f056ac
--- /dev/null
+++ b/docs/source/platypush/plugins/media.gstreamer.rst
@@ -0,0 +1,5 @@
+``platypush.plugins.media.gstreamer``
+=====================================
+
+.. automodule:: platypush.plugins.media.gstreamer
+ :members:
diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index cd63a43e5f..84a65c8be3 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -82,6 +82,7 @@ Plugins
platypush/plugins/media.rst
platypush/plugins/media.chromecast.rst
platypush/plugins/media.ctrl.rst
+ platypush/plugins/media.gstreamer.rst
platypush/plugins/media.kodi.rst
platypush/plugins/media.mplayer.rst
platypush/plugins/media.mpv.rst
diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer b/platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer
new file mode 120000
index 0000000000..27949aaf30
--- /dev/null
+++ b/platypush/backend/http/static/css/source/webpanel/plugins/media.gstreamer
@@ -0,0 +1 @@
+media
\ No newline at end of file
diff --git a/platypush/backend/http/static/js/plugins/media.gstreamer/index.js b/platypush/backend/http/static/js/plugins/media.gstreamer/index.js
new file mode 100644
index 0000000000..67682dc333
--- /dev/null
+++ b/platypush/backend/http/static/js/plugins/media.gstreamer/index.js
@@ -0,0 +1,5 @@
+Vue.component('media-gstreamer', {
+ template: '#tmpl-media-gstreamer',
+ props: ['config'],
+});
+
diff --git a/platypush/backend/http/templates/plugins/media.gstreamer/index.html b/platypush/backend/http/templates/plugins/media.gstreamer/index.html
new file mode 100644
index 0000000000..dc35992f57
--- /dev/null
+++ b/platypush/backend/http/templates/plugins/media.gstreamer/index.html
@@ -0,0 +1,7 @@
+
+{% include 'plugins/media/index.html' %}
+
+
+
diff --git a/platypush/common/__init__.py b/platypush/common/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/platypush/common/gstreamer/__init__.py b/platypush/common/gstreamer/__init__.py
new file mode 100644
index 0000000000..39f4914cbc
--- /dev/null
+++ b/platypush/common/gstreamer/__init__.py
@@ -0,0 +1,191 @@
+import logging
+import threading
+
+from typing import Optional
+
+# noinspection PyPackageRequirements
+import gi
+gi.require_version('Gst', '1.0')
+gi.require_version('GstApp', '1.0')
+
+# noinspection PyPackageRequirements,PyUnresolvedReferences
+from gi.repository import GLib, Gst, GstApp
+
+Gst.init(None)
+
+
+class Pipeline:
+ def __init__(self):
+ self.logger = logging.getLogger('gst-pipeline')
+ self.pipeline = Gst.Pipeline()
+ self.pipeline.set_state(Gst.State.NULL)
+ self.loop = Loop()
+ self.source = None
+ self.sink = None
+
+ self.bus = self.pipeline.get_bus()
+ self.bus.add_signal_watch()
+ self.bus.connect('message::eos', self.on_eos)
+ self.bus.connect('message::error', self.on_error)
+ self.data_ready = threading.Event()
+ self.data = None
+
+ def add(self, element_name: str, *args, **props):
+ el = Gst.ElementFactory.make(element_name, *args)
+ for k, v in props.items():
+ if k == 'caps':
+ v = Gst.caps_from_string(v)
+ el.set_property(k, v)
+
+ self.pipeline.add(el)
+ return el
+
+ def add_source(self, element_name: str, *args, **props):
+ assert not self.source, 'A source element is already set for this pipeline'
+ source = self.add(element_name, *args, **props)
+ self.source = source
+ return source
+
+ def add_sink(self, element_name: str, *args, **props):
+ assert not self.sink, 'A sink element is already set for this pipeline'
+ sink = self.add(element_name, *args, **props)
+ sink.connect('new-sample', self.on_buffer)
+ sink.set_property('emit-signals', True)
+ self.sink = sink
+ return sink
+
+ @staticmethod
+ def link(*elements):
+ for i, el in enumerate(elements):
+ if i == len(elements)-1:
+ break
+ el.link(elements[i+1])
+
+ def emit(self, signal, *args, **kwargs):
+ return self.pipeline.emit(signal, *args, **kwargs)
+
+ def play(self):
+ self.pipeline.set_state(Gst.State.PLAYING)
+ self.loop.start()
+
+ def pause(self):
+ state = self.get_state()
+ if state == Gst.State.PAUSED:
+ self.pipeline.set_state(Gst.State.PLAYING)
+ else:
+ self.pipeline.set_state(Gst.State.PAUSED)
+
+ def stop(self):
+ self.pipeline.set_state(Gst.State.NULL)
+ self.loop.stop()
+ self.loop = None
+
+ def get_volume(self) -> float:
+ assert self.source, 'No source initialized'
+ return self.source.get_property('volume') or 0
+
+ def set_volume(self, volume: float):
+ assert self.source, 'No source initialized'
+ self.source.set_property('volume', volume)
+
+ def on_buffer(self, sink):
+ sample = GstApp.AppSink.pull_sample(sink)
+ buffer = sample.get_buffer()
+ size, offset, maxsize = buffer.get_sizes()
+ self.data = buffer.extract_dup(offset, size)
+ self.data_ready.set()
+ return False
+
+ def on_eos(self, *_, **__):
+ self.logger.info('End of stream event received')
+ self.stop()
+
+ # noinspection PyUnusedLocal
+ def on_error(self, bus, msg):
+ self.logger.warning('GStreamer pipeline error: {}'.format(msg.parse_error()))
+ self.stop()
+
+ def get_source(self):
+ return self.source
+
+ def get_sink(self):
+ return self.sink
+
+ def get_state(self) -> Gst.State:
+ state = self.source.current_state
+ if not state:
+ self.logger.warning('Unable to get pipeline state')
+ return Gst.State.NULL
+
+ return state
+
+ def is_playing(self) -> bool:
+ return self.get_state() == Gst.State.PLAYING
+
+ def is_paused(self) -> bool:
+ return self.get_state() == Gst.State.PAUSED
+
+ def get_position(self) -> Optional[float]:
+ pos = self.source.query_position(Gst.Format(Gst.Format.TIME))
+ if not pos[0]:
+ return None
+
+ return pos[1] / 1e9
+
+ def get_duration(self) -> Optional[float]:
+ pos = self.source.query_duration(Gst.Format(Gst.Format.TIME))
+ if not pos[0]:
+ return None
+
+ return pos[1] / 1e9
+
+ def is_muted(self) -> bool:
+ if not self.source:
+ return False
+ return self.source.get_property('mute')
+
+ def set_mute(self, mute: bool):
+ assert self.source, 'No source specified'
+ self.source.set_property('mute', mute)
+
+ def mute(self):
+ self.set_mute(True)
+
+ def unmute(self):
+ self.set_mute(False)
+
+ def seek(self, position: float):
+ assert self.source, 'No source specified'
+ if position < 0:
+ position = 0
+
+ duration = self.get_duration()
+ if duration and position > duration:
+ position = duration
+
+ seek_ns = int(position * 1e9)
+ self.source.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH, seek_ns)
+
+
+class Loop(threading.Thread):
+ def __init__(self):
+ super().__init__()
+ self._loop = GLib.MainLoop()
+
+ def run(self):
+ self._loop.run()
+
+ def is_running(self) -> bool:
+ return self.is_alive() or self._loop is not None
+
+ def stop(self):
+ if not self.is_running():
+ return
+ if self._loop:
+ self._loop.quit()
+ if threading.get_ident() != self.ident:
+ self.join()
+ self._loop = None
+
+
+# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/camera/gstreamer/__init__.py b/platypush/plugins/camera/gstreamer/__init__.py
index 7ef6bf62a3..50df0d8e12 100644
--- a/platypush/plugins/camera/gstreamer/__init__.py
+++ b/platypush/plugins/camera/gstreamer/__init__.py
@@ -4,7 +4,8 @@ from PIL import Image
from PIL.Image import Image as ImageType
from platypush.plugins.camera import CameraPlugin
-from platypush.plugins.camera.gstreamer.model import GStreamerCamera, Pipeline, Loop
+from platypush.plugins.camera.gstreamer.model import GStreamerCamera
+from platypush.common.gstreamer import Pipeline
class CameraGstreamerPlugin(CameraPlugin):
@@ -28,7 +29,7 @@ class CameraGstreamerPlugin(CameraPlugin):
def prepare_device(self, camera: GStreamerCamera) -> Pipeline:
pipeline = Pipeline()
- src = pipeline.add('v4l2src', device=camera.info.device)
+ src = pipeline.add_source('v4l2src', device=camera.info.device)
convert = pipeline.add('videoconvert')
video_filter = pipeline.add(
'capsfilter', caps='video/x-raw,format=RGB,width={width},height={height},framerate={fps}/1'.format(
diff --git a/platypush/plugins/camera/gstreamer/model.py b/platypush/plugins/camera/gstreamer/model.py
index 08572479d2..48082c6222 100644
--- a/platypush/plugins/camera/gstreamer/model.py
+++ b/platypush/plugins/camera/gstreamer/model.py
@@ -1,114 +1,6 @@
-import logging
-import threading
-
+from platypush.common.gstreamer import Pipeline
from platypush.plugins.camera import CameraInfo, Camera
-# noinspection PyPackageRequirements
-import gi
-gi.require_version('Gst', '1.0')
-gi.require_version('GstApp', '1.0')
-
-# noinspection PyPackageRequirements,PyUnresolvedReferences
-from gi.repository import GLib, Gst, GstApp
-
-Gst.init(None)
-
-
-class Pipeline:
- def __init__(self):
- self.pipeline = Gst.Pipeline()
- self.logger = logging.getLogger('gst-pipeline')
- self.loop = Loop()
- self.sink = None
-
- self.bus = self.pipeline.get_bus()
- self.bus.add_signal_watch()
- self.bus.connect('message::eos', self.on_eos)
- self.bus.connect('message::error', self.on_error)
- self.data_ready = threading.Event()
- self.data = None
-
- def add(self, element_name: str, *args, **props):
- el = Gst.ElementFactory.make(element_name, *args)
- for k, v in props.items():
- if k == 'caps':
- v = Gst.caps_from_string(v)
- el.set_property(k, v)
-
- self.pipeline.add(el)
- return el
-
- def add_sink(self, element_name: str, *args, **props):
- assert not self.sink, 'A sink element is already set for this pipeline'
- sink = self.add(element_name, *args, **props)
- sink.connect('new-sample', self.on_buffer)
- sink.set_property('emit-signals', True)
- self.sink = sink
- return sink
-
- @staticmethod
- def link(*elements):
- for i, el in enumerate(elements):
- if i == len(elements)-1:
- break
- el.link(elements[i+1])
-
- def emit(self, signal, *args, **kwargs):
- return self.pipeline.emit(signal, *args, **kwargs)
-
- def play(self):
- assert self.sink, 'No sink element specified through add_sink()'
- self.pipeline.set_state(Gst.State.PLAYING)
- self.loop.start()
-
- def pause(self):
- self.pipeline.set_state(Gst.State.PAUSED)
-
- def stop(self):
- self.pipeline.set_state(Gst.State.NULL)
- self.loop.stop()
-
- def on_buffer(self, sink):
- sample = GstApp.AppSink.pull_sample(sink)
- buffer = sample.get_buffer()
- size, offset, maxsize = buffer.get_sizes()
- self.data = buffer.extract_dup(offset, size)
- self.data_ready.set()
- return False
-
- def on_eos(self, *_, **__):
- self.logger.info('End of stream event received')
- self.stop()
-
- # noinspection PyUnusedLocal
- def on_error(self, bus, msg):
- self.logger.warning('GStreamer pipeline error: {}'.format(msg.parse_error()))
- self.stop()
-
- def get_sink(self):
- return self.sink
-
-
-class Loop(threading.Thread):
- def __init__(self):
- super().__init__()
- self._loop = GLib.MainLoop()
-
- def run(self):
- self._loop.run()
-
- def is_running(self) -> bool:
- return self.is_alive() or self._loop is not None
-
- def stop(self):
- if not self.is_running():
- return
- if self._loop:
- self._loop.quit()
- if threading.get_ident() != self.ident:
- self.join()
- self._loop = None
-
class GStreamerCamera(Camera):
info: CameraInfo
diff --git a/platypush/plugins/media/gstreamer/__init__.py b/platypush/plugins/media/gstreamer/__init__.py
new file mode 100644
index 0000000000..aee692b4ee
--- /dev/null
+++ b/platypush/plugins/media/gstreamer/__init__.py
@@ -0,0 +1,221 @@
+import os
+from typing import Optional
+
+from platypush.plugins.media import PlayerState, MediaPlugin
+from platypush.message.event.media import MediaPlayRequestEvent, MediaVolumeChangedEvent
+
+from platypush.plugins import action
+from platypush.plugins.media.gstreamer.model import MediaPipeline
+
+
+class MediaGstreamerPlugin(MediaPlugin):
+ """
+ Plugin to play media over GStreamer.
+
+ Requires:
+
+ * **gst-python** (``pip install gst-python``)
+
+ """
+
+ def __init__(self, sink: Optional[str] = None, *args, **kwargs):
+ """
+ :param sink: GStreamer audio sink (default: ``None``, automatic).
+ """
+ super().__init__(*args, **kwargs)
+ self.sink = sink
+ self._player: Optional[MediaPipeline] = None
+ self._resource: Optional[str] = None
+
+ def _allocate_pipeline(self, resource: str) -> MediaPipeline:
+ pipeline = MediaPipeline(resource)
+ if self.sink:
+ sink = pipeline.add_sink(self.sink, sync=False)
+ pipeline.link(pipeline.get_source(), sink)
+
+ self._player = pipeline
+ self._resource = resource
+ return pipeline
+
+ @action
+ def play(self, resource: Optional[str] = None, **args):
+ """
+ Play a resource.
+
+ :param resource: Resource to play - can be a local file or a remote URL
+ """
+
+ if not resource:
+ if self._player:
+ self._player.play()
+ return
+
+ resource = self._get_resource(resource)
+ path = os.path.abspath(os.path.expanduser(resource))
+ if os.path.exists(path):
+ resource = 'file://' + path
+
+ MediaPipeline.post_event(MediaPlayRequestEvent, resource=resource)
+ pipeline = self._allocate_pipeline(resource)
+ pipeline.play()
+ if self.volume:
+ pipeline.set_volume(self.volume / 100.)
+
+ return self.status()
+
+ @action
+ def pause(self):
+ """ Toggle the paused state """
+ assert self._player, 'No instance is running'
+ self._player.pause()
+ return self.status()
+
+ @action
+ def quit(self):
+ """ Stop and quit the player (alias for :meth:`.stop`) """
+ self._stop_torrent()
+ assert self._player, 'No instance is running'
+
+ self._player.stop()
+ self._player = None
+ return {'state': PlayerState.STOP.value}
+
+ @action
+ def stop(self):
+ """ Stop and quit the player (alias for :meth:`.quit`) """
+ return self.quit()
+
+ @action
+ def voldown(self, step=10.0):
+ """ Volume down by (default: 10)% """
+ # noinspection PyUnresolvedReferences
+ return self.set_volume(self.get_volume().output - step)
+
+ @action
+ def volup(self, step=10.0):
+ """ Volume up by (default: 10)% """
+ # noinspection PyUnresolvedReferences
+ return self.set_volume(self.get_volume().output + step)
+
+ @action
+ def get_volume(self) -> float:
+ """
+ Get the volume.
+
+ :return: Volume value between 0 and 100.
+ """
+ assert self._player, 'No instance is running'
+ return self._player.get_volume() * 100.
+
+ @action
+ def set_volume(self, volume):
+ """
+ Set the volume.
+
+ :param volume: Volume value between 0 and 100.
+ """
+ assert self._player, 'Player not running'
+ # noinspection PyTypeChecker
+ volume = max(0, min(1, volume / 100.))
+ self._player.set_volume(volume)
+ MediaPipeline.post_event(MediaVolumeChangedEvent, volume=volume * 100)
+ return self.status()
+
+ @action
+ def seek(self, position: float) -> dict:
+ """
+ Seek backward/forward by the specified number of seconds.
+
+ :param position: Number of seconds relative to the current cursor.
+ """
+ assert self._player, 'No instance is running'
+ cur_pos = self._player.get_position()
+ return self.set_position(cur_pos + position)
+
+ @action
+ def back(self, offset=60.0):
+ """ Back by (default: 60) seconds """
+ return self.seek(-offset)
+
+ @action
+ def forward(self, offset=60.0):
+ """ Forward by (default: 60) seconds """
+ return self.seek(offset)
+
+ @action
+ def is_playing(self):
+ """
+ :returns: True if it's playing, False otherwise
+ """
+ return self._player and self._player.is_playing()
+
+ @action
+ def load(self, resource, **args):
+ """
+ Load/queue a resource/video to the player (alias for :meth:`.play`).
+ """
+ return self.play(resource)
+
+ @action
+ def mute(self):
+ """ Toggle mute state """
+ assert self._player, 'No instance is running'
+ muted = self._player.is_muted()
+ if muted:
+ self._player.unmute()
+ else:
+ self._player.mute()
+
+ return {'muted': self._player.is_muted()}
+
+ @action
+ def set_position(self, position: float) -> dict:
+ """
+ Seek backward/forward to the specified absolute position.
+
+ :param position: Stream position in seconds.
+ :return: Player state.
+ """
+ assert self._player, 'No instance is running'
+ self._player.seek(position)
+ return self.status()
+
+ @action
+ def status(self) -> dict:
+ """
+ Get the current player state.
+ """
+ if not self._player:
+ return {'state': PlayerState.STOP.value}
+
+ pos = self._player.get_position()
+ length = self._player.get_duration()
+
+ return {
+ 'duration': length,
+ 'filename': self._resource[7:] if self._resource.startswith('file://') else self._resource,
+ 'mute': self._player.is_muted(),
+ 'name': self._resource,
+ 'pause': self._player.is_paused(),
+ 'percent_pos': pos / length if pos is not None and length is not None and pos >= 0 and length > 0 else 0,
+ 'position': pos,
+ 'seekable': length is not None and length > 0,
+ 'state': self._gst_to_player_state(self._player.get_state()).value,
+ 'url': self._resource,
+ 'volume': self._player.get_volume() * 100,
+ }
+
+ @staticmethod
+ def _gst_to_player_state(state) -> PlayerState:
+ # noinspection PyUnresolvedReferences,PyPackageRequirements
+ from gi.repository import Gst
+ if state == Gst.State.READY:
+ return PlayerState.STOP
+ if state == Gst.State.PAUSED:
+ return PlayerState.PAUSE
+ if state == Gst.State.PLAYING:
+ return PlayerState.PLAY
+ return PlayerState.IDLE
+
+
+# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/media/gstreamer/model.py b/platypush/plugins/media/gstreamer/model.py
new file mode 100644
index 0000000000..9c1341e6c6
--- /dev/null
+++ b/platypush/plugins/media/gstreamer/model.py
@@ -0,0 +1,55 @@
+from typing import Type
+
+from platypush.common.gstreamer import Pipeline
+from platypush.context import get_bus
+from platypush.message.event.media import MediaEvent, MediaPlayEvent, MediaPauseEvent, MediaStopEvent, \
+ NewPlayingMediaEvent, MediaMuteChangedEvent, MediaSeekEvent
+
+
+class MediaPipeline(Pipeline):
+ def __init__(self, resource: str):
+ super().__init__()
+ self.resource = resource
+ self.add_source('playbin', uri=resource)
+
+ @staticmethod
+ def post_event(evt_class: Type[MediaEvent], **kwargs):
+ kwargs['player'] = 'local'
+ kwargs['plugin'] = 'media.gstreamer'
+ evt = evt_class(**kwargs)
+ get_bus().post(evt)
+
+ def play(self):
+ # noinspection PyUnresolvedReferences,PyPackageRequirements
+ from gi.repository import Gst
+ is_first_play = self.get_state() == Gst.State.NULL
+
+ super().play()
+ if is_first_play:
+ self.post_event(NewPlayingMediaEvent, resource=self.resource)
+ self.post_event(MediaPlayEvent, resource=self.resource)
+
+ def pause(self):
+ # noinspection PyUnresolvedReferences,PyPackageRequirements
+ from gi.repository import Gst
+ super().pause()
+ self.post_event(MediaPauseEvent if self.get_state() == Gst.State.PAUSED else MediaPlayEvent)
+
+ def stop(self):
+ super().stop()
+ self.post_event(MediaStopEvent)
+
+ def mute(self):
+ super().mute()
+ self.post_event(MediaMuteChangedEvent, mute=self.is_muted())
+
+ def unmute(self):
+ super().unmute()
+ self.post_event(MediaMuteChangedEvent, mute=self.is_muted())
+
+ def seek(self, position: float):
+ super().seek(position)
+ self.post_event(MediaSeekEvent, position=self.get_position())
+
+
+# vim:sw=4:ts=4:et: