forked from platypush/platypush
Messages can be quite big and verbose, and they can anyway be subscribed over Websockets. Full dumps are anyway enabled when Platypush is started in verbose mode. This commit replaces the dumps on INFO level with a quick summary containing the message ID, request/event type and response time.
214 lines
5.9 KiB
214 lines
5.9 KiB
from abc import ABC, abstractmethod
from dataclasses import asdict, is_dataclass
import decimal
import datetime
from enum import Enum
import io
import logging
import inspect
import json
import time
from typing import Optional, Union
from uuid import UUID
_logger = logging.getLogger('platypush')
class JSONAble(ABC):
Generic interface for JSON-able objects.
def to_json(self) -> Union[str, list, dict]:
raise NotImplementedError()
class Message:
Message generic class
class Encoder(json.JSONEncoder):
JSON encoder that can serialize custom types commonly handled in
Platypush messages.
def parse_numpy(obj):
import numpy as np
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, decimal.Decimal):
return float(obj)
if isinstance(obj, (bytes, bytearray)):
return '0x' + ''.join([f'{x:02x}' for x in obj])
if callable(obj):
return f'<function at {obj.__module__}.{obj.__name__}>'
except (AttributeError, ImportError, TypeError):
def parse_datetime(obj):
if isinstance(obj, (datetime.datetime,, datetime.time)):
return obj.isoformat()
def default(self, o):
from platypush.procedure import Procedure
value = self.parse_datetime(o)
if value is not None:
return value
if isinstance(o, set):
return list(o)
if isinstance(o, UUID):
return str(o)
value = self.parse_numpy(o)
if value is not None:
return value
if isinstance(o, JSONAble):
return o.to_json()
if isinstance(o, Procedure):
return o.to_dict()
if isinstance(o, Enum):
return o.value
if isinstance(o, Exception):
return f'<{o.__class__.__name__}>' + (f' {o}' if o else '')
if is_dataclass(o):
return asdict(o)
if isinstance(o, Message):
return o.to_dict(o)
# Don't serialize I/O wrappers/objects
if isinstance(o, io.IOBase):
return None
return super().default(o)
except Exception as e:
'Could not serialize object type %s: %s: %s', type(o), e, o
def __init__(self, *_, timestamp=None, logging_level=logging.INFO, **__):
self.timestamp = timestamp or time.time()
self.logging_level = logging_level
self._logger = _logger
self._default_log_prefix = ''
def log(self, level: Optional[int] = None, prefix=''):
if self.logging_level is None:
return # Skip logging
log_func =
level = level if level is not None else self.logging_level
if level == logging.DEBUG:
log_func = self._logger.debug
elif level == logging.WARNING:
log_func = self._logger.warning
elif level == logging.ERROR:
log_func = self._logger.error
elif level == logging.FATAL:
log_func = self._logger.fatal
if not prefix:
prefix = self._default_log_prefix
log_func('%s%s', prefix, self)
def __str__(self):
Overrides the str() operator and converts
the message into a UTF-8 JSON string
return json.dumps(
attr: getattr(self, attr)
for attr in self.__dir__()
if (attr != '_timestamp' or not attr.startswith('_'))
and not inspect.ismethod(getattr(self, attr))
).replace('\n', ' ')
def __bytes__(self):
Overrides the bytes() operator, converts the message into
its JSON-serialized UTF-8-encoded representation
return str(self).encode('utf-8')
def parse(cls, msg):
Parse a generic message into a key-value dictionary
:param msg: Original message. It can be a dictionary, a Message,
or a string/bytearray, as long as it's valid UTF-8 JSON
if isinstance(msg, cls):
msg = str(msg)
if isinstance(msg, (bytes, bytearray)):
msg = msg.decode('utf-8')
if isinstance(msg, str):
msg = json.loads(msg.strip())
except (ValueError, TypeError):
_logger.warning('Invalid JSON message: %s', msg)
assert isinstance(msg, dict)
if '_timestamp' not in msg:
msg['_timestamp'] = time.time()
return msg
def to_dict(cls, msg):
Converts a Message object into a dictionary
:param msg: Message object
return {
k: v
for k, v in cls.parse(msg).items()
if k not in ('id', 'token', 'target', 'origin', '_timestamp')
def build(cls, msg):
Builds a Message object from a dictionary.
:param msg: The message as a key-value dictionary, Message object or JSON string
from platypush.utils import get_message_class_by_type
msg = cls.parse(msg)
msgtype = get_message_class_by_type(msg['type'])
if msgtype != cls:
# vim:sw=4:ts=4:et: