forked from platypush/platypush
Compare commits
6 commits
master
...
matrix-int
Author | SHA1 | Date | |
---|---|---|---|
55671f4aff | |||
c32142c8b5 | |||
32be4df11c | |||
3edb8352b4 | |||
cc29136db7 | |||
719bd4fddf |
8 changed files with 736 additions and 68 deletions
|
@ -213,11 +213,10 @@ class Config:
|
|||
config['scripts_dir'] = os.path.abspath(
|
||||
os.path.expanduser(file_config[section])
|
||||
)
|
||||
elif (
|
||||
'disabled' not in file_config[section]
|
||||
or file_config[section]['disabled'] is False
|
||||
):
|
||||
config[section] = file_config[section]
|
||||
else:
|
||||
section_config = file_config.get(section, {}) or {}
|
||||
if not section_config.get('disabled'):
|
||||
config[section] = section_config
|
||||
|
||||
return config
|
||||
|
||||
|
|
103
platypush/message/event/matrix.py
Normal file
103
platypush/message/event/matrix.py
Normal file
|
@ -0,0 +1,103 @@
|
|||
from abc import ABC
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any
|
||||
|
||||
from platypush.message.event import Event
|
||||
|
||||
|
||||
class MatrixEvent(Event, ABC):
|
||||
"""
|
||||
Base matrix event.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
server_url: str,
|
||||
sender_id: str | None = None,
|
||||
sender_display_name: str | None = None,
|
||||
sender_avatar_url: str | None = None,
|
||||
room_id: str | None = None,
|
||||
room_name: str | None = None,
|
||||
room_topic: str | None = None,
|
||||
server_timestamp: datetime | None = None,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
:param server_url: Base server URL.
|
||||
:param sender_id: The event's sender ID.
|
||||
:param sender_display_name: The event's sender display name.
|
||||
:param sender_avatar_url: The event's sender avatar URL.
|
||||
:param room_id: Event room ID.
|
||||
:param room_name: The name of the room associated to the event.
|
||||
:param room_topic: The topic of the room associated to the event.
|
||||
:param server_timestamp: The server timestamp of the event.
|
||||
"""
|
||||
evt_args: Dict[str, Any] = {
|
||||
'server_url': server_url,
|
||||
}
|
||||
|
||||
if sender_id:
|
||||
evt_args['sender_id'] = sender_id
|
||||
if sender_display_name:
|
||||
evt_args['sender_display_name'] = sender_display_name
|
||||
if sender_avatar_url:
|
||||
evt_args['sender_avatar_url'] = sender_avatar_url
|
||||
if room_id:
|
||||
evt_args['room_id'] = room_id
|
||||
if room_name:
|
||||
evt_args['room_name'] = room_name
|
||||
if room_topic:
|
||||
evt_args['room_topic'] = room_topic
|
||||
if server_timestamp:
|
||||
evt_args['server_timestamp'] = server_timestamp
|
||||
|
||||
super().__init__(*args, **evt_args, **kwargs)
|
||||
|
||||
|
||||
class MatrixMessageEvent(MatrixEvent):
|
||||
"""
|
||||
Event triggered when a message is received on a subscribed room.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, body: str, **kwargs):
|
||||
"""
|
||||
:param body: The body of the message.
|
||||
"""
|
||||
super().__init__(*args, body=body, **kwargs)
|
||||
|
||||
|
||||
class MatrixRoomJoinEvent(MatrixEvent):
|
||||
"""
|
||||
Event triggered when a user joins a room.
|
||||
"""
|
||||
|
||||
|
||||
class MatrixRoomLeaveEvent(MatrixEvent):
|
||||
"""
|
||||
Event triggered when a user leaves a room.
|
||||
"""
|
||||
|
||||
|
||||
class MatrixRoomInviteEvent(MatrixEvent):
|
||||
"""
|
||||
Event triggered when a user is invited to a room.
|
||||
"""
|
||||
|
||||
|
||||
class MatrixRoomInviteMeEvent(MatrixEvent):
|
||||
"""
|
||||
Event triggered when the currently logged in user is invited to a room.
|
||||
"""
|
||||
|
||||
|
||||
class MatrixRoomTopicChangeEvent(MatrixEvent):
|
||||
"""
|
||||
Event triggered when the topic/title of a room changes.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, topic: str, **kwargs):
|
||||
"""
|
||||
:param topic: New room topic.
|
||||
"""
|
||||
super().__init__(*args, topic=topic, **kwargs)
|
|
@ -12,17 +12,30 @@ from platypush.config import Config
|
|||
from platypush.context import get_plugin
|
||||
from platypush.message import Message
|
||||
from platypush.message.response import Response
|
||||
from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \
|
||||
is_functional_procedure
|
||||
from platypush.utils import (
|
||||
get_hash,
|
||||
get_module_and_method_from_action,
|
||||
get_redis_queue_name_by_message,
|
||||
is_functional_procedure,
|
||||
)
|
||||
|
||||
logger = logging.getLogger('platypush')
|
||||
|
||||
|
||||
class Request(Message):
|
||||
""" Request message class """
|
||||
"""Request message class"""
|
||||
|
||||
def __init__(self, target, action, origin=None, id=None, backend=None,
|
||||
args=None, token=None, timestamp=None):
|
||||
def __init__(
|
||||
self,
|
||||
target,
|
||||
action,
|
||||
origin=None,
|
||||
id=None,
|
||||
backend=None,
|
||||
args=None,
|
||||
token=None,
|
||||
timestamp=None,
|
||||
):
|
||||
"""
|
||||
Params:
|
||||
target -- Target node [Str]
|
||||
|
@ -48,9 +61,13 @@ class Request(Message):
|
|||
@classmethod
|
||||
def build(cls, msg):
|
||||
msg = super().parse(msg)
|
||||
args = {'target': msg.get('target', Config.get('device_id')), 'action': msg['action'],
|
||||
'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(),
|
||||
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()}
|
||||
args = {
|
||||
'target': msg.get('target', Config.get('device_id')),
|
||||
'action': msg['action'],
|
||||
'args': msg.get('args', {}),
|
||||
'id': msg['id'] if 'id' in msg else cls._generate_id(),
|
||||
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(),
|
||||
}
|
||||
|
||||
if 'origin' in msg:
|
||||
args['origin'] = msg['origin']
|
||||
|
@ -61,7 +78,7 @@ class Request(Message):
|
|||
@staticmethod
|
||||
def _generate_id():
|
||||
_id = ''
|
||||
for i in range(0, 16):
|
||||
for _ in range(0, 16):
|
||||
_id += '%.2x' % random.randint(0, 255)
|
||||
return _id
|
||||
|
||||
|
@ -84,9 +101,14 @@ class Request(Message):
|
|||
|
||||
return proc_config(*args, **kwargs)
|
||||
|
||||
proc = Procedure.build(name=proc_name, requests=proc_config['actions'],
|
||||
_async=proc_config['_async'], args=self.args,
|
||||
backend=self.backend, id=self.id)
|
||||
proc = Procedure.build(
|
||||
name=proc_name,
|
||||
requests=proc_config['actions'],
|
||||
_async=proc_config['_async'],
|
||||
args=self.args,
|
||||
backend=self.backend,
|
||||
id=self.id,
|
||||
)
|
||||
|
||||
return proc.execute(*args, **kwargs)
|
||||
|
||||
|
@ -112,7 +134,7 @@ class Request(Message):
|
|||
|
||||
if isinstance(value, str):
|
||||
value = self.expand_value_from_context(value, **context)
|
||||
elif isinstance(value, dict) or isinstance(value, list):
|
||||
elif isinstance(value, (dict, list)):
|
||||
self._expand_context(event_args=value, **context)
|
||||
|
||||
event_args[key] = value
|
||||
|
@ -132,7 +154,11 @@ class Request(Message):
|
|||
try:
|
||||
exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v)))
|
||||
except Exception as e:
|
||||
logger.debug('Could not set context variable {}={}: {}'.format(k, v, str(e)))
|
||||
logger.debug(
|
||||
'Could not set context variable {}={}: {}'.format(
|
||||
k, v, str(e)
|
||||
)
|
||||
)
|
||||
logger.debug('Context: {}'.format(context))
|
||||
|
||||
parsed_value = ''
|
||||
|
@ -152,7 +178,7 @@ class Request(Message):
|
|||
|
||||
if callable(context_value):
|
||||
context_value = context_value()
|
||||
if isinstance(context_value, range) or isinstance(context_value, tuple):
|
||||
if isinstance(context_value, (range, tuple)):
|
||||
context_value = [*context_value]
|
||||
if isinstance(context_value, datetime.date):
|
||||
context_value = context_value.isoformat()
|
||||
|
@ -162,7 +188,7 @@ class Request(Message):
|
|||
|
||||
parsed_value += prefix + (
|
||||
json.dumps(context_value)
|
||||
if isinstance(context_value, list) or isinstance(context_value, dict)
|
||||
if isinstance(context_value, (list, dict))
|
||||
else str(context_value)
|
||||
)
|
||||
else:
|
||||
|
@ -205,6 +231,9 @@ class Request(Message):
|
|||
"""
|
||||
|
||||
def _thread_func(_n_tries, errors=None):
|
||||
from platypush.context import get_bus
|
||||
from platypush.plugins import RunnablePlugin
|
||||
|
||||
response = None
|
||||
|
||||
try:
|
||||
|
@ -221,11 +250,15 @@ class Request(Message):
|
|||
return response
|
||||
else:
|
||||
action = self.expand_value_from_context(self.action, **context)
|
||||
(module_name, method_name) = get_module_and_method_from_action(action)
|
||||
(module_name, method_name) = get_module_and_method_from_action(
|
||||
action
|
||||
)
|
||||
plugin = get_plugin(module_name)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(self.action, str(e))
|
||||
msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(
|
||||
self.action, str(e)
|
||||
)
|
||||
logger.warning(msg)
|
||||
response = Response(output=None, errors=[msg])
|
||||
self._send_response(response)
|
||||
|
@ -243,24 +276,37 @@ class Request(Message):
|
|||
response = plugin.run(method_name, args)
|
||||
|
||||
if not response:
|
||||
logger.warning('Received null response from action {}'.format(action))
|
||||
logger.warning(
|
||||
'Received null response from action {}'.format(action)
|
||||
)
|
||||
else:
|
||||
if response.is_error():
|
||||
logger.warning(('Response processed with errors from ' +
|
||||
'action {}: {}').format(
|
||||
action, str(response)))
|
||||
logger.warning(
|
||||
(
|
||||
'Response processed with errors from ' + 'action {}: {}'
|
||||
).format(action, str(response))
|
||||
)
|
||||
elif not response.disable_logging:
|
||||
logger.info('Processed response from action {}: {}'.
|
||||
format(action, str(response)))
|
||||
logger.info(
|
||||
'Processed response from action {}: {}'.format(
|
||||
action, str(response)
|
||||
)
|
||||
)
|
||||
except (AssertionError, TimeoutError) as e:
|
||||
plugin.logger.exception(e)
|
||||
logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e)))
|
||||
logger.warning(
|
||||
'{} from action [{}]: {}'.format(type(e), action, str(e))
|
||||
)
|
||||
response = Response(output=None, errors=[str(e)])
|
||||
except Exception as e:
|
||||
# Retry mechanism
|
||||
plugin.logger.exception(e)
|
||||
logger.warning(('Uncaught exception while processing response ' +
|
||||
'from action [{}]: {}').format(action, str(e)))
|
||||
logger.warning(
|
||||
(
|
||||
'Uncaught exception while processing response '
|
||||
+ 'from action [{}]: {}'
|
||||
).format(action, str(e))
|
||||
)
|
||||
|
||||
errors = errors or []
|
||||
if str(e) not in errors:
|
||||
|
@ -269,17 +315,21 @@ class Request(Message):
|
|||
response = Response(output=None, errors=errors)
|
||||
if _n_tries - 1 > 0:
|
||||
logger.info('Reloading plugin {} and retrying'.format(module_name))
|
||||
get_plugin(module_name, reload=True)
|
||||
response = _thread_func(_n_tries=_n_tries-1, errors=errors)
|
||||
plugin = get_plugin(module_name, reload=True)
|
||||
if isinstance(plugin, RunnablePlugin):
|
||||
plugin.bus = get_bus()
|
||||
plugin.start()
|
||||
|
||||
response = _thread_func(_n_tries=_n_tries - 1, errors=errors)
|
||||
finally:
|
||||
self._send_response(response)
|
||||
return response
|
||||
|
||||
token_hash = Config.get('token_hash')
|
||||
return response
|
||||
|
||||
if token_hash:
|
||||
if self.token is None or get_hash(self.token) != token_hash:
|
||||
raise PermissionError()
|
||||
stored_token_hash = Config.get('token_hash')
|
||||
token = getattr(self, 'token', '')
|
||||
if stored_token_hash and get_hash(token) != stored_token_hash:
|
||||
raise PermissionError()
|
||||
|
||||
if _async:
|
||||
Thread(target=_thread_func, args=(n_tries,)).start()
|
||||
|
@ -292,15 +342,18 @@ class Request(Message):
|
|||
the message into a UTF-8 JSON string
|
||||
"""
|
||||
|
||||
return json.dumps({
|
||||
'type': 'request',
|
||||
'target': self.target,
|
||||
'action': self.action,
|
||||
'args': self.args,
|
||||
'origin': self.origin if hasattr(self, 'origin') else None,
|
||||
'id': self.id if hasattr(self, 'id') else None,
|
||||
'token': self.token if hasattr(self, 'token') else None,
|
||||
'_timestamp': self.timestamp,
|
||||
})
|
||||
return json.dumps(
|
||||
{
|
||||
'type': 'request',
|
||||
'target': self.target,
|
||||
'action': self.action,
|
||||
'args': self.args,
|
||||
'origin': self.origin if hasattr(self, 'origin') else None,
|
||||
'id': self.id if hasattr(self, 'id') else None,
|
||||
'token': self.token if hasattr(self, 'token') else None,
|
||||
'_timestamp': self.timestamp,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
|
@ -19,12 +19,12 @@ def action(f):
|
|||
result = f(*args, **kwargs)
|
||||
|
||||
if result and isinstance(result, Response):
|
||||
result.errors = result.errors \
|
||||
if isinstance(result.errors, list) else [result.errors]
|
||||
result.errors = (
|
||||
result.errors if isinstance(result.errors, list) else [result.errors]
|
||||
)
|
||||
response = result
|
||||
elif isinstance(result, tuple) and len(result) == 2:
|
||||
response.errors = result[1] \
|
||||
if isinstance(result[1], list) else [result[1]]
|
||||
response.errors = result[1] if isinstance(result[1], list) else [result[1]]
|
||||
|
||||
if len(response.errors) == 1 and response.errors[0] is None:
|
||||
response.errors = []
|
||||
|
@ -39,12 +39,14 @@ def action(f):
|
|||
return _execute_action
|
||||
|
||||
|
||||
class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init]
|
||||
""" Base plugin class """
|
||||
class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init]
|
||||
"""Base plugin class"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__()
|
||||
self.logger = logging.getLogger('platypush:plugin:' + get_plugin_name_by_class(self.__class__))
|
||||
self.logger = logging.getLogger(
|
||||
'platypush:plugin:' + get_plugin_name_by_class(self.__class__)
|
||||
)
|
||||
if 'logging' in kwargs:
|
||||
self.logger.setLevel(getattr(logging, kwargs['logging'].upper()))
|
||||
|
||||
|
@ -53,8 +55,9 @@ class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-t
|
|||
)
|
||||
|
||||
def run(self, method, *args, **kwargs):
|
||||
assert method in self.registered_actions, '{} is not a registered action on {}'.\
|
||||
format(method, self.__class__.__name__)
|
||||
assert (
|
||||
method in self.registered_actions
|
||||
), '{} is not a registered action on {}'.format(method, self.__class__.__name__)
|
||||
return getattr(self, method)(*args, **kwargs)
|
||||
|
||||
|
||||
|
@ -62,6 +65,7 @@ class RunnablePlugin(Plugin):
|
|||
"""
|
||||
Class for runnable plugins - i.e. plugins that have a start/stop method and can be started.
|
||||
"""
|
||||
|
||||
def __init__(self, poll_interval: Optional[float] = None, **kwargs):
|
||||
"""
|
||||
:param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval).
|
||||
|
@ -78,6 +82,9 @@ class RunnablePlugin(Plugin):
|
|||
def should_stop(self):
|
||||
return self._should_stop.is_set()
|
||||
|
||||
def wait_stop(self, timeout=None):
|
||||
return self._should_stop.wait(timeout=timeout)
|
||||
|
||||
def start(self):
|
||||
set_thread_name(self.__class__.__name__)
|
||||
self._thread = threading.Thread(target=self._runner)
|
||||
|
|
488
platypush/plugins/matrix/__init__.py
Normal file
488
platypush/plugins/matrix/__init__.py
Normal file
|
@ -0,0 +1,488 @@
|
|||
import datetime
|
||||
import json
|
||||
import multiprocessing
|
||||
import os
|
||||
import pathlib
|
||||
import requests
|
||||
from abc import ABC, abstractmethod
|
||||
from urllib.parse import urljoin
|
||||
from typing import Optional, Collection, Dict, Tuple, Any
|
||||
|
||||
from platypush.config import Config
|
||||
from platypush.context import get_bus
|
||||
from platypush.message.event.matrix import (
|
||||
MatrixEvent,
|
||||
MatrixRoomTopicChangeEvent,
|
||||
MatrixMessageEvent,
|
||||
MatrixRoomJoinEvent,
|
||||
MatrixRoomLeaveEvent,
|
||||
MatrixRoomInviteEvent,
|
||||
MatrixRoomInviteMeEvent,
|
||||
)
|
||||
|
||||
from platypush.plugins import RunnablePlugin, action
|
||||
|
||||
|
||||
class RetrieveWorker(ABC):
|
||||
def __init__(self, server_url: str, access_token: str):
|
||||
self._server_url = server_url
|
||||
self._access_token = access_token
|
||||
|
||||
@abstractmethod
|
||||
def _url(self, id: int | str) -> str:
|
||||
raise NotImplementedError()
|
||||
|
||||
@abstractmethod
|
||||
def _process_response(self, rs: dict | Collection[dict]) -> dict:
|
||||
raise NotImplementedError()
|
||||
|
||||
def __call__(self, id: str) -> Tuple[str, dict]:
|
||||
url = urljoin(self._server_url, self._url(id))
|
||||
rs = requests.get(
|
||||
url,
|
||||
headers={
|
||||
'Authorization': f'Bearer {self._access_token}',
|
||||
},
|
||||
)
|
||||
|
||||
rs.raise_for_status()
|
||||
return (id, self._process_response(rs.json()))
|
||||
|
||||
|
||||
class UserRetrieveWorker(RetrieveWorker):
|
||||
def _url(self, id: str) -> str:
|
||||
return f'/_matrix/client/r0/profile/{id}'
|
||||
|
||||
def _process_response(self, rs: dict) -> dict:
|
||||
return {
|
||||
'display_name': rs.get('displayname'),
|
||||
'avatar_url': rs.get('avatar_url'),
|
||||
}
|
||||
|
||||
|
||||
class RoomRetrieveWorker(RetrieveWorker):
|
||||
def _url(self, id: str) -> str:
|
||||
return f'/_matrix/client/v3/rooms/{id}/state'
|
||||
|
||||
def _process_response(self, rs: Collection[dict]) -> dict:
|
||||
info = {}
|
||||
for event in rs:
|
||||
event_type = event.get('type')
|
||||
if event_type == 'm.room.name':
|
||||
info['name'] = event.get('content', {}).get('name')
|
||||
elif event_type == 'm.room.topic':
|
||||
info['name'] = event.get('content', {}).get('topic')
|
||||
|
||||
return info
|
||||
|
||||
|
||||
class MatrixPlugin(RunnablePlugin):
|
||||
"""
|
||||
Matrix chat integration.
|
||||
|
||||
Triggers:
|
||||
|
||||
* :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received.
|
||||
* :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a user joins a room.
|
||||
* :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a user leaves a room.
|
||||
* :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when a user (other than the
|
||||
currently logged one) is invited to a room.
|
||||
* :class:`platypush.message.event.matrix.MatrixRoomMeInviteEvent`: when the currently logged in
|
||||
user is invited to a room.
|
||||
* :class:`platypush.message.event.matrix.MatrixRoomTopicChangeEvent`: when the topic/title of a room changes.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_url: str = 'https://matrix.to',
|
||||
username: str | None = None,
|
||||
password: str | None = None,
|
||||
access_token: str | None = None,
|
||||
autojoin_on_invite: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Authentication requires either username/password or an access token.
|
||||
|
||||
If you don't want to provide cleartext credentials in the configuration, you can
|
||||
retrieve an access token offline through the following request::
|
||||
|
||||
curl -XPOST '{"type":"m.login.password", "user":"username", "password":"password"}' \
|
||||
"https://matrix.example.com/_matrix/client/r0/login"
|
||||
|
||||
This may be required if the user or the instance enforce 2FA.
|
||||
|
||||
:param server_url: Default Matrix instance base URL (default: ``https://matrix.to``).
|
||||
:param username: Default username. Provide either username/password _or_ an access token.
|
||||
:param password: Default password. Provide either username/password _or_ an access token.
|
||||
:param access_token: Default access token. Provide either username/password _or_ an access token.
|
||||
:param autojoin_on_invite: Whether the account should automatically join rooms
|
||||
upon invite. If false (default value), then you may want to implement your own
|
||||
logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteMeEvent`
|
||||
event is received, and call the :meth:`.join` method if required.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._server_url = server_url
|
||||
self._user_id = None
|
||||
self._autojoin_on_invite = autojoin_on_invite
|
||||
self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore
|
||||
pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self._sessions_file = os.path.join(self._workdir, 'sessions.json')
|
||||
self._credentials_file = os.path.join(self._workdir, 'credentials.json')
|
||||
self._users_cache_file = os.path.join(self._workdir, 'users.json')
|
||||
self._rooms_cache_file = os.path.join(self._workdir, 'rooms.json')
|
||||
self._users_cache: Dict[str, dict] = {}
|
||||
self._rooms_cache: Dict[str, dict] = {}
|
||||
self._set_credentials(username, password, access_token, overwrite=True)
|
||||
|
||||
def _set_credentials(
|
||||
self,
|
||||
username: str | None = None,
|
||||
password: str | None = None,
|
||||
access_token: str | None = None,
|
||||
overwrite: bool = False,
|
||||
):
|
||||
if username or overwrite:
|
||||
self._username = username
|
||||
if password or overwrite:
|
||||
self._password = password
|
||||
if access_token or overwrite:
|
||||
self._access_token = access_token
|
||||
|
||||
def _execute(self, url: str, method: str = 'get', **kwargs):
|
||||
if self._access_token:
|
||||
kwargs['headers'] = {
|
||||
'Authorization': f'Bearer {self._access_token}',
|
||||
**kwargs.get('headers', {}),
|
||||
}
|
||||
|
||||
url = urljoin(self._server_url, f'/_matrix/client/{url.lstrip("/")}')
|
||||
req_method = getattr(requests, method.lower())
|
||||
rs = req_method(url, **kwargs)
|
||||
rs.raise_for_status()
|
||||
rs = rs.json()
|
||||
assert not rs.get('error'), rs.get('error')
|
||||
return rs
|
||||
|
||||
def _save_credentials(self, credentials: dict):
|
||||
with open(self._credentials_file, 'w') as f:
|
||||
json.dump(credentials, f)
|
||||
|
||||
def _refresh_user_id(self):
|
||||
devices = self._execute('/v3/devices').get('devices', [])
|
||||
assert devices, 'The user is not logged into any devices'
|
||||
self._user_id = devices[0]['user_id']
|
||||
|
||||
@action
|
||||
def login(
|
||||
self,
|
||||
server_url: str | None = None,
|
||||
username: str | None = None,
|
||||
password: str | None = None,
|
||||
access_token: str | None = None,
|
||||
):
|
||||
"""
|
||||
Login to an instance if username/password/access_token were not specified in the plugin
|
||||
configuration. Otherwise, change the currently logged user or instance.
|
||||
|
||||
:param server_url: New Matrix instance base URL.
|
||||
:param username: New username.
|
||||
:param password: New user password.
|
||||
:param access_token: New access token.
|
||||
"""
|
||||
self._server_url = server_url or self._server_url
|
||||
self._set_credentials(username, password, access_token, overwrite=False)
|
||||
|
||||
if self._access_token:
|
||||
self._refresh_user_id()
|
||||
elif self._username and self._password:
|
||||
rs = self._execute(
|
||||
'/r0/login',
|
||||
method='post',
|
||||
json={
|
||||
'type': 'm.login.password',
|
||||
'user': self._username,
|
||||
'password': self._password,
|
||||
'initial_device_display_name': 'Platypush Matrix integration',
|
||||
},
|
||||
)
|
||||
|
||||
assert rs.get('access_token'), 'No access token provided by the server'
|
||||
self._access_token = rs['access_token']
|
||||
self._user_id = rs['user_id']
|
||||
self._save_credentials(rs)
|
||||
elif os.path.isfile(self._credentials_file):
|
||||
with open(self._credentials_file, 'r') as f:
|
||||
self._access_token = json.load(f)['access_token']
|
||||
self._refresh_user_id()
|
||||
else:
|
||||
raise AssertionError(
|
||||
'No username, password and access token provided nor stored'
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f'Successfully logged in to {self._server_url} as {self._user_id}'
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _timestamp_to_datetime(t: int | float) -> datetime.datetime:
|
||||
return datetime.datetime.fromtimestamp(t / 1000)
|
||||
|
||||
def _parse_event(
|
||||
self, room_id: str, event: dict, users: dict
|
||||
) -> Optional[MatrixEvent]:
|
||||
evt_type = event.get('type')
|
||||
evt_class = None
|
||||
room_info = self._rooms_cache.get(room_id, {})
|
||||
args: Dict[str, Any] = {
|
||||
'server_url': self._server_url,
|
||||
'room_id': room_id,
|
||||
'room_name': room_info.get('name'),
|
||||
'room_topic': room_info.get('topic'),
|
||||
}
|
||||
|
||||
if event.get('sender') and isinstance(event.get('sender'), str):
|
||||
cached_user = users.get(event['sender'], {})
|
||||
args['sender_id'] = event['sender']
|
||||
args['sender_display_name'] = cached_user.get('display_name')
|
||||
args['sender_avatar_url'] = cached_user.get('avatar_url')
|
||||
|
||||
if event.get('origin_server_ts'):
|
||||
args['server_timestamp'] = self._timestamp_to_datetime(
|
||||
event['origin_server_ts']
|
||||
)
|
||||
|
||||
if evt_type == 'm.room.topic':
|
||||
evt_class = MatrixRoomTopicChangeEvent
|
||||
args['topic'] = event.get('content', {}).get('topic') # type: ignore
|
||||
# TODO Handle encrypted rooms events (`m.room.encrypted`)
|
||||
elif evt_type == 'm.room.message':
|
||||
evt_class = MatrixMessageEvent
|
||||
args['body'] = event.get('content', {}).get('body') # type: ignore
|
||||
elif evt_type == 'm.room.member':
|
||||
membership = event.get('content', {}).get('membership')
|
||||
if membership == 'join':
|
||||
evt_class = MatrixRoomJoinEvent
|
||||
elif membership == 'invite':
|
||||
evt_class = MatrixRoomInviteEvent
|
||||
elif membership == 'leave':
|
||||
evt_class = MatrixRoomLeaveEvent
|
||||
|
||||
if evt_class:
|
||||
return evt_class(**args)
|
||||
|
||||
def _parse_invite_event(
|
||||
self, room_id: str, events: Collection[dict]
|
||||
) -> MatrixRoomInviteMeEvent:
|
||||
evt_args: Dict[str, Any] = {
|
||||
'server_url': self._server_url,
|
||||
'room_id': room_id,
|
||||
}
|
||||
|
||||
for event in events:
|
||||
evt_type = event.get('type')
|
||||
if evt_type == 'm.room.name':
|
||||
evt_args['room_name'] = event.get('content', {}).get('name')
|
||||
elif evt_type == 'm.room.topic':
|
||||
evt_args['room_topic'] = event.get('content', {}).get('topic')
|
||||
if event.get('origin_server_ts'):
|
||||
evt_args['server_timestamp'] = self._timestamp_to_datetime(
|
||||
event['origin_server_ts']
|
||||
)
|
||||
|
||||
if evt_args.get('room_name'):
|
||||
self._rooms_cache[room_id] = {
|
||||
'room_id': room_id,
|
||||
'room_name': evt_args['room_name'],
|
||||
'room_topic': evt_args.get('room_topic'),
|
||||
}
|
||||
|
||||
self._rewrite_rooms_cache()
|
||||
|
||||
return MatrixRoomInviteMeEvent(**evt_args)
|
||||
|
||||
def _retrieve_users_info(self, users: Collection[str]) -> Dict[str, dict]:
|
||||
users_info = {user: {} for user in users}
|
||||
retrieve = UserRetrieveWorker(self._server_url, self._access_token or '')
|
||||
with multiprocessing.Pool(4) as pool:
|
||||
pool_res = pool.map(retrieve, users_info.keys())
|
||||
|
||||
return {
|
||||
user_id: {
|
||||
'user_id': user_id,
|
||||
**info,
|
||||
}
|
||||
for user_id, info in pool_res
|
||||
}
|
||||
|
||||
def _retrieve_rooms_info(self, rooms: Collection[str]) -> Dict[str, dict]:
|
||||
rooms_info = {room: {} for room in rooms}
|
||||
retrieve = RoomRetrieveWorker(self._server_url, self._access_token or '')
|
||||
with multiprocessing.Pool(4) as pool:
|
||||
pool_res = pool.map(retrieve, rooms_info.keys())
|
||||
|
||||
return {
|
||||
room_id: {
|
||||
'room_id': room_id,
|
||||
**info,
|
||||
}
|
||||
for room_id, info in pool_res
|
||||
}
|
||||
|
||||
def _extract_senders(self, rooms) -> Dict[str, dict]:
|
||||
cache_has_changes = False
|
||||
senders = set()
|
||||
|
||||
for room in rooms:
|
||||
room_events = room.get('timeline', {}).get('events', [])
|
||||
for evt in room_events:
|
||||
if evt.get('type') == 'm.room.member':
|
||||
cache_has_changes = True
|
||||
self._users_cache[evt['sender']] = {
|
||||
'user_id': evt['sender'],
|
||||
'display_name': evt.get('content', {}).get('displayname'),
|
||||
'avatar_url': evt.get('content', {}).get('avatar_url'),
|
||||
}
|
||||
|
||||
senders.update({evt['sender'] for evt in room_events if evt.get('sender')})
|
||||
|
||||
missing_senders = {user for user in senders if user not in self._users_cache}
|
||||
|
||||
if missing_senders:
|
||||
cache_has_changes = True
|
||||
self._users_cache.update(self._retrieve_users_info(missing_senders))
|
||||
|
||||
senders_map = {
|
||||
user: self._users_cache.get(user, {'user_id': user}) for user in senders
|
||||
}
|
||||
|
||||
if cache_has_changes:
|
||||
self._rewrite_users_cache()
|
||||
|
||||
return senders_map
|
||||
|
||||
def _extract_rooms(self, rooms: Collection[str]) -> Dict[str, dict]:
|
||||
missing_rooms_info = {
|
||||
room_id for room_id in rooms if not self._rooms_cache.get(room_id)
|
||||
}
|
||||
|
||||
if missing_rooms_info:
|
||||
self._rooms_cache.update(self._retrieve_rooms_info(missing_rooms_info))
|
||||
self._rewrite_rooms_cache()
|
||||
|
||||
return {
|
||||
room_id: self._rooms_cache.get(
|
||||
room_id,
|
||||
{
|
||||
'room_id': room_id,
|
||||
},
|
||||
)
|
||||
for room_id in rooms
|
||||
}
|
||||
|
||||
def _process_events(self, events: dict) -> Collection[MatrixEvent]:
|
||||
rooms = events.get('rooms', {})
|
||||
joined_rooms = rooms.get('join', {})
|
||||
invited_rooms = rooms.get('invite', {})
|
||||
parsed_events = []
|
||||
senders = self._extract_senders(joined_rooms.values())
|
||||
self._extract_rooms(joined_rooms.keys())
|
||||
|
||||
# Create joined rooms events
|
||||
for room_id, room in joined_rooms.items():
|
||||
room_events = room.get('timeline', {}).get('events', [])
|
||||
parsed_room_events = [
|
||||
self._parse_event(room_id=room_id, event=event, users=senders)
|
||||
for event in room_events
|
||||
]
|
||||
|
||||
parsed_events.extend([evt for evt in parsed_room_events if evt])
|
||||
|
||||
# Create invite events
|
||||
for room_id, room in invited_rooms.items():
|
||||
room_events = room.get('invite_state', {}).get('events', [])
|
||||
parsed_room_event = self._parse_invite_event(
|
||||
room_id=room_id, events=room_events
|
||||
)
|
||||
parsed_events.append(parsed_room_event)
|
||||
|
||||
if self._autojoin_on_invite:
|
||||
self.join(room_id)
|
||||
|
||||
parsed_events.sort(key=lambda e: e.server_timestamp)
|
||||
return parsed_events
|
||||
|
||||
def _reload_users_cache(self):
|
||||
if os.path.isfile(self._users_cache_file):
|
||||
with open(self._users_cache_file, 'r') as f:
|
||||
self._users_cache.update(json.load(f))
|
||||
|
||||
def _rewrite_users_cache(self):
|
||||
with open(self._users_cache_file, 'w') as f:
|
||||
json.dump(self._users_cache, f)
|
||||
|
||||
def _reload_rooms_cache(self):
|
||||
if os.path.isfile(self._rooms_cache_file):
|
||||
with open(self._rooms_cache_file, 'r') as f:
|
||||
self._rooms_cache.update(json.load(f))
|
||||
|
||||
def _rewrite_rooms_cache(self):
|
||||
with open(self._rooms_cache_file, 'w') as f:
|
||||
json.dump(self._rooms_cache, f)
|
||||
|
||||
@action
|
||||
def sync(self):
|
||||
"""
|
||||
Sync the state for the currently logged session.
|
||||
"""
|
||||
next_batch = None
|
||||
sessions = {}
|
||||
if os.path.isfile(self._sessions_file):
|
||||
with open(self._sessions_file, 'r') as f:
|
||||
sessions = json.load(f)
|
||||
next_batch = sessions.get(self._user_id, {}).get('next_batch')
|
||||
|
||||
if not next_batch:
|
||||
self.logger.info('Synchronizing Matrix events')
|
||||
|
||||
rs = self._execute('/r0/sync', params={'since': next_batch})
|
||||
events = self._process_events(rs)
|
||||
if events and next_batch:
|
||||
for event in events:
|
||||
get_bus().post(event)
|
||||
|
||||
if not sessions.get(self._user_id):
|
||||
sessions[self._user_id] = {}
|
||||
|
||||
sessions[self._user_id]['next_batch'] = rs.get('next_batch')
|
||||
with open(self._sessions_file, 'w') as f:
|
||||
json.dump(sessions, f)
|
||||
|
||||
if not next_batch:
|
||||
self.logger.info('Matrix events synchronized')
|
||||
|
||||
@action
|
||||
def join(self, room_id: str):
|
||||
"""
|
||||
Join a room by ID.
|
||||
|
||||
:param room_id: Room ID or alias.
|
||||
"""
|
||||
self._execute(f'/v3/join/{room_id}', method='post')
|
||||
self.logger.info('Successfully joined room %s', room_id)
|
||||
|
||||
def main(self):
|
||||
self.login()
|
||||
self._reload_users_cache()
|
||||
self._reload_rooms_cache()
|
||||
|
||||
while not self._should_stop.is_set():
|
||||
try:
|
||||
self.sync()
|
||||
finally:
|
||||
self._should_stop.wait(timeout=10)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
10
platypush/plugins/matrix/manifest.yaml
Normal file
10
platypush/plugins/matrix/manifest.yaml
Normal file
|
@ -0,0 +1,10 @@
|
|||
manifest:
|
||||
events:
|
||||
platypush.message.event.matrix.MatrixMessageEvent: when a message is received.
|
||||
platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a room.
|
||||
platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a room.
|
||||
platypush.message.event.matrix.MatrixRoomInviteEvent: when a user (other than the currently logged one) is invited to a room.
|
||||
platypush.message.event.matrix.MatrixRoomMeInviteEvent: when the currently logged in user is invited to a room.
|
||||
platypush.message.event.matrix.MatrixRoomTopicChangeEvent: when the topic/title of a room changes.
|
||||
package: platypush.plugins.matrix
|
||||
type: plugin
|
|
@ -121,9 +121,7 @@ class NtfyPlugin(RunnablePlugin):
|
|||
def main(self):
|
||||
if self._subscriptions:
|
||||
self._connect()
|
||||
|
||||
while not self._should_stop.is_set():
|
||||
self._should_stop.wait(timeout=1)
|
||||
self.wait_stop()
|
||||
|
||||
def stop(self):
|
||||
if self._ws_proc:
|
||||
|
|
|
@ -6,7 +6,7 @@ from dateutil.tz import tzutc
|
|||
from marshmallow import fields
|
||||
|
||||
|
||||
class StrippedString(fields.Function): # lgtm [py/missing-call-to-init]
|
||||
class StrippedString(fields.Function): # lgtm [py/missing-call-to-init]
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs['serialize'] = self._strip
|
||||
kwargs['deserialize'] = self._strip
|
||||
|
@ -21,7 +21,15 @@ class StrippedString(fields.Function): # lgtm [py/missing-call-to-init]
|
|||
return value.strip()
|
||||
|
||||
|
||||
class DateTime(fields.Function): # lgtm [py/missing-call-to-init]
|
||||
class Function(fields.Function): # lgtm [py/missing-call-to-init]
|
||||
def _get_attr(self, obj, attr: str):
|
||||
if hasattr(obj, attr):
|
||||
return getattr(obj, attr)
|
||||
elif hasattr(obj, 'get'):
|
||||
return obj.get(attr)
|
||||
|
||||
|
||||
class DateTime(Function): # lgtm [py/missing-call-to-init]
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.metadata = {
|
||||
|
@ -30,7 +38,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init]
|
|||
}
|
||||
|
||||
def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]:
|
||||
value = normalize_datetime(obj.get(attr))
|
||||
value = normalize_datetime(self._get_attr(obj, attr))
|
||||
if value:
|
||||
return value.isoformat()
|
||||
|
||||
|
@ -38,7 +46,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init]
|
|||
return normalize_datetime(value)
|
||||
|
||||
|
||||
class Date(fields.Function): # lgtm [py/missing-call-to-init]
|
||||
class Date(Function): # lgtm [py/missing-call-to-init]
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.metadata = {
|
||||
|
@ -47,7 +55,7 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init]
|
|||
}
|
||||
|
||||
def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]:
|
||||
value = normalize_datetime(obj.get(attr))
|
||||
value = normalize_datetime(self._get_attr(obj, attr))
|
||||
if value:
|
||||
return date(value.year, value.month, value.day).isoformat()
|
||||
|
||||
|
@ -56,10 +64,12 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init]
|
|||
return date.fromtimestamp(dt.timestamp())
|
||||
|
||||
|
||||
def normalize_datetime(dt: Union[str, date, datetime]) -> Optional[Union[date, datetime]]:
|
||||
def normalize_datetime(
|
||||
dt: Optional[Union[str, date, datetime]]
|
||||
) -> Optional[Union[date, datetime]]:
|
||||
if not dt:
|
||||
return
|
||||
if isinstance(dt, datetime) or isinstance(dt, date):
|
||||
if isinstance(dt, (datetime, date)):
|
||||
return dt
|
||||
|
||||
try:
|
||||
|
|
Loading…
Reference in a new issue