platypush/platypush/plugins/telegram/_service.py

300 lines
8.9 KiB
Python

import asyncio as aio
import logging
import os
import re
from multiprocessing import Event, Process, Queue
from typing import Callable, Coroutine, Optional, Set, Type, Union
from telegram import Message, Update
from telegram.ext import (
Application,
ApplicationBuilder,
ContextTypes,
MessageHandler,
filters,
)
from platypush.context import get_bus
from platypush.message.event.telegram import (
MessageEvent,
CommandMessageEvent,
TextMessageEvent,
PhotoMessageEvent,
VideoMessageEvent,
ContactMessageEvent,
DocumentMessageEvent,
LocationMessageEvent,
GroupChatCreatedEvent,
)
from ._bridge import CommandBridge
from ._model import END_OF_SERVICE, Resource
from ._utils import dump_msg, dump_user
class TelegramService(Process):
"""
Background service to handle Telegram messages and events.
"""
def __init__(
self,
api_token: str,
cmd_queue: Queue,
result_queue: Queue,
authorized_chat_ids: Set[Union[str, int]],
*_,
**__,
):
super().__init__(name="telegram-service")
self.logger = logging.getLogger("platypush:telegram")
self.authorized_chat_ids = set(authorized_chat_ids or [])
self._cmd_queue = cmd_queue
self._result_queue = result_queue
self._loop = aio.new_event_loop()
self._app = self._build_app(api_token)
self._service_running = Event()
self._service_stopped = Event()
self._cmd_bridge = CommandBridge(self)
def _build_app(self, api_token: str) -> Application:
app = ApplicationBuilder().token(api_token).build()
app.add_handler(MessageHandler(filters.ChatType.GROUPS, self._group_hook()))
app.add_handler(
MessageHandler(
filters.TEXT & (~filters.COMMAND), self._msg_hook(TextMessageEvent)
)
)
app.add_handler(
MessageHandler(filters.PHOTO, self._msg_hook(PhotoMessageEvent))
)
app.add_handler(
MessageHandler(filters.VIDEO, self._msg_hook(VideoMessageEvent))
)
app.add_handler(
MessageHandler(filters.CONTACT, self._msg_hook(ContactMessageEvent))
)
app.add_handler(
MessageHandler(filters.LOCATION, self._msg_hook(LocationMessageEvent))
)
app.add_handler(
MessageHandler(filters.Document.ALL, self._msg_hook(DocumentMessageEvent))
)
app.add_handler(MessageHandler(filters.COMMAND, self._command_hook()))
return app
async def _authorize(self, msg: Message, context: ContextTypes.DEFAULT_TYPE):
if not self.authorized_chat_ids:
return
if msg.chat.type == 'private' and msg.chat.id not in self.authorized_chat_ids:
self.logger.info(
'Received message from unauthorized chat_id %s', msg.chat.id
)
await context.bot.send_message(
chat_id=msg.chat.id,
text='You are not allowed to send messages to this bot',
)
raise PermissionError
def _msg_hook(self, cls: Type[MessageEvent]):
async def hook(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.effective_message
if not msg:
return
try:
await self._authorize(msg, context)
self.bus.post(
cls(
chat_id=(
update.effective_chat.id if update.effective_chat else None
),
message=dump_msg(msg) if msg else None,
user=(
dump_user(update.effective_user)
if update.effective_user
else None
),
)
)
except PermissionError:
pass
return hook
def _group_hook(self):
async def hook(update: Update, context):
msg = update.effective_message
if not msg:
return
if msg.group_chat_created:
self.bus.post(
GroupChatCreatedEvent(
chat_id=(
update.effective_chat.id if update.effective_chat else None
),
message=dump_msg(msg),
user=(
dump_user(update.effective_user)
if update.effective_user
else None
),
)
)
elif msg.photo:
await self._msg_hook(PhotoMessageEvent)(update, context)
elif msg.video:
await self._msg_hook(VideoMessageEvent)(update, context)
elif msg.contact:
await self._msg_hook(ContactMessageEvent)(update, context)
elif msg.location:
await self._msg_hook(LocationMessageEvent)(update, context)
elif msg.document:
await self._msg_hook(DocumentMessageEvent)(update, context)
elif msg.text:
if msg.text.startswith('/'):
await self._command_hook()(update, context)
else:
await self._msg_hook(TextMessageEvent)(update, context)
return hook
def _command_hook(self):
async def hook(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.effective_message
if not (msg and msg.text):
return
m = re.match(r'\s*/([0-9a-zA-Z_-]+)\s*(.*)', msg.text)
if not m:
self.logger.warning('Invalid command: %s', msg.text)
return
cmd = m.group(1).lower()
args = [arg for arg in re.split(r'\s+', m.group(2)) if len(arg)]
try:
await self._authorize(msg, context)
self.bus.post(
CommandMessageEvent(
chat_id=(
update.effective_chat.id if update.effective_chat else None
),
command=cmd,
cmdargs=args,
message=dump_msg(msg),
user=(
dump_user(update.effective_user)
if update.effective_user
else None
),
)
)
except PermissionError:
pass
return hook
def _exec(
self,
method: Callable[..., Coroutine],
*args,
timeout: Optional[float] = None,
**kwargs,
):
fut = aio.run_coroutine_threadsafe(method(*args, **kwargs), self._loop)
return fut.result(timeout=timeout)
def exec(
self,
cmd: str,
*args,
timeout: Optional[float] = None,
resource: Optional[Resource] = None,
resource_attr: Optional[str] = None,
**kwargs,
):
method = getattr(self._app.bot, cmd, None)
assert method, f"Method {cmd} not found"
if resource:
assert resource_attr, f"Resource attribute not specified for command {cmd}"
with resource as file:
kwargs[resource_attr] = file
return self._exec(
method,
*args,
timeout=timeout,
**kwargs,
)
return self._exec(method, *args, timeout=timeout, **kwargs)
def _run(self):
self._app.run_polling()
def run(self):
super().run()
self._service_running.set()
self._service_stopped.clear()
aio.set_event_loop(self._loop)
self._cmd_bridge.start()
try:
self._run()
except Exception as e:
self.logger.error("Telegram polling error: %s", e, exc_info=True)
finally:
self._service_running.clear()
self._service_stopped.set()
self._cmd_queue.put_nowait(END_OF_SERVICE)
def stop(self):
self._cmd_queue.put_nowait(END_OF_SERVICE)
self._app.stop_running()
if self.is_alive() and self.pid != os.getpid():
self.terminate()
self.join(timeout=5)
if self.is_alive():
self.kill()
@property
def bus(self):
return get_bus()
@property
def cmd_queue(self):
return self._cmd_queue
@property
def result_queue(self):
return self._result_queue
@property
def stop_event(self):
return self._service_stopped
def is_running(self):
return self._service_running.is_set()
@property
def run_event(self):
return self._service_running
# vim:sw=4:ts=4:et: