Implemented timestamp mark on messages to trigger message expiration logic in case something stays on the bus for longer than a minute

This commit is contained in:
Fabio Manganiello 2018-10-08 10:35:56 +00:00
parent c10e882dd0
commit 948f3dc37d
6 changed files with 36 additions and 8 deletions

View file

@ -12,7 +12,6 @@ import traceback
from threading import Thread from threading import Thread
from .bus import Bus
from .bus.redis import RedisBus from .bus.redis import RedisBus
from .config import Config from .config import Config
from .context import register_backends from .context import register_backends

View file

@ -1,5 +1,6 @@
import logging import logging
import threading import threading
import time
from queue import Queue from queue import Queue
@ -12,6 +13,9 @@ logger = logging.getLogger(__name__)
class Bus(object): class Bus(object):
""" Main local bus where the daemon will listen for new messages """ """ Main local bus where the daemon will listen for new messages """
_MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired
# after one minute without being picked up
def __init__(self, on_message=None): def __init__(self, on_message=None):
self.bus = Queue() self.bus = Queue()
self.on_message = on_message self.on_message = on_message
@ -45,6 +49,11 @@ class Bus(object):
stop=False stop=False
while not stop: while not stop:
msg = self.get() msg = self.get()
if msg.timestamp and time.time() - msg.timestamp > self._MSG_EXPIRY_TIMEOUT:
logger.info('{} seconds old message on the bus expired, ignoring it: {}'
.format(int(time.time()-msg.timestamp), msg))
continue
self.on_message(msg) self.on_message(msg)
if isinstance(msg, StopEvent) and msg.targets_me(): if isinstance(msg, StopEvent) and msg.targets_me():

View file

@ -1,6 +1,7 @@
import logging import logging
import inspect import inspect
import json import json
import time
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,7 +18,7 @@ class Message(object):
return json.dumps({ return json.dumps({
attr: getattr(self, attr) attr: getattr(self, attr)
for attr in self.__dir__() for attr in self.__dir__()
if not attr.startswith('_') if (attr != '_timestamp' or not attr.startswith('_'))
and not inspect.ismethod(getattr(self, attr)) and not inspect.ismethod(getattr(self, attr))
}).replace('\n', ' ') }).replace('\n', ' ')
@ -48,6 +49,10 @@ class Message(object):
logger.warning('Invalid JSON message: {}'.format(msg)) logger.warning('Invalid JSON message: {}'.format(msg))
assert isinstance(msg, dict) assert isinstance(msg, dict)
if not '_timestamp' in msg:
msg['_timestamp'] = time.time()
return msg return msg
@classmethod @classmethod

View file

@ -3,6 +3,7 @@ import json
import random import random
import re import re
import threading import threading
import time
from datetime import date from datetime import date
@ -13,7 +14,7 @@ from platypush.utils import get_event_class_by_type
class Event(Message): class Event(Message):
""" Event message class """ """ Event message class """
def __init__(self, target=None, origin=None, id=None, **kwargs): def __init__(self, target=None, origin=None, id=None, timestamp=None, **kwargs):
""" """
Params: Params:
target -- Target node [String] target -- Target node [String]
@ -28,6 +29,7 @@ class Event(Message):
self.type = '{}.{}'.format(self.__class__.__module__, self.type = '{}.{}'.format(self.__class__.__module__,
self.__class__.__name__) self.__class__.__name__)
self.args = kwargs self.args = kwargs
self.timestamp = timestamp or time.time()
@classmethod @classmethod
def build(cls, msg): def build(cls, msg):
@ -42,6 +44,7 @@ class Event(Message):
args['id'] = msg['id'] if 'id' in msg else cls._generate_id() args['id'] = msg['id'] if 'id' in msg else cls._generate_id()
args['target'] = msg['target'] if 'target' in msg else Config.get('device_id') args['target'] = msg['target'] if 'target' in msg else Config.get('device_id')
args['origin'] = msg['origin'] if 'origin' in msg else Config.get('device_id') args['origin'] = msg['origin'] if 'origin' in msg else Config.get('device_id')
args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time()
return event_class(**args) return event_class(**args)
@ -168,6 +171,7 @@ class Event(Message):
'target' : self.target, 'target' : self.target,
'origin' : self.origin if hasattr(self, 'origin') else None, 'origin' : self.origin if hasattr(self, 'origin') else None,
'id' : self.id if hasattr(self, 'id') else None, 'id' : self.id if hasattr(self, 'id') else None,
'_timestamp' : self.timestamp,
'args' : { 'args' : {
'type' : self.type, 'type' : self.type,
**args **args

View file

@ -4,6 +4,7 @@ import json
import logging import logging
import random import random
import re import re
import time
import traceback import traceback
from threading import Thread from threading import Thread
@ -21,16 +22,17 @@ class Request(Message):
""" Request message class """ """ Request message class """
def __init__(self, target, action, origin=None, id=None, backend=None, def __init__(self, target, action, origin=None, id=None, backend=None,
args=None, token=None): args=None, token=None, timestamp=None):
""" """
Params: Params:
target -- Target node [String] target -- Target node [Str]
action -- Action to be executed (e.g. music.mpd.play) [String] action -- Action to be executed (e.g. music.mpd.play) [Str]
origin -- Origin node [String] origin -- Origin node [Str]
id -- Message ID, or None to get it auto-generated id -- Message ID, or None to get it auto-generated
backend -- Backend connected to the request, where the response will be delivered backend -- Backend connected to the request, where the response will be delivered
args -- Additional arguments for the action [Dict] args -- Additional arguments for the action [Dict]
token -- Authorization token, if required on the server [Str] token -- Authorization token, if required on the server [Str]
timestamp -- Message creation timestamp [Float]
""" """
self.id = id if id else self._generate_id() self.id = id if id else self._generate_id()
@ -40,6 +42,7 @@ class Request(Message):
self.args = args if args else {} self.args = args if args else {}
self.backend = backend self.backend = backend
self.token = token self.token = token
self.timestamp = timestamp or time.time()
@classmethod @classmethod
def build(cls, msg): def build(cls, msg):
@ -51,6 +54,7 @@ class Request(Message):
} }
args['id'] = msg['id'] if 'id' in msg else cls._generate_id() args['id'] = msg['id'] if 'id' in msg else cls._generate_id()
args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time()
if 'origin' in msg: args['origin'] = msg['origin'] if 'origin' in msg: args['origin'] = msg['origin']
if 'token' in msg: args['token'] = msg['token'] if 'token' in msg: args['token'] = msg['token']
return cls(**args) return cls(**args)
@ -240,6 +244,7 @@ class Request(Message):
'origin' : self.origin if hasattr(self, 'origin') else None, 'origin' : self.origin if hasattr(self, 'origin') else None,
'id' : self.id if hasattr(self, 'id') else None, 'id' : self.id if hasattr(self, 'id') else None,
'token' : self.token if hasattr(self, 'token') else None, 'token' : self.token if hasattr(self, 'token') else None,
'_timestamp' : self.timestamp,
}) })

View file

@ -1,11 +1,13 @@
import json import json
import time
from platypush.message import Message from platypush.message import Message
class Response(Message): class Response(Message):
""" Response message class """ """ Response message class """
def __init__(self, target=None, origin=None, id=None, output=None, errors=[]): def __init__(self, target=None, origin=None, id=None, output=None, errors=[],
timestamp=None):
""" """
Params: Params:
target -- Target [String] target -- Target [String]
@ -13,12 +15,14 @@ class Response(Message):
output -- Output [String] output -- Output [String]
errors -- Errors [List of strings or exceptions] errors -- Errors [List of strings or exceptions]
id -- Message ID this response refers to id -- Message ID this response refers to
timestamp -- Message timestamp [Float]
""" """
self.target = target self.target = target
self.output = self._parse_msg(output) self.output = self._parse_msg(output)
self.errors = self._parse_msg(errors) self.errors = self._parse_msg(errors)
self.origin = origin self.origin = origin
self.timestamp = timestamp or time.time()
self.id = id self.id = id
def is_error(self): def is_error(self):
@ -45,6 +49,7 @@ class Response(Message):
'errors' : msg['response']['errors'], 'errors' : msg['response']['errors'],
} }
args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time()
if 'id' in msg: args['id'] = msg['id'] if 'id' in msg: args['id'] = msg['id']
if 'origin' in msg: args['origin'] = msg['origin'] if 'origin' in msg: args['origin'] = msg['origin']
return cls(**args) return cls(**args)
@ -61,6 +66,7 @@ class Response(Message):
'type' : 'response', 'type' : 'response',
'target' : self.target if hasattr(self, 'target') else None, 'target' : self.target if hasattr(self, 'target') else None,
'origin' : self.origin if hasattr(self, 'origin') else None, 'origin' : self.origin if hasattr(self, 'origin') else None,
'_timestamp' : self.timestamp,
'response' : { 'response' : {
'output' : self.output, 'output' : self.output,
'errors' : self.errors, 'errors' : self.errors,