[#349] Refactored/rewritten `telegram` plugin.
continuous-integration/drone/push Build is passing Details

1. `chat.telegram` -> `telegram` plugin.

2. Merged `backend.chat.telegram` logic into `telegram` plugin.

3. Rewritten the architecture of the integration to adapt to the new
   asyncio API introduced in the latest versions of telegram-bot-api.

Closes: #349
This commit is contained in:
Fabio Manganiello 2024-03-03 15:22:37 +01:00
parent 7637890a54
commit 6bdc9e77ee
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
28 changed files with 1852 additions and 1485 deletions

View File

@ -6,7 +6,6 @@ Backends
:maxdepth: 1
:caption: Backends:
platypush/backend/chat.telegram.rst
platypush/backend/http.rst
platypush/backend/midi.rst
platypush/backend/music.mopidy.rst

View File

@ -13,7 +13,6 @@ Events
platypush/events/bluetooth.rst
platypush/events/camera.rst
platypush/events/chat.slack.rst
platypush/events/chat.telegram.rst
platypush/events/clipboard.rst
platypush/events/custom.rst
platypush/events/dbus.rst
@ -66,6 +65,7 @@ Events
platypush/events/sound.rst
platypush/events/stt.rst
platypush/events/sun.rst
platypush/events/telegram.rst
platypush/events/tensorflow.rst
platypush/events/torrent.rst
platypush/events/trello.rst

View File

@ -1,5 +0,0 @@
``chat.telegram``
===================================
.. automodule:: platypush.backend.chat.telegram
:members:

View File

@ -1,5 +0,0 @@
``chat.telegram``
=========================================
.. automodule:: platypush.message.event.chat.telegram
:members:

View File

@ -0,0 +1,5 @@
``telegram``
============
.. automodule:: platypush.message.event.telegram
:members:

View File

@ -1,5 +0,0 @@
``chat.telegram``
===================================
.. automodule:: platypush.plugins.chat.telegram
:members:

View File

@ -0,0 +1,5 @@
``telegram``
============
.. automodule:: platypush.plugins.telegram
:members:

View File

@ -1,5 +0,0 @@
``chat.telegram``
============================================
.. automodule:: platypush.message.response.chat.telegram
:members:

View File

@ -22,7 +22,6 @@ Plugins
platypush/plugins/camera.ir.mlx90640.rst
platypush/plugins/camera.pi.rst
platypush/plugins/camera.pi.legacy.rst
platypush/plugins/chat.telegram.rst
platypush/plugins/clipboard.rst
platypush/plugins/config.rst
platypush/plugins/csv.rst
@ -128,6 +127,7 @@ Plugins
platypush/plugins/switchbot.rst
platypush/plugins/system.rst
platypush/plugins/tcp.rst
platypush/plugins/telegram.rst
platypush/plugins/tensorflow.rst
platypush/plugins/todoist.rst
platypush/plugins/torrent.rst

View File

@ -8,7 +8,6 @@ Responses
platypush/responses/camera.rst
platypush/responses/camera.android.rst
platypush/responses/chat.telegram.rst
platypush/responses/google.drive.rst
platypush/responses/pihole.rst
platypush/responses/printer.cups.rst

View File

@ -1,164 +0,0 @@
import re
from typing import Type, Optional, Union, List
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.chat.telegram import (
MessageEvent,
CommandMessageEvent,
TextMessageEvent,
PhotoMessageEvent,
VideoMessageEvent,
ContactMessageEvent,
DocumentMessageEvent,
LocationMessageEvent,
GroupChatCreatedEvent,
)
from platypush.plugins.chat.telegram import ChatTelegramPlugin
class ChatTelegramBackend(Backend):
"""
Telegram bot that listens for messages and updates.
Requires:
* The :class:`platypush.plugins.chat.telegram.ChatTelegramPlugin` plugin configured
"""
def __init__(
self, authorized_chat_ids: Optional[List[Union[str, int]]] = None, **kwargs
):
"""
:param authorized_chat_ids: Optional list of chat_id/user_id which are authorized to send messages to
the bot. If nothing is specified then no restrictions are applied.
"""
super().__init__(**kwargs)
self.authorized_chat_ids = set(authorized_chat_ids or [])
self._plugin: ChatTelegramPlugin = get_plugin('chat.telegram') # type: ignore
def _authorize(self, msg):
if not self.authorized_chat_ids:
return
if msg.chat.type == 'private' and msg.chat.id not in self.authorized_chat_ids:
self.logger.info(
'Received message from unauthorized chat_id %s', msg.chat.id
)
self._plugin.send_message(
chat_id=msg.chat.id,
text='You are not allowed to send messages to this bot',
)
raise PermissionError
def _msg_hook(self, cls: Type[MessageEvent]):
# noinspection PyUnusedLocal
def hook(update, _):
msg = update.effective_message
try:
self._authorize(msg)
self.bus.post(
cls(
chat_id=update.effective_chat.id,
message=self._plugin.parse_msg(msg).output,
user=self._plugin.parse_user(update.effective_user).output,
)
)
except PermissionError:
pass
return hook
def _group_hook(self):
def hook(update, context):
msg = update.effective_message
if msg.group_chat_created:
self.bus.post(
GroupChatCreatedEvent(
chat_id=update.effective_chat.id,
message=self._plugin.parse_msg(msg).output,
user=self._plugin.parse_user(update.effective_user).output,
)
)
elif msg.photo:
self._msg_hook(PhotoMessageEvent)(update, context)
elif msg.video:
self._msg_hook(VideoMessageEvent)(update, context)
elif msg.contact:
self._msg_hook(ContactMessageEvent)(update, context)
elif msg.location:
self._msg_hook(LocationMessageEvent)(update, context)
elif msg.document:
self._msg_hook(DocumentMessageEvent)(update, context)
elif msg.text:
if msg.text.startswith('/'):
self._command_hook()(update, context)
else:
self._msg_hook(TextMessageEvent)(update, context)
return hook
def _command_hook(self):
def hook(update, _):
msg = update.effective_message
m = re.match(r'\s*/([0-9a-zA-Z_-]+)\s*(.*)', msg.text)
if not m:
self.logger.warning('Invalid command: %s', msg.text)
return
cmd = m.group(1).lower()
args = [arg for arg in re.split(r'\s+', m.group(2)) if len(arg)]
try:
self._authorize(msg)
self.bus.post(
CommandMessageEvent(
chat_id=update.effective_chat.id,
command=cmd,
cmdargs=args,
message=self._plugin.parse_msg(msg).output,
user=self._plugin.parse_user(update.effective_user).output,
)
)
except PermissionError:
pass
return hook
def run(self):
from telegram.ext import MessageHandler, Filters
super().run()
telegram = self._plugin.get_telegram()
dispatcher = telegram.dispatcher
dispatcher.add_handler(MessageHandler(Filters.group, self._group_hook()))
dispatcher.add_handler(
MessageHandler(Filters.text, self._msg_hook(TextMessageEvent))
)
dispatcher.add_handler(
MessageHandler(Filters.photo, self._msg_hook(PhotoMessageEvent))
)
dispatcher.add_handler(
MessageHandler(Filters.video, self._msg_hook(VideoMessageEvent))
)
dispatcher.add_handler(
MessageHandler(Filters.contact, self._msg_hook(ContactMessageEvent))
)
dispatcher.add_handler(
MessageHandler(Filters.location, self._msg_hook(LocationMessageEvent))
)
dispatcher.add_handler(
MessageHandler(Filters.document, self._msg_hook(DocumentMessageEvent))
)
dispatcher.add_handler(MessageHandler(Filters.command, self._command_hook()))
self.logger.info('Initialized Telegram backend')
telegram.start_polling()
# vim:sw=4:ts=4:et:

View File

@ -1,19 +0,0 @@
manifest:
events:
platypush.message.event.chat.telegram.CommandMessageEvent: when a command message
is received.
platypush.message.event.chat.telegram.ContactMessageEvent: when a contact is received.
platypush.message.event.chat.telegram.DocumentMessageEvent: when a document is
received.
platypush.message.event.chat.telegram.GroupChatCreatedEvent: when the bot is invited
to a new group.
platypush.message.event.chat.telegram.LocationMessageEvent: when a location is
received.
platypush.message.event.chat.telegram.PhotoMessageEvent: when a photo is received.
platypush.message.event.chat.telegram.TextMessageEvent: when a text message is
received.
platypush.message.event.chat.telegram.VideoMessageEvent: when a video is received.
install:
pip: []
package: platypush.backend.chat.telegram
type: backend

View File

@ -1,55 +0,0 @@
from typing import List, Optional
from platypush.message.event import Event
class TelegramEvent(Event):
def __init__(self, *args, chat_id: int, **kwargs):
super().__init__(*args, chat_id=chat_id, **kwargs)
class MessageEvent(TelegramEvent):
"""
Event triggered when a new message is received by the Telegram bot.
"""
def __init__(self, *args, message, user, **kwargs):
super().__init__(*args, message=message, user=user, **kwargs)
class CommandMessageEvent(MessageEvent):
"""
Event triggered when a new message is received by the Telegram bot.
"""
def __init__(self, command: str, cmdargs: Optional[List[str]] = None, *args, **kwargs):
super().__init__(*args, command=command, cmdargs=(cmdargs or []), **kwargs)
class TextMessageEvent(MessageEvent):
pass
class PhotoMessageEvent(MessageEvent):
pass
class VideoMessageEvent(MessageEvent):
pass
class ContactMessageEvent(MessageEvent):
pass
class LocationMessageEvent(MessageEvent):
pass
class DocumentMessageEvent(MessageEvent):
pass
class GroupChatCreatedEvent(MessageEvent):
pass
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,87 @@
from typing import List, Optional
from platypush.message.event import Event
class TelegramEvent(Event):
"""
Base class for all the Telegram events.
"""
def __init__(self, *args, chat_id: int, **kwargs):
super().__init__(*args, chat_id=chat_id, **kwargs)
class MessageEvent(TelegramEvent):
"""
Event triggered when a new message is received by the Telegram bot.
"""
def __init__( # pylint: disable=useless-parent-delegation
self, *args, message: Optional[dict], user: Optional[dict], **kwargs
):
"""
:param message: .. schema:: telegram.TelegramMessageSchema
:param user: .. schema:: telegram.TelegramUserSchema
"""
super().__init__(*args, message=message, user=user, **kwargs)
class CommandMessageEvent(MessageEvent):
"""
Event triggered when a new message is received by the Telegram bot.
"""
def __init__( # pylint: disable=useless-parent-delegation
self, *args, command: str, cmdargs: Optional[List[str]] = None, **kwargs
):
"""
:param command: Command name.
:param cmdargs: Command arguments.
"""
super().__init__(*args, command=command, cmdargs=(cmdargs or []), **kwargs)
class TextMessageEvent(MessageEvent):
"""
Event triggered when a new text message is received by the Telegram bot.
"""
class PhotoMessageEvent(MessageEvent):
"""
Event triggered when a new photo message is received by the Telegram bot.
"""
class VideoMessageEvent(MessageEvent):
"""
Event triggered when a new video message is received by the Telegram bot.
"""
class ContactMessageEvent(MessageEvent):
"""
Event triggered when a new contact message is received by the Telegram bot.
"""
class LocationMessageEvent(MessageEvent):
"""
Event triggered when a new location message is received by the Telegram bot.
"""
class DocumentMessageEvent(MessageEvent):
"""
Event triggered when a new document message is received by the Telegram bot.
"""
class GroupChatCreatedEvent(MessageEvent):
"""
Event triggered when a new group chat is created.
"""
# vim:sw=4:ts=4:et:

View File

@ -1,172 +0,0 @@
import datetime
from typing import Optional, List
from platypush.message.response import Response
class TelegramMessageResponse(Response):
def __init__(self,
message_id: int,
chat_id: int,
creation_date: Optional[datetime.datetime],
chat_username: Optional[str] = None,
chat_firstname: Optional[str] = None,
chat_lastname: Optional[str] = None,
from_user_id: Optional[int] = None,
from_username: Optional[str] = None,
from_firstname: Optional[str] = None,
from_lastname: Optional[str] = None,
text: Optional[str] = None,
caption: Optional[str] = None,
edit_date: Optional[datetime.datetime] = None,
forward_date: Optional[datetime.datetime] = None,
forward_from_message_id: Optional[int] = None,
photo_file_id: Optional[str] = None,
photo_file_size: Optional[int] = None,
photo_width: Optional[int] = None,
photo_height: Optional[int] = None,
document_file_id: Optional[str] = None,
document_file_name: Optional[str] = None,
document_file_size: Optional[str] = None,
document_mime_type: Optional[str] = None,
audio_file_id: Optional[str] = None,
audio_file_size: Optional[str] = None,
audio_mime_type: Optional[str] = None,
audio_performer: Optional[str] = None,
audio_title: Optional[str] = None,
audio_duration: Optional[str] = None,
location_latitude: Optional[float] = None,
location_longitude: Optional[float] = None,
contact_phone_number: Optional[str] = None,
contact_first_name: Optional[str] = None,
contact_last_name: Optional[str] = None,
contact_user_id: Optional[int] = None,
contact_vcard: Optional[str] = None,
video_file_id: Optional[str] = None,
video_file_size: Optional[int] = None,
video_width: Optional[int] = None,
video_height: Optional[int] = None,
video_mime_type: Optional[str] = None,
video_duration: Optional[str] = None,
link: Optional[str] = None,
media_group_id: Optional[int] = None,
*args, **kwargs):
super().__init__(*args, output={
'message_id': message_id,
'chat_id': chat_id,
'chat_username': chat_username,
'chat_firstname': chat_firstname,
'chat_lastname': chat_lastname,
'from_user_id': from_user_id,
'from_username': from_username,
'from_firstname': from_firstname,
'from_lastname': from_lastname,
'text': text,
'caption': caption,
'creation_date': creation_date,
'edit_date': edit_date,
'forward_from_message_id': forward_from_message_id,
'forward_date': forward_date,
'photo_file_id': photo_file_id,
'photo_file_size': photo_file_size,
'photo_width': photo_width,
'photo_height': photo_height,
'document_file_id': document_file_id,
'document_file_name': document_file_name,
'document_file_size': document_file_size,
'document_mime_type': document_mime_type,
'audio_file_id': audio_file_id,
'audio_file_size': audio_file_size,
'audio_performer': audio_performer,
'audio_title': audio_title,
'audio_duration': audio_duration,
'audio_mime_type': audio_mime_type,
'video_file_id': video_file_id,
'video_file_size': video_file_size,
'video_width': video_width,
'video_height': video_height,
'video_duration': video_duration,
'video_mime_type': video_mime_type,
'link': link,
'location_latitude': location_latitude,
'location_longitude': location_longitude,
'contact_phone_number': contact_phone_number,
'contact_first_name': contact_first_name,
'contact_last_name': contact_last_name,
'contact_user_id': contact_user_id,
'contact_vcard': contact_vcard,
'media_group_id': media_group_id,
}, **kwargs)
class TelegramFileResponse(Response):
def __init__(self,
file_id: str,
file_path: str,
file_size: int,
*args, **kwargs):
super().__init__(*args, output={
'file_id': file_id,
'file_path': file_path,
'file_size': file_size,
}, **kwargs)
class TelegramChatResponse(Response):
# noinspection PyShadowingBuiltins
def __init__(self,
chat_id: int,
link: str,
username: str,
invite_link: Optional[str],
title: Optional[str] = None,
description: Optional[str] = None,
type: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
*args, **kwargs):
super().__init__(*args, output={
'chat_id': chat_id,
'link': link,
'invite_link': invite_link,
'username': username,
'title': title,
'description': description,
'type': type,
'first_name': first_name,
'last_name': last_name,
}, **kwargs)
class TelegramUserResponse(Response):
# noinspection PyShadowingBuiltins
def __init__(self,
user_id: int,
username: str,
is_bot: bool,
first_name: str,
last_name: Optional[str] = None,
language_code: Optional[str] = None,
link: Optional[str] = None,
*args, **kwargs):
super().__init__(*args, output={
'user_id': user_id,
'username': username,
'is_bot': is_bot,
'link': link,
'language_code': language_code,
'first_name': first_name,
'last_name': last_name,
}, **kwargs)
class TelegramUsersResponse(Response):
# noinspection PyShadowingBuiltins
def __init__(self,
users: List[TelegramUserResponse],
*args, **kwargs):
super().__init__(*args, output=[user.output for user in users], **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -221,7 +221,9 @@ class RunnablePlugin(Plugin):
except Exception as e:
self.logger.warning('Could not join thread on stop: %s', e)
self.logger.info('%s stopped', self.__class__.__name__)
self.logger.info(
'Stopped plugin: [%s]', get_plugin_name_by_class(self.__class__)
)
def _runner(self):
"""
@ -231,7 +233,7 @@ class RunnablePlugin(Plugin):
return
self.logger.info(
'Starting plugin [%s]', get_plugin_name_by_class(self.__class__)
'Starting plugin: [%s]', get_plugin_name_by_class(self.__class__)
)
while not self.should_stop():

View File

@ -5,6 +5,7 @@ class ChatPlugin(Plugin):
"""
Base class for chat plugins.
"""
@action
def send_message(self, *args, **kwargs):
def send_message(self, *_, **__):
raise NotImplementedError()

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +0,0 @@
manifest:
events: {}
install:
pip:
- python-telegram-bot
package: platypush.plugins.chat.telegram
type: plugin

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,91 @@
import logging
from threading import Thread
from ._model import Command
class CommandBridge(Thread):
"""
The command bridge is a thread that listens for commands on a command
queue, proxies them to the Telegram service and returns the result back to
the response queue.
This is required because the Telegram service runs in a separate process -
a requirement because of the Telegram bot API constraints, which requires
the asyncio event loop to run in the main thread.
"""
def __init__(self, service, *_, **__):
from ._service import TelegramService
super().__init__(name="telegram-service-bridge")
self.logger = logging.getLogger("platypush:telegram:bridge")
self._service: TelegramService = service
def _exec(self, cmd: Command):
try:
result = self._service.exec(
cmd.cmd, *cmd.args, **cmd.kwargs, timeout=cmd.timeout
)
except Exception as e:
result = e
self._service.result_queue.put_nowait((cmd, result))
def run(self):
super().run()
while self._service.is_running():
try:
cmd = self._service.cmd_queue.get()
except Exception as e:
self.logger.warning("Error while reading command queue: %s", e)
continue
if cmd is None or cmd.is_end_of_service():
break
self._exec(cmd)
class ResultBridge(Thread):
"""
The result bridge is a thread that listens for results on a result queue and
proxies them to the response queue of the Telegram service.
This is required because the Telegram service runs in a separate process -
a requirement because of the Telegram bot API constraints, which requires
the asyncio event loop to run in the main thread.
"""
def __init__(self, plugin, *_, **__):
from . import TelegramPlugin
super().__init__(name="telegram-service-result-bridge")
self.logger = logging.getLogger("platypush:telegram:result-bridge")
self._plugin: TelegramPlugin = plugin
def run(self):
super().run()
while not self._plugin.should_stop():
try:
ret = self._plugin.result_queue.get()
except Exception as e:
self.logger.warning("Error while reading result queue: %s", e)
continue
if not ret:
break
cmd, result = ret
if cmd is None or cmd.is_end_of_service():
break
q = self._plugin.response_queues.get(cmd.id)
if q:
q.put_nowait(result)
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,72 @@
import os
from dataclasses import dataclass, field
from typing import Any, Iterable, Optional
from uuid import UUID, uuid4
class Resource:
"""
Class to handle resources (files) to be sent through the Telegram API.
"""
def __init__(
self,
file_id: Optional[int] = None,
url: Optional[str] = None,
path: Optional[str] = None,
):
assert file_id or url or path, 'You need to specify either file_id, url or path'
self.file_id = file_id
self.url = url
self.path = path
self.fd = None
def __enter__(self):
"""
Context manager to open the file and return the file descriptor.
"""
if self.path:
self.fd = open(os.path.abspath(os.path.expanduser(self.path)), 'rb') # noqa
return self.fd
return self.file_id or self.url
def __exit__(self, *_, **__):
"""
If the file was opened, close it.
"""
if self.fd:
self.fd.close()
self.fd = None
@dataclass
class Command:
"""
Dataclass to represent a command to be executed on the Telegram service.
"""
cmd: str
args: Iterable = field(default_factory=list)
kwargs: dict = field(default_factory=dict)
timeout: Optional[float] = None
id: UUID = field(default_factory=uuid4)
def is_end_of_service(self) -> bool:
"""
Check if a command is the end-of-service command.
"""
return is_end_of_service(self)
def is_end_of_service(cmd: Any) -> bool:
"""
Check if a command is the end-of-service command.
"""
return isinstance(cmd, Command) and cmd.cmd == END_OF_SERVICE.cmd
END_OF_SERVICE = Command('stop')
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,299 @@
import asyncio as aio
import logging
import os
import re
from multiprocessing import Event, Process, Queue
from typing import Callable, Coroutine, Optional, Set, Type, Union
from telegram import Message, Update
from telegram.ext import (
Application,
ApplicationBuilder,
ContextTypes,
MessageHandler,
filters,
)
from platypush.context import get_bus
from platypush.message.event.telegram import (
MessageEvent,
CommandMessageEvent,
TextMessageEvent,
PhotoMessageEvent,
VideoMessageEvent,
ContactMessageEvent,
DocumentMessageEvent,
LocationMessageEvent,
GroupChatCreatedEvent,
)
from ._bridge import CommandBridge
from ._model import END_OF_SERVICE, Resource
from ._utils import dump_msg, dump_user
class TelegramService(Process):
"""
Background service to handle Telegram messages and events.
"""
def __init__(
self,
api_token: str,
cmd_queue: Queue,
result_queue: Queue,
authorized_chat_ids: Set[Union[str, int]],
*_,
**__,
):
super().__init__(name="telegram-service")
self.logger = logging.getLogger("platypush:telegram")
self.authorized_chat_ids = set(authorized_chat_ids or [])
self._cmd_queue = cmd_queue
self._result_queue = result_queue
self._loop = aio.new_event_loop()
self._app = self._build_app(api_token)
self._service_running = Event()
self._service_stopped = Event()
self._cmd_bridge = CommandBridge(self)
def _build_app(self, api_token: str) -> Application:
app = ApplicationBuilder().token(api_token).build()
app.add_handler(MessageHandler(filters.ChatType.GROUPS, self._group_hook()))
app.add_handler(
MessageHandler(
filters.TEXT & (~filters.COMMAND), self._msg_hook(TextMessageEvent)
)
)
app.add_handler(
MessageHandler(filters.PHOTO, self._msg_hook(PhotoMessageEvent))
)
app.add_handler(
MessageHandler(filters.VIDEO, self._msg_hook(VideoMessageEvent))
)
app.add_handler(
MessageHandler(filters.CONTACT, self._msg_hook(ContactMessageEvent))
)
app.add_handler(
MessageHandler(filters.LOCATION, self._msg_hook(LocationMessageEvent))
)
app.add_handler(
MessageHandler(filters.Document.ALL, self._msg_hook(DocumentMessageEvent))
)
app.add_handler(MessageHandler(filters.COMMAND, self._command_hook()))
return app
async def _authorize(self, msg: Message, context: ContextTypes.DEFAULT_TYPE):
if not self.authorized_chat_ids:
return
if msg.chat.type == 'private' and msg.chat.id not in self.authorized_chat_ids:
self.logger.info(
'Received message from unauthorized chat_id %s', msg.chat.id
)
await context.bot.send_message(
chat_id=msg.chat.id,
text='You are not allowed to send messages to this bot',
)
raise PermissionError
def _msg_hook(self, cls: Type[MessageEvent]):
async def hook(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.effective_message
if not msg:
return
try:
await self._authorize(msg, context)
self.bus.post(
cls(
chat_id=(
update.effective_chat.id if update.effective_chat else None
),
message=dump_msg(msg) if msg else None,
user=(
dump_user(update.effective_user)
if update.effective_user
else None
),
)
)
except PermissionError:
pass
return hook
def _group_hook(self):
async def hook(update: Update, context):
msg = update.effective_message
if not msg:
return
if msg.group_chat_created:
self.bus.post(
GroupChatCreatedEvent(
chat_id=(
update.effective_chat.id if update.effective_chat else None
),
message=dump_msg(msg),
user=(
dump_user(update.effective_user)
if update.effective_user
else None
),
)
)
elif msg.photo:
await self._msg_hook(PhotoMessageEvent)(update, context)
elif msg.video:
await self._msg_hook(VideoMessageEvent)(update, context)
elif msg.contact:
await self._msg_hook(ContactMessageEvent)(update, context)
elif msg.location:
await self._msg_hook(LocationMessageEvent)(update, context)
elif msg.document:
await self._msg_hook(DocumentMessageEvent)(update, context)
elif msg.text:
if msg.text.startswith('/'):
await self._command_hook()(update, context)
else:
await self._msg_hook(TextMessageEvent)(update, context)
return hook
def _command_hook(self):
async def hook(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.effective_message
if not (msg and msg.text):
return
m = re.match(r'\s*/([0-9a-zA-Z_-]+)\s*(.*)', msg.text)
if not m:
self.logger.warning('Invalid command: %s', msg.text)
return
cmd = m.group(1).lower()
args = [arg for arg in re.split(r'\s+', m.group(2)) if len(arg)]
try:
await self._authorize(msg, context)
self.bus.post(
CommandMessageEvent(
chat_id=(
update.effective_chat.id if update.effective_chat else None
),
command=cmd,
cmdargs=args,
message=dump_msg(msg),
user=(
dump_user(update.effective_user)
if update.effective_user
else None
),
)
)
except PermissionError:
pass
return hook
def _exec(
self,
method: Callable[..., Coroutine],
*args,
timeout: Optional[float] = None,
**kwargs,
):
fut = aio.run_coroutine_threadsafe(method(*args, **kwargs), self._loop)
return fut.result(timeout=timeout)
def exec(
self,
cmd: str,
*args,
timeout: Optional[float] = None,
resource: Optional[Resource] = None,
resource_attr: Optional[str] = None,
**kwargs,
):
method = getattr(self._app.bot, cmd, None)
assert method, f"Method {cmd} not found"
if resource:
assert resource_attr, f"Resource attribute not specified for command {cmd}"
with resource as file:
kwargs[resource_attr] = file
return self._exec(
method,
*args,
timeout=timeout,
**kwargs,
)
return self._exec(method, *args, timeout=timeout, **kwargs)
def _run(self):
self._app.run_polling()
def run(self):
super().run()
self._service_running.set()
self._service_stopped.clear()
aio.set_event_loop(self._loop)
self._cmd_bridge.start()
try:
self._run()
except Exception as e:
self.logger.error("Telegram polling error: %s", e, exc_info=True)
finally:
self._service_running.clear()
self._service_stopped.set()
self._cmd_queue.put_nowait(END_OF_SERVICE)
def stop(self):
self._cmd_queue.put_nowait(END_OF_SERVICE)
self._app.stop_running()
if self.is_alive() and self.pid != os.getpid():
self.terminate()
self.join(timeout=5)
if self.is_alive():
self.kill()
@property
def bus(self):
return get_bus()
@property
def cmd_queue(self):
return self._cmd_queue
@property
def result_queue(self):
return self._result_queue
@property
def stop_event(self):
return self._service_stopped
def is_running(self):
return self._service_running.is_set()
@property
def run_event(self):
return self._service_running
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,21 @@
import logging
from telegram import Message as TelegramMessage, User as TelegramUser
from platypush.schemas.telegram import (
TelegramMessageSchema,
TelegramUserSchema,
)
log = logging.getLogger(__name__)
def dump_msg(msg: TelegramMessage) -> dict:
return dict(TelegramMessageSchema().dump(msg))
def dump_user(user: TelegramUser) -> dict:
return dict(TelegramUserSchema().dump(user))
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,15 @@
manifest:
events:
- platypush.message.event.telegram.CommandMessageEvent
- platypush.message.event.telegram.ContactMessageEvent
- platypush.message.event.telegram.DocumentMessageEvent
- platypush.message.event.telegram.GroupChatCreatedEvent
- platypush.message.event.telegram.LocationMessageEvent
- platypush.message.event.telegram.PhotoMessageEvent
- platypush.message.event.telegram.TextMessageEvent
- platypush.message.event.telegram.VideoMessageEvent
install:
pip:
- python-telegram-bot
package: platypush.plugins.telegram
type: plugin

View File

@ -0,0 +1,186 @@
from marshmallow import Schema, pre_dump
from marshmallow.fields import Boolean, Integer, Date, Float, String
class TelegramMessageSchema(Schema):
"""
Schema for Telegram messages.
"""
message_id = Integer(required=True)
chat_id = Integer(required=True)
creation_date = Date()
chat_username = String()
chat_firstname = String()
chat_lastname = String()
from_user_id = Integer()
from_username = String()
from_firstname = String()
from_lastname = String()
text = String()
caption = String()
edit_date = Date()
forward_from_message_id = Integer()
forward_date = Date()
photo_file_id = String()
photo_file_size = Integer()
photo_width = Integer()
photo_height = Integer()
document_file_id = String()
document_file_name = String()
document_file_size = Integer()
document_mime_type = String()
audio_file_id = String()
audio_file_size = Integer()
audio_mime_type = String()
audio_performer = String()
audio_title = String()
audio_duration = Integer()
location_latitude = Float()
location_longitude = Float()
contact_phone_number = String()
contact_first_name = String()
contact_last_name = String()
contact_user_id = Integer()
contact_vcard = String()
video_file_id = String()
video_file_size = Integer()
video_width = Integer()
video_height = Integer()
video_mime_type = String()
video_duration = Integer()
link = String()
media_group_id = String()
@pre_dump
def pre_dump(self, msg, **_) -> dict:
ret = {
'message_id': msg.message_id,
'chat_id': msg.chat_id,
'chat_username': msg.chat.username,
'chat_firstname': msg.chat.first_name,
'chat_lastname': msg.chat.last_name,
'text': msg.text,
'caption': msg.caption,
'creation_date': msg.date,
'edit_date': msg.edit_date,
'forward_date': msg.forward_date,
'forward_from_message_id': msg.forward_from_message_id,
'link': msg.link,
'media_group_id': msg.media_group_id,
}
if msg.from_user:
ret.update(
{
'from_user_id': msg.from_user.id,
'from_username': msg.from_user.username,
'from_firstname': msg.from_user.first_name,
'from_lastname': msg.from_user.last_name,
}
)
if msg.photo:
ret.update(
{
'photo_file_id': msg.photo[-1].file_id,
'photo_file_size': msg.photo[-1].file_size,
'photo_width': msg.photo[-1].width,
'photo_height': msg.photo[-1].height,
}
)
if msg.document:
ret.update(
{
'document_file_id': msg.document.file_id,
'document_file_name': msg.document.file_name,
'document_file_size': msg.document.file_size,
'document_mime_type': msg.document.mime_type,
}
)
if msg.audio:
ret.update(
{
'audio_file_id': msg.audio.file_id,
'audio_file_size': msg.audio.file_size,
'audio_mime_type': msg.audio.mime_type,
'audio_performer': msg.audio.performer,
'audio_title': msg.audio.title,
'audio_duration': msg.audio.duration,
}
)
if msg.video:
ret.update(
{
'video_file_id': msg.video.file_id,
'video_file_size': msg.video.file_size,
'video_width': msg.video.width,
'video_height': msg.video.height,
'video_mime_type': msg.video.mime_type,
'video_duration': msg.video.duration,
}
)
if msg.location:
ret.update(
{
'location_latitude': msg.location.latitude,
'location_longitude': msg.location.longitude,
}
)
if msg.contact:
ret.update(
{
'contact_phone_number': msg.contact.phone_number,
'contact_first_name': msg.contact.first_name,
'contact_last_name': msg.contact.last_name,
'contact_user_id': msg.contact.user_id,
'contact_vcard': msg.contact.vcard,
}
)
return ret
class TelegramFileSchema(Schema):
"""
Schema for Telegram files.
"""
file_id = String(required=True)
file_path = String(required=True)
file_size = Integer(required=True)
class TelegramUserSchema(Schema):
"""
Schema for Telegram users.
"""
user_id = Integer(required=True, data_key='id')
username = String(required=True)
is_bot = Boolean(required=True)
first_name = String(required=True)
last_name = String()
language_code = String()
link = String()
class TelegramChatSchema(Schema):
"""
Schema for Telegram chats.
"""
chat_id = Integer(required=True)
link = String(required=True)
username = String(required=True)
invite_link = String()
title = String()
description = String()
type = String()
first_name = String()
last_name = String()