From 948f3dc37d66834dd697fbee456b689290d83b3c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 8 Oct 2018 10:35:56 +0000 Subject: [PATCH] Implemented timestamp mark on messages to trigger message expiration logic in case something stays on the bus for longer than a minute --- platypush/__init__.py | 1 - platypush/bus/__init__.py | 9 +++++++++ platypush/message/__init__.py | 7 ++++++- platypush/message/event/__init__.py | 6 +++++- platypush/message/request/__init__.py | 13 +++++++++---- platypush/message/response/__init__.py | 8 +++++++- 6 files changed, 36 insertions(+), 8 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index 727d14e2e1..3bbd0b4355 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -12,7 +12,6 @@ import traceback from threading import Thread -from .bus import Bus from .bus.redis import RedisBus from .config import Config from .context import register_backends diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 47e1e5ded7..d5489e620a 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -1,5 +1,6 @@ import logging import threading +import time from queue import Queue @@ -12,6 +13,9 @@ logger = logging.getLogger(__name__) class Bus(object): """ Main local bus where the daemon will listen for new messages """ + _MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired + # after one minute without being picked up + def __init__(self, on_message=None): self.bus = Queue() self.on_message = on_message @@ -45,6 +49,11 @@ class Bus(object): stop=False while not stop: msg = self.get() + if msg.timestamp and time.time() - msg.timestamp > self._MSG_EXPIRY_TIMEOUT: + logger.info('{} seconds old message on the bus expired, ignoring it: {}' + .format(int(time.time()-msg.timestamp), msg)) + continue + self.on_message(msg) if isinstance(msg, StopEvent) and msg.targets_me(): diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index e48145e3e3..122f03eb8c 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -1,6 +1,7 @@ import logging import inspect import json +import time logger = logging.getLogger(__name__) @@ -17,7 +18,7 @@ class Message(object): return json.dumps({ attr: getattr(self, attr) for attr in self.__dir__() - if not attr.startswith('_') + if (attr != '_timestamp' or not attr.startswith('_')) and not inspect.ismethod(getattr(self, attr)) }).replace('\n', ' ') @@ -48,6 +49,10 @@ class Message(object): logger.warning('Invalid JSON message: {}'.format(msg)) assert isinstance(msg, dict) + + if not '_timestamp' in msg: + msg['_timestamp'] = time.time() + return msg @classmethod diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index 561ab2e849..2a7f7df714 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -3,6 +3,7 @@ import json import random import re import threading +import time from datetime import date @@ -13,7 +14,7 @@ from platypush.utils import get_event_class_by_type class Event(Message): """ Event message class """ - def __init__(self, target=None, origin=None, id=None, **kwargs): + def __init__(self, target=None, origin=None, id=None, timestamp=None, **kwargs): """ Params: target -- Target node [String] @@ -28,6 +29,7 @@ class Event(Message): self.type = '{}.{}'.format(self.__class__.__module__, self.__class__.__name__) self.args = kwargs + self.timestamp = timestamp or time.time() @classmethod def build(cls, msg): @@ -42,6 +44,7 @@ class Event(Message): args['id'] = msg['id'] if 'id' in msg else cls._generate_id() args['target'] = msg['target'] if 'target' in msg else Config.get('device_id') args['origin'] = msg['origin'] if 'origin' in msg else Config.get('device_id') + args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time() return event_class(**args) @@ -168,6 +171,7 @@ class Event(Message): 'target' : self.target, 'origin' : self.origin if hasattr(self, 'origin') else None, 'id' : self.id if hasattr(self, 'id') else None, + '_timestamp' : self.timestamp, 'args' : { 'type' : self.type, **args diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 877cd33d72..63c250b14a 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -4,6 +4,7 @@ import json import logging import random import re +import time import traceback from threading import Thread @@ -21,16 +22,17 @@ class Request(Message): """ Request message class """ def __init__(self, target, action, origin=None, id=None, backend=None, - args=None, token=None): + args=None, token=None, timestamp=None): """ Params: - target -- Target node [String] - action -- Action to be executed (e.g. music.mpd.play) [String] - origin -- Origin node [String] + target -- Target node [Str] + action -- Action to be executed (e.g. music.mpd.play) [Str] + origin -- Origin node [Str] id -- Message ID, or None to get it auto-generated backend -- Backend connected to the request, where the response will be delivered args -- Additional arguments for the action [Dict] token -- Authorization token, if required on the server [Str] + timestamp -- Message creation timestamp [Float] """ self.id = id if id else self._generate_id() @@ -40,6 +42,7 @@ class Request(Message): self.args = args if args else {} self.backend = backend self.token = token + self.timestamp = timestamp or time.time() @classmethod def build(cls, msg): @@ -51,6 +54,7 @@ class Request(Message): } args['id'] = msg['id'] if 'id' in msg else cls._generate_id() + args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time() if 'origin' in msg: args['origin'] = msg['origin'] if 'token' in msg: args['token'] = msg['token'] return cls(**args) @@ -240,6 +244,7 @@ class Request(Message): 'origin' : self.origin if hasattr(self, 'origin') else None, 'id' : self.id if hasattr(self, 'id') else None, 'token' : self.token if hasattr(self, 'token') else None, + '_timestamp' : self.timestamp, }) diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index ed499b1884..b7e03ae84f 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -1,11 +1,13 @@ import json +import time from platypush.message import Message class Response(Message): """ Response message class """ - def __init__(self, target=None, origin=None, id=None, output=None, errors=[]): + def __init__(self, target=None, origin=None, id=None, output=None, errors=[], + timestamp=None): """ Params: target -- Target [String] @@ -13,12 +15,14 @@ class Response(Message): output -- Output [String] errors -- Errors [List of strings or exceptions] id -- Message ID this response refers to + timestamp -- Message timestamp [Float] """ self.target = target self.output = self._parse_msg(output) self.errors = self._parse_msg(errors) self.origin = origin + self.timestamp = timestamp or time.time() self.id = id def is_error(self): @@ -45,6 +49,7 @@ class Response(Message): 'errors' : msg['response']['errors'], } + args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time() if 'id' in msg: args['id'] = msg['id'] if 'origin' in msg: args['origin'] = msg['origin'] return cls(**args) @@ -61,6 +66,7 @@ class Response(Message): 'type' : 'response', 'target' : self.target if hasattr(self, 'target') else None, 'origin' : self.origin if hasattr(self, 'origin') else None, + '_timestamp' : self.timestamp, 'response' : { 'output' : self.output, 'errors' : self.errors,