Compare commits

...

6 commits

Author SHA1 Message Date
55671f4aff
If a request on a RunnablePlugin throws an exception then we should also restart the plugin upon reload
Plus some Black/LINT chores
2022-07-25 00:41:08 +02:00
c32142c8b5
Added wait_stop() method to RunnablePlugin 2022-07-23 17:33:23 +02:00
32be4df11c
More robust way to retrieve an object's attribute on schemas 2022-07-23 17:32:14 +02:00
3edb8352b4
Support sections with empty bodies in the YAML configuration files. 2022-07-16 02:09:22 +02:00
cc29136db7
[#2] Support for caching rooms info and exposing them in the events 2022-07-15 00:37:21 +02:00
719bd4fddf
[#217 WIP] Initial plugin implementation.
- Added initial synchronization and users cache.
- Added loop to poll for new events (TODO: use websocket after the first sync)
- Added login, sync and join actions
2022-07-14 01:50:46 +02:00
8 changed files with 736 additions and 68 deletions

View file

@ -213,11 +213,10 @@ class Config:
config['scripts_dir'] = os.path.abspath( config['scripts_dir'] = os.path.abspath(
os.path.expanduser(file_config[section]) os.path.expanduser(file_config[section])
) )
elif ( else:
'disabled' not in file_config[section] section_config = file_config.get(section, {}) or {}
or file_config[section]['disabled'] is False if not section_config.get('disabled'):
): config[section] = section_config
config[section] = file_config[section]
return config return config

View file

@ -0,0 +1,103 @@
from abc import ABC
from datetime import datetime
from typing import Dict, Any
from platypush.message.event import Event
class MatrixEvent(Event, ABC):
"""
Base matrix event.
"""
def __init__(
self,
*args,
server_url: str,
sender_id: str | None = None,
sender_display_name: str | None = None,
sender_avatar_url: str | None = None,
room_id: str | None = None,
room_name: str | None = None,
room_topic: str | None = None,
server_timestamp: datetime | None = None,
**kwargs
):
"""
:param server_url: Base server URL.
:param sender_id: The event's sender ID.
:param sender_display_name: The event's sender display name.
:param sender_avatar_url: The event's sender avatar URL.
:param room_id: Event room ID.
:param room_name: The name of the room associated to the event.
:param room_topic: The topic of the room associated to the event.
:param server_timestamp: The server timestamp of the event.
"""
evt_args: Dict[str, Any] = {
'server_url': server_url,
}
if sender_id:
evt_args['sender_id'] = sender_id
if sender_display_name:
evt_args['sender_display_name'] = sender_display_name
if sender_avatar_url:
evt_args['sender_avatar_url'] = sender_avatar_url
if room_id:
evt_args['room_id'] = room_id
if room_name:
evt_args['room_name'] = room_name
if room_topic:
evt_args['room_topic'] = room_topic
if server_timestamp:
evt_args['server_timestamp'] = server_timestamp
super().__init__(*args, **evt_args, **kwargs)
class MatrixMessageEvent(MatrixEvent):
"""
Event triggered when a message is received on a subscribed room.
"""
def __init__(self, *args, body: str, **kwargs):
"""
:param body: The body of the message.
"""
super().__init__(*args, body=body, **kwargs)
class MatrixRoomJoinEvent(MatrixEvent):
"""
Event triggered when a user joins a room.
"""
class MatrixRoomLeaveEvent(MatrixEvent):
"""
Event triggered when a user leaves a room.
"""
class MatrixRoomInviteEvent(MatrixEvent):
"""
Event triggered when a user is invited to a room.
"""
class MatrixRoomInviteMeEvent(MatrixEvent):
"""
Event triggered when the currently logged in user is invited to a room.
"""
class MatrixRoomTopicChangeEvent(MatrixEvent):
"""
Event triggered when the topic/title of a room changes.
"""
def __init__(self, *args, topic: str, **kwargs):
"""
:param topic: New room topic.
"""
super().__init__(*args, topic=topic, **kwargs)

View file

@ -12,8 +12,12 @@ from platypush.config import Config
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.message import Message from platypush.message import Message
from platypush.message.response import Response from platypush.message.response import Response
from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \ from platypush.utils import (
is_functional_procedure get_hash,
get_module_and_method_from_action,
get_redis_queue_name_by_message,
is_functional_procedure,
)
logger = logging.getLogger('platypush') logger = logging.getLogger('platypush')
@ -21,8 +25,17 @@ logger = logging.getLogger('platypush')
class Request(Message): class Request(Message):
"""Request message class""" """Request message class"""
def __init__(self, target, action, origin=None, id=None, backend=None, def __init__(
args=None, token=None, timestamp=None): self,
target,
action,
origin=None,
id=None,
backend=None,
args=None,
token=None,
timestamp=None,
):
""" """
Params: Params:
target -- Target node [Str] target -- Target node [Str]
@ -48,9 +61,13 @@ class Request(Message):
@classmethod @classmethod
def build(cls, msg): def build(cls, msg):
msg = super().parse(msg) msg = super().parse(msg)
args = {'target': msg.get('target', Config.get('device_id')), 'action': msg['action'], args = {
'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(), 'target': msg.get('target', Config.get('device_id')),
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()} 'action': msg['action'],
'args': msg.get('args', {}),
'id': msg['id'] if 'id' in msg else cls._generate_id(),
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(),
}
if 'origin' in msg: if 'origin' in msg:
args['origin'] = msg['origin'] args['origin'] = msg['origin']
@ -61,7 +78,7 @@ class Request(Message):
@staticmethod @staticmethod
def _generate_id(): def _generate_id():
_id = '' _id = ''
for i in range(0, 16): for _ in range(0, 16):
_id += '%.2x' % random.randint(0, 255) _id += '%.2x' % random.randint(0, 255)
return _id return _id
@ -84,9 +101,14 @@ class Request(Message):
return proc_config(*args, **kwargs) return proc_config(*args, **kwargs)
proc = Procedure.build(name=proc_name, requests=proc_config['actions'], proc = Procedure.build(
_async=proc_config['_async'], args=self.args, name=proc_name,
backend=self.backend, id=self.id) requests=proc_config['actions'],
_async=proc_config['_async'],
args=self.args,
backend=self.backend,
id=self.id,
)
return proc.execute(*args, **kwargs) return proc.execute(*args, **kwargs)
@ -112,7 +134,7 @@ class Request(Message):
if isinstance(value, str): if isinstance(value, str):
value = self.expand_value_from_context(value, **context) value = self.expand_value_from_context(value, **context)
elif isinstance(value, dict) or isinstance(value, list): elif isinstance(value, (dict, list)):
self._expand_context(event_args=value, **context) self._expand_context(event_args=value, **context)
event_args[key] = value event_args[key] = value
@ -132,7 +154,11 @@ class Request(Message):
try: try:
exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v))) exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v)))
except Exception as e: except Exception as e:
logger.debug('Could not set context variable {}={}: {}'.format(k, v, str(e))) logger.debug(
'Could not set context variable {}={}: {}'.format(
k, v, str(e)
)
)
logger.debug('Context: {}'.format(context)) logger.debug('Context: {}'.format(context))
parsed_value = '' parsed_value = ''
@ -152,7 +178,7 @@ class Request(Message):
if callable(context_value): if callable(context_value):
context_value = context_value() context_value = context_value()
if isinstance(context_value, range) or isinstance(context_value, tuple): if isinstance(context_value, (range, tuple)):
context_value = [*context_value] context_value = [*context_value]
if isinstance(context_value, datetime.date): if isinstance(context_value, datetime.date):
context_value = context_value.isoformat() context_value = context_value.isoformat()
@ -162,7 +188,7 @@ class Request(Message):
parsed_value += prefix + ( parsed_value += prefix + (
json.dumps(context_value) json.dumps(context_value)
if isinstance(context_value, list) or isinstance(context_value, dict) if isinstance(context_value, (list, dict))
else str(context_value) else str(context_value)
) )
else: else:
@ -205,6 +231,9 @@ class Request(Message):
""" """
def _thread_func(_n_tries, errors=None): def _thread_func(_n_tries, errors=None):
from platypush.context import get_bus
from platypush.plugins import RunnablePlugin
response = None response = None
try: try:
@ -221,11 +250,15 @@ class Request(Message):
return response return response
else: else:
action = self.expand_value_from_context(self.action, **context) action = self.expand_value_from_context(self.action, **context)
(module_name, method_name) = get_module_and_method_from_action(action) (module_name, method_name) = get_module_and_method_from_action(
action
)
plugin = get_plugin(module_name) plugin = get_plugin(module_name)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(self.action, str(e)) msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(
self.action, str(e)
)
logger.warning(msg) logger.warning(msg)
response = Response(output=None, errors=[msg]) response = Response(output=None, errors=[msg])
self._send_response(response) self._send_response(response)
@ -243,24 +276,37 @@ class Request(Message):
response = plugin.run(method_name, args) response = plugin.run(method_name, args)
if not response: if not response:
logger.warning('Received null response from action {}'.format(action)) logger.warning(
'Received null response from action {}'.format(action)
)
else: else:
if response.is_error(): if response.is_error():
logger.warning(('Response processed with errors from ' + logger.warning(
'action {}: {}').format( (
action, str(response))) 'Response processed with errors from ' + 'action {}: {}'
).format(action, str(response))
)
elif not response.disable_logging: elif not response.disable_logging:
logger.info('Processed response from action {}: {}'. logger.info(
format(action, str(response))) 'Processed response from action {}: {}'.format(
action, str(response)
)
)
except (AssertionError, TimeoutError) as e: except (AssertionError, TimeoutError) as e:
plugin.logger.exception(e) plugin.logger.exception(e)
logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e))) logger.warning(
'{} from action [{}]: {}'.format(type(e), action, str(e))
)
response = Response(output=None, errors=[str(e)]) response = Response(output=None, errors=[str(e)])
except Exception as e: except Exception as e:
# Retry mechanism # Retry mechanism
plugin.logger.exception(e) plugin.logger.exception(e)
logger.warning(('Uncaught exception while processing response ' + logger.warning(
'from action [{}]: {}').format(action, str(e))) (
'Uncaught exception while processing response '
+ 'from action [{}]: {}'
).format(action, str(e))
)
errors = errors or [] errors = errors or []
if str(e) not in errors: if str(e) not in errors:
@ -269,16 +315,20 @@ class Request(Message):
response = Response(output=None, errors=errors) response = Response(output=None, errors=errors)
if _n_tries - 1 > 0: if _n_tries - 1 > 0:
logger.info('Reloading plugin {} and retrying'.format(module_name)) logger.info('Reloading plugin {} and retrying'.format(module_name))
get_plugin(module_name, reload=True) plugin = get_plugin(module_name, reload=True)
if isinstance(plugin, RunnablePlugin):
plugin.bus = get_bus()
plugin.start()
response = _thread_func(_n_tries=_n_tries - 1, errors=errors) response = _thread_func(_n_tries=_n_tries - 1, errors=errors)
finally: finally:
self._send_response(response) self._send_response(response)
return response return response
token_hash = Config.get('token_hash') stored_token_hash = Config.get('token_hash')
token = getattr(self, 'token', '')
if token_hash: if stored_token_hash and get_hash(token) != stored_token_hash:
if self.token is None or get_hash(self.token) != token_hash:
raise PermissionError() raise PermissionError()
if _async: if _async:
@ -292,7 +342,8 @@ class Request(Message):
the message into a UTF-8 JSON string the message into a UTF-8 JSON string
""" """
return json.dumps({ return json.dumps(
{
'type': 'request', 'type': 'request',
'target': self.target, 'target': self.target,
'action': self.action, 'action': self.action,
@ -301,6 +352,8 @@ class Request(Message):
'id': self.id if hasattr(self, 'id') else None, 'id': self.id if hasattr(self, 'id') else None,
'token': self.token if hasattr(self, 'token') else None, 'token': self.token if hasattr(self, 'token') else None,
'_timestamp': self.timestamp, '_timestamp': self.timestamp,
}) }
)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -19,12 +19,12 @@ def action(f):
result = f(*args, **kwargs) result = f(*args, **kwargs)
if result and isinstance(result, Response): if result and isinstance(result, Response):
result.errors = result.errors \ result.errors = (
if isinstance(result.errors, list) else [result.errors] result.errors if isinstance(result.errors, list) else [result.errors]
)
response = result response = result
elif isinstance(result, tuple) and len(result) == 2: elif isinstance(result, tuple) and len(result) == 2:
response.errors = result[1] \ response.errors = result[1] if isinstance(result[1], list) else [result[1]]
if isinstance(result[1], list) else [result[1]]
if len(response.errors) == 1 and response.errors[0] is None: if len(response.errors) == 1 and response.errors[0] is None:
response.errors = [] response.errors = []
@ -44,7 +44,9 @@ class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-t
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__() super().__init__()
self.logger = logging.getLogger('platypush:plugin:' + get_plugin_name_by_class(self.__class__)) self.logger = logging.getLogger(
'platypush:plugin:' + get_plugin_name_by_class(self.__class__)
)
if 'logging' in kwargs: if 'logging' in kwargs:
self.logger.setLevel(getattr(logging, kwargs['logging'].upper())) self.logger.setLevel(getattr(logging, kwargs['logging'].upper()))
@ -53,8 +55,9 @@ class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-t
) )
def run(self, method, *args, **kwargs): def run(self, method, *args, **kwargs):
assert method in self.registered_actions, '{} is not a registered action on {}'.\ assert (
format(method, self.__class__.__name__) method in self.registered_actions
), '{} is not a registered action on {}'.format(method, self.__class__.__name__)
return getattr(self, method)(*args, **kwargs) return getattr(self, method)(*args, **kwargs)
@ -62,6 +65,7 @@ class RunnablePlugin(Plugin):
""" """
Class for runnable plugins - i.e. plugins that have a start/stop method and can be started. Class for runnable plugins - i.e. plugins that have a start/stop method and can be started.
""" """
def __init__(self, poll_interval: Optional[float] = None, **kwargs): def __init__(self, poll_interval: Optional[float] = None, **kwargs):
""" """
:param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval). :param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval).
@ -78,6 +82,9 @@ class RunnablePlugin(Plugin):
def should_stop(self): def should_stop(self):
return self._should_stop.is_set() return self._should_stop.is_set()
def wait_stop(self, timeout=None):
return self._should_stop.wait(timeout=timeout)
def start(self): def start(self):
set_thread_name(self.__class__.__name__) set_thread_name(self.__class__.__name__)
self._thread = threading.Thread(target=self._runner) self._thread = threading.Thread(target=self._runner)

View file

@ -0,0 +1,488 @@
import datetime
import json
import multiprocessing
import os
import pathlib
import requests
from abc import ABC, abstractmethod
from urllib.parse import urljoin
from typing import Optional, Collection, Dict, Tuple, Any
from platypush.config import Config
from platypush.context import get_bus
from platypush.message.event.matrix import (
MatrixEvent,
MatrixRoomTopicChangeEvent,
MatrixMessageEvent,
MatrixRoomJoinEvent,
MatrixRoomLeaveEvent,
MatrixRoomInviteEvent,
MatrixRoomInviteMeEvent,
)
from platypush.plugins import RunnablePlugin, action
class RetrieveWorker(ABC):
def __init__(self, server_url: str, access_token: str):
self._server_url = server_url
self._access_token = access_token
@abstractmethod
def _url(self, id: int | str) -> str:
raise NotImplementedError()
@abstractmethod
def _process_response(self, rs: dict | Collection[dict]) -> dict:
raise NotImplementedError()
def __call__(self, id: str) -> Tuple[str, dict]:
url = urljoin(self._server_url, self._url(id))
rs = requests.get(
url,
headers={
'Authorization': f'Bearer {self._access_token}',
},
)
rs.raise_for_status()
return (id, self._process_response(rs.json()))
class UserRetrieveWorker(RetrieveWorker):
def _url(self, id: str) -> str:
return f'/_matrix/client/r0/profile/{id}'
def _process_response(self, rs: dict) -> dict:
return {
'display_name': rs.get('displayname'),
'avatar_url': rs.get('avatar_url'),
}
class RoomRetrieveWorker(RetrieveWorker):
def _url(self, id: str) -> str:
return f'/_matrix/client/v3/rooms/{id}/state'
def _process_response(self, rs: Collection[dict]) -> dict:
info = {}
for event in rs:
event_type = event.get('type')
if event_type == 'm.room.name':
info['name'] = event.get('content', {}).get('name')
elif event_type == 'm.room.topic':
info['name'] = event.get('content', {}).get('topic')
return info
class MatrixPlugin(RunnablePlugin):
"""
Matrix chat integration.
Triggers:
* :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received.
* :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a user joins a room.
* :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a user leaves a room.
* :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when a user (other than the
currently logged one) is invited to a room.
* :class:`platypush.message.event.matrix.MatrixRoomMeInviteEvent`: when the currently logged in
user is invited to a room.
* :class:`platypush.message.event.matrix.MatrixRoomTopicChangeEvent`: when the topic/title of a room changes.
"""
def __init__(
self,
server_url: str = 'https://matrix.to',
username: str | None = None,
password: str | None = None,
access_token: str | None = None,
autojoin_on_invite: bool = False,
**kwargs,
):
"""
Authentication requires either username/password or an access token.
If you don't want to provide cleartext credentials in the configuration, you can
retrieve an access token offline through the following request::
curl -XPOST '{"type":"m.login.password", "user":"username", "password":"password"}' \
"https://matrix.example.com/_matrix/client/r0/login"
This may be required if the user or the instance enforce 2FA.
:param server_url: Default Matrix instance base URL (default: ``https://matrix.to``).
:param username: Default username. Provide either username/password _or_ an access token.
:param password: Default password. Provide either username/password _or_ an access token.
:param access_token: Default access token. Provide either username/password _or_ an access token.
:param autojoin_on_invite: Whether the account should automatically join rooms
upon invite. If false (default value), then you may want to implement your own
logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteMeEvent`
event is received, and call the :meth:`.join` method if required.
"""
super().__init__(**kwargs)
self._server_url = server_url
self._user_id = None
self._autojoin_on_invite = autojoin_on_invite
self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore
pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True)
self._sessions_file = os.path.join(self._workdir, 'sessions.json')
self._credentials_file = os.path.join(self._workdir, 'credentials.json')
self._users_cache_file = os.path.join(self._workdir, 'users.json')
self._rooms_cache_file = os.path.join(self._workdir, 'rooms.json')
self._users_cache: Dict[str, dict] = {}
self._rooms_cache: Dict[str, dict] = {}
self._set_credentials(username, password, access_token, overwrite=True)
def _set_credentials(
self,
username: str | None = None,
password: str | None = None,
access_token: str | None = None,
overwrite: bool = False,
):
if username or overwrite:
self._username = username
if password or overwrite:
self._password = password
if access_token or overwrite:
self._access_token = access_token
def _execute(self, url: str, method: str = 'get', **kwargs):
if self._access_token:
kwargs['headers'] = {
'Authorization': f'Bearer {self._access_token}',
**kwargs.get('headers', {}),
}
url = urljoin(self._server_url, f'/_matrix/client/{url.lstrip("/")}')
req_method = getattr(requests, method.lower())
rs = req_method(url, **kwargs)
rs.raise_for_status()
rs = rs.json()
assert not rs.get('error'), rs.get('error')
return rs
def _save_credentials(self, credentials: dict):
with open(self._credentials_file, 'w') as f:
json.dump(credentials, f)
def _refresh_user_id(self):
devices = self._execute('/v3/devices').get('devices', [])
assert devices, 'The user is not logged into any devices'
self._user_id = devices[0]['user_id']
@action
def login(
self,
server_url: str | None = None,
username: str | None = None,
password: str | None = None,
access_token: str | None = None,
):
"""
Login to an instance if username/password/access_token were not specified in the plugin
configuration. Otherwise, change the currently logged user or instance.
:param server_url: New Matrix instance base URL.
:param username: New username.
:param password: New user password.
:param access_token: New access token.
"""
self._server_url = server_url or self._server_url
self._set_credentials(username, password, access_token, overwrite=False)
if self._access_token:
self._refresh_user_id()
elif self._username and self._password:
rs = self._execute(
'/r0/login',
method='post',
json={
'type': 'm.login.password',
'user': self._username,
'password': self._password,
'initial_device_display_name': 'Platypush Matrix integration',
},
)
assert rs.get('access_token'), 'No access token provided by the server'
self._access_token = rs['access_token']
self._user_id = rs['user_id']
self._save_credentials(rs)
elif os.path.isfile(self._credentials_file):
with open(self._credentials_file, 'r') as f:
self._access_token = json.load(f)['access_token']
self._refresh_user_id()
else:
raise AssertionError(
'No username, password and access token provided nor stored'
)
self.logger.info(
f'Successfully logged in to {self._server_url} as {self._user_id}'
)
@staticmethod
def _timestamp_to_datetime(t: int | float) -> datetime.datetime:
return datetime.datetime.fromtimestamp(t / 1000)
def _parse_event(
self, room_id: str, event: dict, users: dict
) -> Optional[MatrixEvent]:
evt_type = event.get('type')
evt_class = None
room_info = self._rooms_cache.get(room_id, {})
args: Dict[str, Any] = {
'server_url': self._server_url,
'room_id': room_id,
'room_name': room_info.get('name'),
'room_topic': room_info.get('topic'),
}
if event.get('sender') and isinstance(event.get('sender'), str):
cached_user = users.get(event['sender'], {})
args['sender_id'] = event['sender']
args['sender_display_name'] = cached_user.get('display_name')
args['sender_avatar_url'] = cached_user.get('avatar_url')
if event.get('origin_server_ts'):
args['server_timestamp'] = self._timestamp_to_datetime(
event['origin_server_ts']
)
if evt_type == 'm.room.topic':
evt_class = MatrixRoomTopicChangeEvent
args['topic'] = event.get('content', {}).get('topic') # type: ignore
# TODO Handle encrypted rooms events (`m.room.encrypted`)
elif evt_type == 'm.room.message':
evt_class = MatrixMessageEvent
args['body'] = event.get('content', {}).get('body') # type: ignore
elif evt_type == 'm.room.member':
membership = event.get('content', {}).get('membership')
if membership == 'join':
evt_class = MatrixRoomJoinEvent
elif membership == 'invite':
evt_class = MatrixRoomInviteEvent
elif membership == 'leave':
evt_class = MatrixRoomLeaveEvent
if evt_class:
return evt_class(**args)
def _parse_invite_event(
self, room_id: str, events: Collection[dict]
) -> MatrixRoomInviteMeEvent:
evt_args: Dict[str, Any] = {
'server_url': self._server_url,
'room_id': room_id,
}
for event in events:
evt_type = event.get('type')
if evt_type == 'm.room.name':
evt_args['room_name'] = event.get('content', {}).get('name')
elif evt_type == 'm.room.topic':
evt_args['room_topic'] = event.get('content', {}).get('topic')
if event.get('origin_server_ts'):
evt_args['server_timestamp'] = self._timestamp_to_datetime(
event['origin_server_ts']
)
if evt_args.get('room_name'):
self._rooms_cache[room_id] = {
'room_id': room_id,
'room_name': evt_args['room_name'],
'room_topic': evt_args.get('room_topic'),
}
self._rewrite_rooms_cache()
return MatrixRoomInviteMeEvent(**evt_args)
def _retrieve_users_info(self, users: Collection[str]) -> Dict[str, dict]:
users_info = {user: {} for user in users}
retrieve = UserRetrieveWorker(self._server_url, self._access_token or '')
with multiprocessing.Pool(4) as pool:
pool_res = pool.map(retrieve, users_info.keys())
return {
user_id: {
'user_id': user_id,
**info,
}
for user_id, info in pool_res
}
def _retrieve_rooms_info(self, rooms: Collection[str]) -> Dict[str, dict]:
rooms_info = {room: {} for room in rooms}
retrieve = RoomRetrieveWorker(self._server_url, self._access_token or '')
with multiprocessing.Pool(4) as pool:
pool_res = pool.map(retrieve, rooms_info.keys())
return {
room_id: {
'room_id': room_id,
**info,
}
for room_id, info in pool_res
}
def _extract_senders(self, rooms) -> Dict[str, dict]:
cache_has_changes = False
senders = set()
for room in rooms:
room_events = room.get('timeline', {}).get('events', [])
for evt in room_events:
if evt.get('type') == 'm.room.member':
cache_has_changes = True
self._users_cache[evt['sender']] = {
'user_id': evt['sender'],
'display_name': evt.get('content', {}).get('displayname'),
'avatar_url': evt.get('content', {}).get('avatar_url'),
}
senders.update({evt['sender'] for evt in room_events if evt.get('sender')})
missing_senders = {user for user in senders if user not in self._users_cache}
if missing_senders:
cache_has_changes = True
self._users_cache.update(self._retrieve_users_info(missing_senders))
senders_map = {
user: self._users_cache.get(user, {'user_id': user}) for user in senders
}
if cache_has_changes:
self._rewrite_users_cache()
return senders_map
def _extract_rooms(self, rooms: Collection[str]) -> Dict[str, dict]:
missing_rooms_info = {
room_id for room_id in rooms if not self._rooms_cache.get(room_id)
}
if missing_rooms_info:
self._rooms_cache.update(self._retrieve_rooms_info(missing_rooms_info))
self._rewrite_rooms_cache()
return {
room_id: self._rooms_cache.get(
room_id,
{
'room_id': room_id,
},
)
for room_id in rooms
}
def _process_events(self, events: dict) -> Collection[MatrixEvent]:
rooms = events.get('rooms', {})
joined_rooms = rooms.get('join', {})
invited_rooms = rooms.get('invite', {})
parsed_events = []
senders = self._extract_senders(joined_rooms.values())
self._extract_rooms(joined_rooms.keys())
# Create joined rooms events
for room_id, room in joined_rooms.items():
room_events = room.get('timeline', {}).get('events', [])
parsed_room_events = [
self._parse_event(room_id=room_id, event=event, users=senders)
for event in room_events
]
parsed_events.extend([evt for evt in parsed_room_events if evt])
# Create invite events
for room_id, room in invited_rooms.items():
room_events = room.get('invite_state', {}).get('events', [])
parsed_room_event = self._parse_invite_event(
room_id=room_id, events=room_events
)
parsed_events.append(parsed_room_event)
if self._autojoin_on_invite:
self.join(room_id)
parsed_events.sort(key=lambda e: e.server_timestamp)
return parsed_events
def _reload_users_cache(self):
if os.path.isfile(self._users_cache_file):
with open(self._users_cache_file, 'r') as f:
self._users_cache.update(json.load(f))
def _rewrite_users_cache(self):
with open(self._users_cache_file, 'w') as f:
json.dump(self._users_cache, f)
def _reload_rooms_cache(self):
if os.path.isfile(self._rooms_cache_file):
with open(self._rooms_cache_file, 'r') as f:
self._rooms_cache.update(json.load(f))
def _rewrite_rooms_cache(self):
with open(self._rooms_cache_file, 'w') as f:
json.dump(self._rooms_cache, f)
@action
def sync(self):
"""
Sync the state for the currently logged session.
"""
next_batch = None
sessions = {}
if os.path.isfile(self._sessions_file):
with open(self._sessions_file, 'r') as f:
sessions = json.load(f)
next_batch = sessions.get(self._user_id, {}).get('next_batch')
if not next_batch:
self.logger.info('Synchronizing Matrix events')
rs = self._execute('/r0/sync', params={'since': next_batch})
events = self._process_events(rs)
if events and next_batch:
for event in events:
get_bus().post(event)
if not sessions.get(self._user_id):
sessions[self._user_id] = {}
sessions[self._user_id]['next_batch'] = rs.get('next_batch')
with open(self._sessions_file, 'w') as f:
json.dump(sessions, f)
if not next_batch:
self.logger.info('Matrix events synchronized')
@action
def join(self, room_id: str):
"""
Join a room by ID.
:param room_id: Room ID or alias.
"""
self._execute(f'/v3/join/{room_id}', method='post')
self.logger.info('Successfully joined room %s', room_id)
def main(self):
self.login()
self._reload_users_cache()
self._reload_rooms_cache()
while not self._should_stop.is_set():
try:
self.sync()
finally:
self._should_stop.wait(timeout=10)
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,10 @@
manifest:
events:
platypush.message.event.matrix.MatrixMessageEvent: when a message is received.
platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a room.
platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a room.
platypush.message.event.matrix.MatrixRoomInviteEvent: when a user (other than the currently logged one) is invited to a room.
platypush.message.event.matrix.MatrixRoomMeInviteEvent: when the currently logged in user is invited to a room.
platypush.message.event.matrix.MatrixRoomTopicChangeEvent: when the topic/title of a room changes.
package: platypush.plugins.matrix
type: plugin

View file

@ -121,9 +121,7 @@ class NtfyPlugin(RunnablePlugin):
def main(self): def main(self):
if self._subscriptions: if self._subscriptions:
self._connect() self._connect()
self.wait_stop()
while not self._should_stop.is_set():
self._should_stop.wait(timeout=1)
def stop(self): def stop(self):
if self._ws_proc: if self._ws_proc:

View file

@ -21,7 +21,15 @@ class StrippedString(fields.Function): # lgtm [py/missing-call-to-init]
return value.strip() return value.strip()
class DateTime(fields.Function): # lgtm [py/missing-call-to-init] class Function(fields.Function): # lgtm [py/missing-call-to-init]
def _get_attr(self, obj, attr: str):
if hasattr(obj, attr):
return getattr(obj, attr)
elif hasattr(obj, 'get'):
return obj.get(attr)
class DateTime(Function): # lgtm [py/missing-call-to-init]
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.metadata = { self.metadata = {
@ -30,7 +38,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init]
} }
def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]:
value = normalize_datetime(obj.get(attr)) value = normalize_datetime(self._get_attr(obj, attr))
if value: if value:
return value.isoformat() return value.isoformat()
@ -38,7 +46,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init]
return normalize_datetime(value) return normalize_datetime(value)
class Date(fields.Function): # lgtm [py/missing-call-to-init] class Date(Function): # lgtm [py/missing-call-to-init]
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.metadata = { self.metadata = {
@ -47,7 +55,7 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init]
} }
def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]:
value = normalize_datetime(obj.get(attr)) value = normalize_datetime(self._get_attr(obj, attr))
if value: if value:
return date(value.year, value.month, value.day).isoformat() return date(value.year, value.month, value.day).isoformat()
@ -56,10 +64,12 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init]
return date.fromtimestamp(dt.timestamp()) return date.fromtimestamp(dt.timestamp())
def normalize_datetime(dt: Union[str, date, datetime]) -> Optional[Union[date, datetime]]: def normalize_datetime(
dt: Optional[Union[str, date, datetime]]
) -> Optional[Union[date, datetime]]:
if not dt: if not dt:
return return
if isinstance(dt, datetime) or isinstance(dt, date): if isinstance(dt, (datetime, date)):
return dt return dt
try: try: