platypush/platypush/backend/http/app/mixins/__init__.py

159 lines
4.9 KiB
Python

from contextlib import contextmanager
from dataclasses import dataclass
import json
import logging
from multiprocessing import RLock
from typing import Generator, Iterable, Optional, Set, Union
from redis import ConnectionError as RedisConnectionError
from redis.client import PubSub
from platypush.config import Config
from platypush.message import Message as AppMessage
from platypush.utils import get_redis
logger = logging.getLogger(__name__)
MessageType = Union[AppMessage, bytes, str, dict, list, set, tuple]
"""Types of supported messages on Redis/websocket channels."""
@dataclass
class Message:
"""
A wrapper for a message received on a Redis subscription.
"""
data: bytes
"""The data received in the message."""
channel: str
"""The channel the message was received on."""
class PubSubMixin:
"""
A mixin for Tornado route handlers that support pub/sub mechanisms.
"""
def __init__(self, *_, subscriptions: Optional[Iterable[str]] = None, **__):
self._pubsub: Optional[PubSub] = None
"""Pub/sub proxy."""
self._subscriptions: Set[str] = set(subscriptions or [])
"""Set of current channel subscriptions."""
self._pubsub_lock = RLock()
"""
Subscriptions lock. It ensures that the list of subscriptions is
manipulated by one thread or process at the time.
"""
self.subscribe(*self._subscriptions)
@property
@contextmanager
def pubsub(self):
"""
Pub/sub proxy lazy property with context manager.
"""
with self._pubsub_lock:
# Lazy initialization for the pub/sub object.
if self._pubsub is None:
self._pubsub = get_redis().pubsub()
# Yield the pub/sub object (context manager pattern).
yield self._pubsub
with self._pubsub_lock:
# Close and free the pub/sub object if it has no active subscriptions.
if self._pubsub is not None and len(self._subscriptions) == 0:
self._pubsub_close()
@staticmethod
def _serialize(data: MessageType) -> bytes:
"""
Serialize a message as bytes before delivering it to either a Redis or websocket channel.
"""
if isinstance(data, AppMessage):
data = str(data)
if isinstance(data, (list, tuple, set)):
data = list(data)
if isinstance(data, (list, dict)):
data = json.dumps(data, cls=AppMessage.Encoder)
if isinstance(data, str):
data = data.encode('utf-8')
return data
@classmethod
def publish(cls, data: MessageType, *channels: str) -> None:
"""
Publish data on one or more Redis channels.
"""
for channel in channels:
get_redis().publish(channel, cls._serialize(data))
def subscribe(self, *channels: str) -> None:
"""
Subscribe to a set of Redis channels.
"""
with self.pubsub as pubsub:
for channel in channels:
pubsub.subscribe(channel)
self._subscriptions.add(channel)
def unsubscribe(self, *channels: str) -> None:
"""
Unsubscribe from a set of Redis channels.
"""
with self.pubsub as pubsub:
for channel in channels:
if channel in self._subscriptions:
pubsub.unsubscribe(channel)
self._subscriptions.remove(channel)
def listen(self) -> Generator[Message, None, None]:
"""
Listens for pub/sub messages and yields them.
"""
try:
with self.pubsub as pubsub:
pubsub.subscribe(*self._subscriptions)
for msg in pubsub.listen():
channel = msg.get('channel', b'').decode()
if msg.get('type') != 'message' or not (
channel and channel in self._subscriptions
):
continue
yield Message(data=msg.get('data', b''), channel=channel)
except (AttributeError, ValueError, RedisConnectionError):
return
def _pubsub_close(self):
"""
Closes the pub/sub object.
"""
with self._pubsub_lock:
if self._pubsub is not None:
try:
self._pubsub.close()
except Exception as e:
logger.debug('Error on pubsub close: %s', e)
finally:
self._pubsub = None
def on_close(self):
"""
Extensible close handler that closes the pub/sub object.
"""
self._pubsub_close()
@staticmethod
def get_channel(channel: str) -> str:
"""
Utility method that returns the prefixed Redis channel for a certain subscription name.
"""
return f'_platypush/{Config.get("device_id")}/{channel}' # type: ignore
# vim:sw=4:ts=4:et: