platypush/platypush/message/event/__init__.py

218 lines
8.0 KiB
Python

import copy
import hashlib
import json
import random
import re
import time
import uuid
from datetime import date
from platypush.config import Config
from platypush.message import Message
from platypush.utils import get_event_class_by_type
class Event(Message):
""" Event message class """
# If this class property is set to false then the logging of these events
# will be disabled. Logging is usually disabled for events with a very
# high frequency that would otherwise pollute the logs e.g. camera capture
# events
# pylint: disable=redefined-builtin
def __init__(self, target=None, origin=None, id=None, timestamp=None,
disable_logging=False, disable_web_clients_notification=False, **kwargs):
"""
Params:
target -- Target node [String]
origin -- Origin node (default: current node) [String]
id -- Event ID (default: auto-generated)
kwargs -- Additional arguments for the event [kwDict]
"""
super().__init__(timestamp=timestamp)
self.id = id if id else self._generate_id()
self.target = target if target else Config.get('device_id')
self.origin = origin if origin else Config.get('device_id')
self.type = '{}.{}'.format(self.__class__.__module__,
self.__class__.__name__)
self.args = kwargs
self.disable_logging = disable_logging
self.disable_web_clients_notification = disable_web_clients_notification
for arg, value in self.args.items():
if arg not in [
'id', 'args', 'origin', 'target', 'type', 'timestamp', 'disable_logging'
] and not arg.startswith('_'):
self.__setattr__(arg, value)
@classmethod
def build(cls, msg):
""" Builds an event message from a JSON UTF-8 string/bytearray, a
dictionary, or another Event """
msg = super().parse(msg)
event_type = msg['args'].pop('type')
event_class = get_event_class_by_type(event_type)
args = msg['args'] if 'args' in msg else {}
args['id'] = msg['id'] if 'id' in msg else cls._generate_id()
args['target'] = msg['target'] if 'target' in msg else Config.get('device_id')
args['origin'] = msg['origin'] if 'origin' in msg else Config.get('device_id')
args['timestamp'] = msg['_timestamp'] if '_timestamp' in msg else time.time()
return event_class(**args)
@staticmethod
def _generate_id():
""" Generate a unique event ID """
return hashlib.md5(str(uuid.uuid1()).encode()).hexdigest()
def matches_condition(self, condition):
"""
If the event matches an event condition, it will return an EventMatchResult
:param condition: The platypush.event.hook.EventCondition object
"""
result = EventMatchResult(is_match=False, parsed_args=self.args)
match_scores = []
if not isinstance(self, condition.type):
return result
for (attr, value) in condition.args.items():
if attr not in self.args:
return result
if isinstance(self.args[attr], str):
arg_result = self._matches_argument(argname=attr, condition_value=value)
if arg_result.is_match:
match_scores.append(arg_result.score)
for (parsed_arg, parsed_value) in arg_result.parsed_args.items():
result.parsed_args[parsed_arg] = parsed_value
else:
return result
elif self.args[attr] != value:
# TODO proper support for list and dictionary matches
return result
result.is_match = True
if match_scores:
result.score = sum(match_scores) / float(len(match_scores))
return result
def _matches_argument(self, argname, condition_value):
"""
Returns an EventMatchResult if the event argument [argname] matches
[condition_value].
- Example:
- self.args = {
'phrase': 'Hey dude turn on the living room lights'
}
- self._matches_argument(argname='phrase', condition_value='Turn on the ${lights} lights')
will return EventMatchResult(is_match=True, parsed_args={ 'lights': 'living room' })
- self._matches_argument(argname='phrase', condition_value='Turn off the ${lights} lights')
will return EventMatchResult(is_match=False, parsed_args={})
"""
result = EventMatchResult(is_match=False)
event_tokens = re.split(r'\s+', self.args[argname].strip().lower())
condition_tokens = re.split(r'\s+', condition_value.strip().lower())
while event_tokens and condition_tokens:
event_token = event_tokens[0]
condition_token = condition_tokens[0]
if event_token == condition_token:
event_tokens.pop(0)
condition_tokens.pop(0)
result.score += 1.5
elif re.search(condition_token, event_token):
m = re.search('({})'.format(condition_token), event_token)
if m.group(1):
event_tokens.pop(0)
result.score += 1.25
condition_tokens.pop(0)
else:
m = re.match(r'[^\\]*\${(.+?)}', condition_token)
if m:
argname = m.group(1)
if argname not in result.parsed_args:
result.parsed_args[argname] = event_token
result.score += 1.0
else:
result.parsed_args[argname] += ' ' + event_token
if (len(condition_tokens) == 1 and len(event_tokens) == 1) \
or (len(event_tokens) > 1 and len(condition_tokens) > 1
and event_tokens[1] == condition_tokens[1]):
# Stop appending tokens to this argument, as the next
# condition will be satisfied as well
condition_tokens.pop(0)
event_tokens.pop(0)
else:
result.score -= 1.0
event_tokens.pop(0)
# It's a match if all the tokens in the condition string have been satisfied
result.is_match = len(condition_tokens) == 0
return result
def __str__(self):
"""
Overrides the str() operator and converts
the message into a UTF-8 JSON string
"""
args = copy.deepcopy(self.args)
flatten(args)
return json.dumps({
'type': 'event',
'target': self.target,
'origin': self.origin if hasattr(self, 'origin') else None,
'id': self.id if hasattr(self, 'id') else None,
'_timestamp': self.timestamp,
'args': {
'type': self.type,
**args
},
}, cls=self.Encoder)
class EventMatchResult(object):
""" When comparing an event against an event condition, you want to
return this object. It contains the match status (True or False),
any parsed arguments, and a match_score that identifies how "strong"
the match is - in case of multiple event matches, the ones with the
highest score will win """
def __init__(self, is_match, score=0, parsed_args=None):
self.is_match = is_match
self.score = score
self.parsed_args = {} if not parsed_args else parsed_args
def flatten(args):
if isinstance(args, dict):
for (key, value) in args.items():
if isinstance(value, date):
args[key] = value.isoformat()
elif isinstance(value, (dict, list)):
flatten(args[key])
elif isinstance(args, list):
for i, arg in enumerate(args):
if isinstance(arg, date):
args[i] = arg.isoformat()
elif isinstance(arg, (dict, list)):
flatten(args[i])
# vim:sw=4:ts=4:et: