Better event hooks filters.

- Support for nested attributes on event hook conditions. Things like
  these are now possible:

```
from platypush.event.hook import hook
from platypush.message.event.entities import EntityUpdateEvent

@hook(EntityUpdateEvent, entity={"external_id": "system:cpu"})
def on_cpu_update_event(event: EntityUpdateEvent, **_):
    print(event.args["entity"]["percent"])
```

- The scoring/regex extraction/partial string match logic in
  `_matches_argument` is actually only needed for
  `SpeechRecognizedEvent`. Other events don't need these features, and
  event hooks may be actually triggered unexpectedly in case of partial
  matches. Therefore, the "complex" `_matches_argument` has been moved
  as an override only for `SpeechRecognizedEvent`, and all the other
  events will perform simple key-value matching.
This commit is contained in:
Fabio Manganiello 2023-04-26 01:45:58 +02:00
parent ee54e0edbf
commit 245472a4c5
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
3 changed files with 217 additions and 128 deletions

View file

@ -2,8 +2,6 @@ import copy
import json
import logging
import random
import re
import sys
import time
from datetime import date
@ -31,7 +29,7 @@ class Event(Message):
timestamp=None,
logging_level=logging.INFO,
disable_web_clients_notification=False,
**kwargs
**kwargs,
):
"""
:param target: Target node
@ -53,7 +51,7 @@ class Event(Message):
self.id = id if id else self._generate_id()
self.target = target if target else Config.get('device_id')
self.origin = origin if origin else Config.get('device_id')
self.type = '{}.{}'.format(self.__class__.__module__, self.__class__.__name__)
self.type = f'{self.__class__.__module__}.{self.__class__.__name__}'
self.args = kwargs
self.disable_web_clients_notification = disable_web_clients_notification
@ -71,8 +69,10 @@ class Event(Message):
@classmethod
def build(cls, msg):
"""Builds an event message from a JSON UTF-8 string/bytearray, a
dictionary, or another Event"""
"""
Builds an event message from a JSON UTF-8 string/bytearray, a
dictionary, or another Event
"""
msg = super().parse(msg)
event_type = msg['args'].pop('type')
@ -88,7 +88,42 @@ class Event(Message):
@staticmethod
def _generate_id():
"""Generate a unique event ID"""
return ''.join(['{:02x}'.format(random.randint(0, 255)) for _ in range(16)])
return ''.join([f'{random.randint(0, 255):02x}' for _ in range(16)])
def _matches_condition(
self,
condition: dict,
args: dict,
result: "EventMatchResult",
match_scores: list,
) -> bool:
for attr, value in condition.items():
if attr not in args:
return False
if isinstance(args[attr], str):
self._matches_argument(
argname=attr, condition_value=value, args=args, result=result
)
if result.is_match:
match_scores.append(result.score)
else:
return False
elif isinstance(value, dict):
if not isinstance(args[attr], dict):
return False
return self._matches_condition(
condition=value,
args=args[attr],
result=result,
match_scores=match_scores,
)
elif args[attr] != value:
return False
return True
def matches_condition(self, condition):
"""
@ -102,21 +137,12 @@ class Event(Message):
if not isinstance(self, condition.type):
return result
for attr, value in condition.args.items():
if attr not in self.args:
return result
if isinstance(self.args[attr], str):
arg_result = self._matches_argument(argname=attr, condition_value=value)
if arg_result.is_match:
match_scores.append(arg_result.score)
for parsed_arg, parsed_value in arg_result.parsed_args.items():
result.parsed_args[parsed_arg] = parsed_value
else:
return result
elif self.args[attr] != value:
# TODO proper support for list and dictionary matches
if not self._matches_condition(
condition=condition.args,
args=self.args,
result=result,
match_scores=match_scores,
):
return result
result.is_match = True
@ -125,75 +151,20 @@ class Event(Message):
return result
def _matches_argument(self, argname, condition_value):
def _matches_argument(
self, argname, condition_value, args, result: "EventMatchResult"
):
"""
Returns an EventMatchResult if the event argument [argname] matches
[condition_value].
- Example:
- self.args = {
'phrase': 'Hey dude turn on the living room lights'
}
- self._matches_argument(argname='phrase', condition_value='Turn on the ${lights} lights')
will return EventMatchResult(is_match=True, parsed_args={ 'lights': 'living room' })
- self._matches_argument(argname='phrase', condition_value='Turn off the ${lights} lights')
will return EventMatchResult(is_match=False, parsed_args={})
"""
result = EventMatchResult(is_match=False)
if self.args.get(argname) == condition_value:
# In case of an exact match, return immediately
result.is_match = True
result.score = sys.maxsize
return result
event_tokens = re.split(r'\s+', self.args.get(argname, '').strip().lower())
condition_tokens = re.split(r'\s+', condition_value.strip().lower())
while event_tokens and condition_tokens:
event_token = event_tokens[0]
condition_token = condition_tokens[0]
if event_token == condition_token:
event_tokens.pop(0)
condition_tokens.pop(0)
result.score += 1.5
elif re.search(condition_token, event_token):
m = re.search('({})'.format(condition_token), event_token)
if m and m.group(1):
event_tokens.pop(0)
result.score += 1.25
condition_tokens.pop(0)
# Simple equality match by default. It can be overridden by the derived classes.
result.is_match = args.get(argname) == condition_value
if result.is_match:
result.score += 2
else:
m = re.match(r'[^\\]*\${(.+?)}', condition_token)
if m:
argname = m.group(1)
if argname not in result.parsed_args:
result.parsed_args[argname] = event_token
result.score += 1.0
else:
result.parsed_args[argname] += ' ' + event_token
if (len(condition_tokens) == 1 and len(event_tokens) == 1) or (
len(event_tokens) > 1
and len(condition_tokens) > 1
and event_tokens[1] == condition_tokens[1]
):
# Stop appending tokens to this argument, as the next
# condition will be satisfied as well
condition_tokens.pop(0)
event_tokens.pop(0)
else:
result.score -= 1.0
event_tokens.pop(0)
# It's a match if all the tokens in the condition string have been satisfied
result.is_match = len(condition_tokens) == 0
return result
result.score = 0
def __str__(self):
"""
@ -218,11 +189,13 @@ class Event(Message):
class EventMatchResult:
"""When comparing an event against an event condition, you want to
"""
When comparing an event against an event condition, you want to
return this object. It contains the match status (True or False),
any parsed arguments, and a match_score that identifies how "strong"
the match is - in case of multiple event matches, the ones with the
highest score will win"""
highest score will win.
"""
def __init__(self, is_match, score=0.0, parsed_args=None):
self.is_match = is_match
@ -231,6 +204,9 @@ class EventMatchResult:
def flatten(args):
"""
Flatten a nested dictionary for string serialization.
"""
if isinstance(args, dict):
for key, value in args.items():
if isinstance(value, date):

View file

@ -1,13 +1,16 @@
import logging
import re
import sys
from typing_extensions import override
from platypush.context import get_backend, get_plugin
from platypush.message.event import Event
class AssistantEvent(Event):
""" Base class for assistant events """
"""Base class for assistant events"""
def __init__(self, assistant=None, *args, **kwargs):
def __init__(self, *args, assistant=None, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger('platypush:assistant')
@ -20,7 +23,9 @@ class AssistantEvent(Event):
self._assistant = get_plugin('assistant.google.pushtotalk')
if not self._assistant:
self.logger.warning('Assistant plugin/backend not configured/initialized')
self.logger.warning(
'Assistant plugin/backend not configured/initialized'
)
self._assistant = None
@ -38,7 +43,7 @@ class ConversationEndEvent(AssistantEvent):
Event triggered when a conversation ends
"""
def __init__(self, with_follow_on_turn=False, *args, **kwargs):
def __init__(self, *args, with_follow_on_turn=False, **kwargs):
"""
:param with_follow_on_turn: Set to true if the conversation expects a user follow-up, false otherwise
:type with_follow_on_turn: str
@ -75,9 +80,6 @@ class NoResponseEvent(ConversationEndEvent):
Event triggered when a conversation ends with no response
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class SpeechRecognizedEvent(AssistantEvent):
"""
@ -105,18 +107,89 @@ class SpeechRecognizedEvent(AssistantEvent):
return result
@override
def _matches_argument(self, argname, condition_value, args, result):
"""
Overrides the default `_matches_argument` method to allow partial
phrase matches and text extraction.
Example::
args = {
'phrase': 'Hey dude turn on the living room lights'
}
- `self._matches_argument(argname='phrase', condition_value='Turn on the ${lights} lights')`
will return `EventMatchResult(is_match=True, parsed_args={ 'lights': 'living room' })`
- `self._matches_argument(argname='phrase', condition_value='Turn off the ${lights} lights')`
will return `EventMatchResult(is_match=False, parsed_args={})`
"""
if args.get(argname) == condition_value:
# In case of an exact match, return immediately
result.is_match = True
result.score = sys.maxsize
return result
event_tokens = re.split(r'\s+', args.get(argname, '').strip().lower())
condition_tokens = re.split(r'\s+', condition_value.strip().lower())
while event_tokens and condition_tokens:
event_token = event_tokens[0]
condition_token = condition_tokens[0]
if event_token == condition_token:
event_tokens.pop(0)
condition_tokens.pop(0)
result.score += 1.5
elif re.search(condition_token, event_token):
m = re.search(f'({condition_token})', event_token)
if m and m.group(1):
event_tokens.pop(0)
result.score += 1.25
condition_tokens.pop(0)
else:
m = re.match(r'[^\\]*\${(.+?)}', condition_token)
if m:
argname = m.group(1)
if argname not in result.parsed_args:
result.parsed_args[argname] = event_token
result.score += 1.0
else:
result.parsed_args[argname] += ' ' + event_token
if (len(condition_tokens) == 1 and len(event_tokens) == 1) or (
len(event_tokens) > 1
and len(condition_tokens) > 1
and event_tokens[1] == condition_tokens[1]
):
# Stop appending tokens to this argument, as the next
# condition will be satisfied as well
condition_tokens.pop(0)
event_tokens.pop(0)
else:
result.score -= 1.0
event_tokens.pop(0)
# It's a match if all the tokens in the condition string have been satisfied
result.is_match = len(condition_tokens) == 0
return result
class HotwordDetectedEvent(AssistantEvent):
"""
Event triggered when a custom hotword is detected
"""
def __init__(self, hotword=None, *args, **kwargs):
def __init__(self, *args, hotword=None, **kwargs):
"""
:param hotword: The detected user hotword
:type hotword: str
"""
super().__init__(*args, hotword=hotword, **kwargs)
@ -134,67 +207,47 @@ class AlertStartedEvent(AssistantEvent):
Event triggered when an alert starts on the assistant
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class AlertEndEvent(AssistantEvent):
"""
Event triggered when an alert ends on the assistant
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class AlarmStartedEvent(AlertStartedEvent):
"""
Event triggered when an alarm starts on the assistant
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class AlarmEndEvent(AlertEndEvent):
"""
Event triggered when an alarm ends on the assistant
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class TimerStartedEvent(AlertStartedEvent):
"""
Event triggered when a timer starts on the assistant
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class TimerEndEvent(AlertEndEvent):
"""
Event triggered when a timer ends on the assistant
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class MicMutedEvent(AssistantEvent):
"""
Event triggered when the microphone is muted.
"""
pass
class MicUnmutedEvent(AssistantEvent):
"""
Event triggered when the microphone is muted.
"""
pass
# vim:sw=4:ts=4:et:

View file

@ -1,28 +1,88 @@
import pytest
from platypush.event.hook import EventCondition
from platypush.message.event.assistant import SpeechRecognizedEvent
from platypush.message.event.ping import PingEvent
condition = EventCondition.build({
'type': 'platypush.message.event.ping.PingEvent',
'message': 'This is (the)? answer: ${answer}'
})
def test_event_parse():
"""
Test for the events/conditions matching logic.
"""
message = "GARBAGE GARBAGE this is the answer: 42"
event = PingEvent(message=message)
condition = EventCondition.build(
{
'type': 'platypush.message.event.ping.PingEvent',
'message': 'This is a test message',
}
)
event = PingEvent(message=condition.args['message'])
result = event.matches_condition(condition)
assert result.is_match
event = PingEvent(message="This is not a test message")
result = event.matches_condition(condition)
assert not result.is_match
def test_nested_event_condition():
"""
Verify that nested event conditions work as expected.
"""
condition = EventCondition.build(
{
'type': 'platypush.message.event.ping.PingEvent',
'message': {
'foo': 'bar',
},
}
)
event = PingEvent(
message={
'foo': 'bar',
'baz': 'clang',
}
)
assert event.matches_condition(condition).is_match
event = PingEvent(
message={
'something': 'else',
}
)
assert not event.matches_condition(condition).is_match
event = PingEvent(
message={
'foo': 'baz',
}
)
assert not event.matches_condition(condition).is_match
def test_speech_recognized_event_parse():
"""
Test the event parsing and text extraction logic for the
SpeechRecognizedEvent.
"""
condition = EventCondition.build(
{
'type': 'platypush.message.event.assistant.SpeechRecognizedEvent',
'phrase': 'This is (the)? answer: ${answer}',
}
)
event = SpeechRecognizedEvent(phrase="GARBAGE GARBAGE this is the answer: 42")
result = event.matches_condition(condition)
assert result.is_match
assert 'answer' in result.parsed_args
assert result.parsed_args['answer'] == '42'
message = "what is not the answer? 43"
event = PingEvent(message=message)
event = PingEvent(phrase="what is not the answer? 43")
result = event.matches_condition(condition)
assert not result.is_match