diff --git a/platypush/plugins/camera/__init__.py b/platypush/plugins/camera/__init__.py
index e12f19ac9..f41e2b895 100644
--- a/platypush/plugins/camera/__init__.py
+++ b/platypush/plugins/camera/__init__.py
@@ -31,7 +31,7 @@ from platypush.plugins.camera.model.writer.preview import (
     PreviewWriter,
     PreviewWriterFactory,
 )
-from platypush.utils import get_plugin_name_by_class
+from platypush.utils import get_plugin_name_by_class, wait_for_either
 
 __all__ = [
     'Camera',
@@ -211,14 +211,13 @@ class CameraPlugin(RunnablePlugin, ABC):
         else:
             camera = self._camera_class(info=info)
 
+        ctx = ctx or {}
+        ctx['stream'] = stream
         camera.info.set(**params)
-        camera.object = self.prepare_device(camera, **(ctx or {}))
+        camera.object = self.prepare_device(camera, **ctx)
 
         if stream and camera.info.stream_format:
-            writer_class = StreamWriter.get_class_by_name(camera.info.stream_format)
-            camera.stream = writer_class(
-                camera=camera, plugin=self, redis_queue=redis_queue
-            )
+            self._prepare_stream_writer(camera, redis_queue=redis_queue)
 
         if camera.info.frames_dir:
             pathlib.Path(
@@ -228,6 +227,13 @@ class CameraPlugin(RunnablePlugin, ABC):
         self._devices[device] = camera
         return camera
 
+    def _prepare_stream_writer(self, camera: Camera, redis_queue: Optional[str] = None):
+        assert camera.info.stream_format, 'No stream format specified'
+        writer_class = StreamWriter.get_class_by_name(camera.info.stream_format)
+        camera.stream = writer_class(
+            camera=camera, plugin=self, redis_queue=redis_queue
+        )
+
     def close_device(self, camera: Camera, wait_capture: bool = True) -> None:
         """
         Close and release a device.
@@ -688,18 +694,19 @@ class CameraPlugin(RunnablePlugin, ABC):
         return self.status(camera.info.device)  # type: ignore
 
     @staticmethod
-    def _prepare_server_socket(camera: Camera) -> socket.socket:
-        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        server_socket.bind(
-            (  # lgtm [py/bind-socket-all-network-interfaces]
-                camera.info.bind_address or '0.0.0.0',
-                camera.info.listen_port,
+    @contextmanager
+    def _prepare_server_socket(camera: Camera) -> Generator[socket.socket, None, None]:
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv_sock:
+            srv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            srv_sock.bind(
+                (  # lgtm [py/bind-socket-all-network-interfaces]
+                    camera.info.bind_address or '0.0.0.0',
+                    camera.info.listen_port,
+                )
             )
-        )
-        server_socket.listen(1)
-        server_socket.settimeout(1)
-        return server_socket
+            srv_sock.listen(1)
+            srv_sock.settimeout(1)
+            yield srv_sock
 
     def _accept_client(self, server_socket: socket.socket) -> Optional[IO]:
         try:
@@ -707,42 +714,62 @@ class CameraPlugin(RunnablePlugin, ABC):
             self.logger.info('Accepted client connection from %s', sock.getpeername())
             return sock.makefile('wb')
         except socket.timeout:
-            return
+            return None
 
     def streaming_thread(
         self, camera: Camera, stream_format: str, duration: Optional[float] = None
     ):
-        streaming_started_time = time.time()
-        server_socket = self._prepare_server_socket(camera)
-        sock = None
-        self.logger.info('Starting streaming on port %s', camera.info.listen_port)
+        with self._prepare_server_socket(camera) as srv_sock:
+            streaming_started_time = time.time()
+            sock = None
+            self.logger.info('Starting streaming on port %s', camera.info.listen_port)
 
-        try:
-            while camera.stream_event.is_set():
-                if duration and time.time() - streaming_started_time >= duration:
-                    break
+            try:
+                while camera.stream_event.is_set():
+                    if duration and time.time() - streaming_started_time >= duration:
+                        break
 
-                sock = self._accept_client(server_socket)
-                if not sock:
-                    continue
+                    sock = self._accept_client(srv_sock)
+                    if not sock:
+                        continue
 
-                if camera.info.device not in self._devices:
-                    info = asdict(camera.info)
-                    info['stream_format'] = stream_format
-                    camera = self.open_device(stream=True, **info)
+                    if duration and time.time() - streaming_started_time >= duration:
+                        break
 
-                assert camera.stream, 'No camera stream available'
-                camera.stream.sock = sock
-                self.start_camera(
-                    camera, duration=duration, frames_dir=None, image_file=None
-                )
-        finally:
-            self._cleanup_stream(camera, server_socket, sock)
-            self.logger.info('Stopped camera stream')
+                    self._streaming_loop(
+                        camera, stream_format, sock=sock, duration=duration
+                    )
+
+                    wait_for_either(
+                        self._should_stop, camera.stop_stream_event, timeout=duration
+                    )
+            finally:
+                self._cleanup_stream(camera, srv_sock, sock)
+
+        self.logger.info('Stopped camera stream')
+
+    def _streaming_loop(
+        self,
+        camera: Camera,
+        stream_format: str,
+        sock: IO,
+        duration: Optional[float] = None,
+    ):
+        if camera.info.device not in self._devices:
+            info = asdict(camera.info)
+            info['stream_format'] = stream_format
+            camera = self.open_device(stream=True, **info)
+
+        assert camera.stream, 'No camera stream available'
+        camera.stream.sock = sock
+        self.start_camera(camera, duration=duration, frames_dir=None, image_file=None)
 
     def _cleanup_stream(
         self, camera: Camera, server_socket: socket.socket, client: Optional[IO]
     ):
+        camera.stream_event.clear()
+        camera.stop_stream_event.set()
+
         if client:
             try:
                 client.close()
@@ -793,6 +820,7 @@ class CameraPlugin(RunnablePlugin, ABC):
 
         self._streams[camera.info.device] = camera
         camera.stream_event.set()
+        camera.stop_stream_event.clear()
 
         camera.stream_thread = threading.Thread(
             target=self.streaming_thread,
@@ -822,6 +850,8 @@ class CameraPlugin(RunnablePlugin, ABC):
 
     def _stop_streaming(self, camera: Camera):
         camera.stream_event.clear()
+        camera.stop_stream_event.set()
+
         if camera.stream_thread and camera.stream_thread.is_alive():
             camera.stream_thread.join(timeout=5.0)
 
diff --git a/platypush/plugins/camera/model/camera.py b/platypush/plugins/camera/model/camera.py
index 851000213..baab2421b 100644
--- a/platypush/plugins/camera/model/camera.py
+++ b/platypush/plugins/camera/model/camera.py
@@ -50,6 +50,7 @@ class Camera:
     info: CameraInfo
     start_event: threading.Event = threading.Event()
     stream_event: threading.Event = threading.Event()
+    stop_stream_event: threading.Event = threading.Event()
     capture_thread: Optional[threading.Thread] = None
     stream_thread: Optional[threading.Thread] = None
     object = None
diff --git a/platypush/plugins/camera/model/writer/__init__.py b/platypush/plugins/camera/model/writer/__init__.py
index 5bca53bf3..6744f6358 100644
--- a/platypush/plugins/camera/model/writer/__init__.py
+++ b/platypush/plugins/camera/model/writer/__init__.py
@@ -17,11 +17,9 @@ class VideoWriter(ABC):
     mimetype: Optional[str] = None
 
     def __init__(self, *_, **kwargs):
-        from platypush.plugins.camera import Camera, CameraPlugin
-
         self.logger = logging.getLogger(self.__class__.__name__)
-        self.camera: Camera = kwargs.pop('camera')
-        self.plugin: CameraPlugin = kwargs.pop('plugin')
+        self.camera = kwargs.get('camera', getattr(self, 'camera', None))
+        self.plugin = kwargs.get('plugin', getattr(self, 'plugin', None))
         self.closed = False
 
     @abstractmethod
diff --git a/platypush/plugins/camera/model/writer/ffmpeg.py b/platypush/plugins/camera/model/writer/ffmpeg.py
index d31f9efce..f239cddd3 100644
--- a/platypush/plugins/camera/model/writer/ffmpeg.py
+++ b/platypush/plugins/camera/model/writer/ffmpeg.py
@@ -203,10 +203,10 @@ class MKVStreamWriter(FFmpegStreamWriter):
 class H264StreamWriter(FFmpegStreamWriter):
     mimetype = 'video/h264'
 
-    def __init__(self, camera: Camera, *args, **kwargs):
+    def __init__(self, *args, camera: Camera, **kwargs):
         if not camera.info.output_codec:
             camera.info.output_codec = 'libxvid'
-        super().__init__(camera, *args, output_format='h264', **kwargs)
+        super().__init__(*args, camera=camera, output_format='h264', **kwargs)
 
 
 class H265StreamWriter(FFmpegStreamWriter):
diff --git a/platypush/plugins/camera/pi/__init__.py b/platypush/plugins/camera/pi/__init__.py
index 1b8b261ec..69c504c2c 100644
--- a/platypush/plugins/camera/pi/__init__.py
+++ b/platypush/plugins/camera/pi/__init__.py
@@ -1,7 +1,6 @@
 import os
 import time
-
-from typing import Optional, Union
+from typing import IO, Optional, Union
 
 from platypush.plugins import action
 from platypush.plugins.camera import CameraPlugin, Camera
@@ -90,17 +89,60 @@ class CameraPiPlugin(CameraPlugin):
         self.camera_info.exposure_compensation = exposure_compensation  # type: ignore
         self.camera_info.awb_mode = awb_mode  # type: ignore
 
+    def _get_transform(self, device: Camera):
+        from libcamera import Orientation, Transform  # type: ignore
+        from picamera2.utils import orientation_to_transform  # type: ignore
+
+        rot = device.info.rotate
+        if not rot:
+            return Transform(
+                # It may seem counterintuitive, but the picamera2 library's flip
+                # definition is the opposite of ours
+                hflip=device.info.vertical_flip,
+                vflip=device.info.horizontal_flip,
+            )
+
+        if rot == 90:
+            orient = (
+                Orientation.Rotate90Mirror
+                if device.info.vertical_flip
+                else Orientation.Rotate90
+            )
+        elif rot == 180:
+            orient = (
+                Orientation.Rotate180Mirror
+                if device.info.horizontal_flip
+                else Orientation.Rotate180
+            )
+        elif rot == 270:
+            orient = (
+                Orientation.Rotate270Mirror
+                if device.info.vertical_flip
+                else Orientation.Rotate270
+            )
+        else:
+            raise AssertionError(
+                f'Invalid rotation: {rot}. Supported values: 0, 90, 180, 270'
+            )
+
+        return orientation_to_transform(orient)
+
     def prepare_device(
-        self, device: Camera, start: bool = True, video: bool = False, **_
+        self,
+        device: Camera,
+        start: bool = True,
+        video: bool = False,
+        stream: bool = False,
+        **_,
     ):
-        from libcamera import Transform  # type: ignore
         from picamera2 import Picamera2  # type: ignore
 
         assert isinstance(device, PiCamera), f'Invalid device type: {type(device)}'
         camera = Picamera2(camera_num=device.info.device)
+        still = not (video or stream)
         cfg_params = {
             'main': {
-                'format': 'XBGR8888' if video else 'BGR888',
+                'format': 'XBGR8888' if not still else 'BGR888',
                 **(
                     {'size': tuple(map(int, device.info.resolution))}
                     if device.info.resolution
@@ -108,15 +150,8 @@ class CameraPiPlugin(CameraPlugin):
                 ),
             },
             **(
-                {
-                    'transform': Transform(
-                        # It may seem counterintuitive, but the picamera2 library's flip
-                        # definition is the opposite of ours
-                        hflip=device.info.vertical_flip,
-                        vflip=device.info.horizontal_flip,
-                    ),
-                }
-                if video
+                {'transform': self._get_transform(device)}
+                if not still
                 # We don't need to flip the image for individual frames, the base camera
                 # class methods will take care of that
                 else {}
@@ -131,7 +166,7 @@ class CameraPiPlugin(CameraPlugin):
 
         cfg = (
             camera.create_video_configuration
-            if video
+            if not still
             else camera.create_still_configuration
         )(**cfg_params)
 
@@ -151,6 +186,15 @@ class CameraPiPlugin(CameraPlugin):
         assert device.object, 'Camera not open'
         return device.object.capture_image('main')
 
+    @property
+    def _video_encoders_by_format(self) -> dict:
+        from picamera2.encoders import H264Encoder, MJPEGEncoder  # type: ignore
+
+        return {
+            'h264': H264Encoder,
+            'mjpeg': MJPEGEncoder,
+        }
+
     @action
     def capture_video(
         self,
@@ -211,12 +255,52 @@ class CameraPiPlugin(CameraPlugin):
 
         return self.status(camera.info.device).output
 
+    def _streaming_loop(self, camera: Camera, stream_format: str, sock: IO, *_, **__):
+        from picamera2 import Picamera2  # type: ignore
+        from picamera2.outputs import FileOutput  # type: ignore
+
+        encoder_cls = self._video_encoders_by_format.get(stream_format)
+        assert (
+            encoder_cls
+        ), f'Invalid stream format: {stream_format}. Supported formats: {", ".join(self._video_encoders_by_format)}'
+        assert isinstance(camera, PiCamera), f'Invalid camera type: {type(camera)}'
+        assert camera.object and isinstance(
+            camera.object, Picamera2
+        ), f'Invalid camera object type: {type(camera.object)}'
+
+        cam = camera.object
+        encoder = encoder_cls()
+        cam.encoders = encoder
+        encoder.output = FileOutput(sock)
+        cam.start_encoder(encoder)
+        cam.start()
+
+    def _prepare_stream_writer(self, *_, **__):
+        """
+        Overrides the base method to do nothing - the stream writer is handled
+        by the picamera2 library.
+        """
+
+    def _cleanup_stream(self, camera: Camera, *_, **__):
+        cam = camera.object
+        if not cam:
+            return
+
+        cam.stop()
+        cam.stop_encoder()
+        cam.close()
+
     @action
     def start_streaming(
-        self, duration: Optional[float] = None, stream_format: str = 'h264', **camera
+        self,
+        device: Optional[Union[int, str]] = None,
+        duration: Optional[float] = None,
+        stream_format: str = 'h264',
+        **camera,
     ) -> dict:
-        camera = self.open_device(stream_format=stream_format, **camera)
-        return self._start_streaming(camera, duration, stream_format)  # type: ignore
+        return super().start_streaming(  # type: ignore
+            device=device, duration=duration, stream_format=stream_format, **camera
+        )
 
 
 # vim:sw=4:ts=4:et:
diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py
index eaeb40573..6cd4c4845 100644
--- a/platypush/utils/__init__.py
+++ b/platypush/utils/__init__.py
@@ -18,8 +18,8 @@ from importlib.machinery import SourceFileLoader
 from importlib.util import spec_from_loader, module_from_spec
 from multiprocessing import Lock as PLock
 from tempfile import gettempdir
-from threading import Lock as TLock
-from typing import Generator, Optional, Tuple, Type, Union
+from threading import Event, Lock as TLock
+from typing import Callable, Generator, Optional, Tuple, Type, Union
 
 from dateutil import parser, tz
 from redis import Redis
@@ -780,4 +780,17 @@ def get_default_downloads_dir() -> str:
     return os.path.join(os.path.expanduser('~'), 'Downloads')
 
 
+def wait_for_either(*events, timeout: Optional[float] = None, cls: Type = Event):
+    """
+    Wait for any of the given events to be set.
+
+    :param events: The events to be checked.
+    :param timeout: The maximum time to wait for the event to be set. Default: None.
+    :param cls: The class to be used for the event. Default: threading.Event.
+    """
+    from .threads import OrEvent
+
+    return OrEvent(*events, cls=cls).wait(timeout=timeout)
+
+
 # vim:sw=4:ts=4:et:
diff --git a/platypush/utils/threads.py b/platypush/utils/threads.py
new file mode 100644
index 000000000..df64a3b96
--- /dev/null
+++ b/platypush/utils/threads.py
@@ -0,0 +1,58 @@
+import threading
+from typing import Callable, Optional, Type
+
+
+def OrEvent(*events, cls: Type = threading.Event):
+    """
+    Wrapper for threading.Event that allows to create an event that is
+    set if any of the given events are set.
+
+    Adapted from
+    https://stackoverflow.com/questions/12317940/python-threading-can-i-sleep-on-two-threading-events-simultaneously#12320352.
+
+    :param events: The events to be checked.
+    :param cls: The class to be used for the event. Default: threading.Event.
+    """
+
+    or_event = cls()
+
+    def changed():
+        bools = [e.is_set() for e in events]
+        if any(bools):
+            or_event.set()
+        else:
+            or_event.clear()
+
+    def _to_or(e, changed_callback: Callable[[], None]):
+        e._set = e.set
+        e._clear = e.clear
+        e.changed = changed_callback
+        e.set = lambda: _or_set(e)
+        e.clear = lambda: _clear_or(e)
+
+    def _clear_or(e):
+        e._clear()
+        e.changed()
+
+    def _or_set(e):
+        e._set()
+        e.changed()
+
+    for e in events:
+        _to_or(e, changed)
+
+    changed()
+    return or_event
+
+
+def wait_for_either(
+    *events, timeout: Optional[float] = None, cls: Type = threading.Event
+):
+    """
+    Wait for any of the given events to be set.
+
+    :param events: The events to be checked.
+    :param timeout: The maximum time to wait for the event to be set. Default: None.
+    :param cls: The class to be used for the event. Default: threading.Event.
+    """
+    return OrEvent(*events, cls=cls).wait(timeout=timeout)