300 lines
8.9 KiB
Python
300 lines
8.9 KiB
Python
import asyncio as aio
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
from multiprocessing import Event, Process, Queue
|
|
from typing import Callable, Coroutine, Optional, Set, Type, Union
|
|
|
|
from telegram import Message, Update
|
|
from telegram.ext import (
|
|
Application,
|
|
ApplicationBuilder,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
)
|
|
|
|
from platypush.context import get_bus
|
|
from platypush.message.event.telegram import (
|
|
MessageEvent,
|
|
CommandMessageEvent,
|
|
TextMessageEvent,
|
|
PhotoMessageEvent,
|
|
VideoMessageEvent,
|
|
ContactMessageEvent,
|
|
DocumentMessageEvent,
|
|
LocationMessageEvent,
|
|
GroupChatCreatedEvent,
|
|
)
|
|
|
|
from ._bridge import CommandBridge
|
|
from ._model import END_OF_SERVICE, Resource
|
|
from ._utils import dump_msg, dump_user
|
|
|
|
|
|
class TelegramService(Process):
|
|
"""
|
|
Background service to handle Telegram messages and events.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
api_token: str,
|
|
cmd_queue: Queue,
|
|
result_queue: Queue,
|
|
authorized_chat_ids: Set[Union[str, int]],
|
|
*_,
|
|
**__,
|
|
):
|
|
super().__init__(name="telegram-service")
|
|
self.logger = logging.getLogger("platypush:telegram")
|
|
self.authorized_chat_ids = set(authorized_chat_ids or [])
|
|
self._cmd_queue = cmd_queue
|
|
self._result_queue = result_queue
|
|
self._loop = aio.new_event_loop()
|
|
self._app = self._build_app(api_token)
|
|
self._service_running = Event()
|
|
self._service_stopped = Event()
|
|
self._cmd_bridge = CommandBridge(self)
|
|
|
|
def _build_app(self, api_token: str) -> Application:
|
|
app = ApplicationBuilder().token(api_token).build()
|
|
app.add_handler(MessageHandler(filters.ChatType.GROUPS, self._group_hook()))
|
|
|
|
app.add_handler(
|
|
MessageHandler(
|
|
filters.TEXT & (~filters.COMMAND), self._msg_hook(TextMessageEvent)
|
|
)
|
|
)
|
|
|
|
app.add_handler(
|
|
MessageHandler(filters.PHOTO, self._msg_hook(PhotoMessageEvent))
|
|
)
|
|
|
|
app.add_handler(
|
|
MessageHandler(filters.VIDEO, self._msg_hook(VideoMessageEvent))
|
|
)
|
|
|
|
app.add_handler(
|
|
MessageHandler(filters.CONTACT, self._msg_hook(ContactMessageEvent))
|
|
)
|
|
|
|
app.add_handler(
|
|
MessageHandler(filters.LOCATION, self._msg_hook(LocationMessageEvent))
|
|
)
|
|
|
|
app.add_handler(
|
|
MessageHandler(filters.Document.ALL, self._msg_hook(DocumentMessageEvent))
|
|
)
|
|
|
|
app.add_handler(MessageHandler(filters.COMMAND, self._command_hook()))
|
|
return app
|
|
|
|
async def _authorize(self, msg: Message, context: ContextTypes.DEFAULT_TYPE):
|
|
if not self.authorized_chat_ids:
|
|
return
|
|
|
|
if msg.chat.type == 'private' and msg.chat.id not in self.authorized_chat_ids:
|
|
self.logger.info(
|
|
'Received message from unauthorized chat_id %s', msg.chat.id
|
|
)
|
|
|
|
await context.bot.send_message(
|
|
chat_id=msg.chat.id,
|
|
text='You are not allowed to send messages to this bot',
|
|
)
|
|
|
|
raise PermissionError
|
|
|
|
def _msg_hook(self, cls: Type[MessageEvent]):
|
|
async def hook(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
msg = update.effective_message
|
|
if not msg:
|
|
return
|
|
|
|
try:
|
|
await self._authorize(msg, context)
|
|
self.bus.post(
|
|
cls(
|
|
chat_id=(
|
|
update.effective_chat.id if update.effective_chat else None
|
|
),
|
|
message=dump_msg(msg) if msg else None,
|
|
user=(
|
|
dump_user(update.effective_user)
|
|
if update.effective_user
|
|
else None
|
|
),
|
|
)
|
|
)
|
|
except PermissionError:
|
|
pass
|
|
|
|
return hook
|
|
|
|
def _group_hook(self):
|
|
async def hook(update: Update, context):
|
|
msg = update.effective_message
|
|
if not msg:
|
|
return
|
|
|
|
if msg.group_chat_created:
|
|
self.bus.post(
|
|
GroupChatCreatedEvent(
|
|
chat_id=(
|
|
update.effective_chat.id if update.effective_chat else None
|
|
),
|
|
message=dump_msg(msg),
|
|
user=(
|
|
dump_user(update.effective_user)
|
|
if update.effective_user
|
|
else None
|
|
),
|
|
)
|
|
)
|
|
elif msg.photo:
|
|
await self._msg_hook(PhotoMessageEvent)(update, context)
|
|
elif msg.video:
|
|
await self._msg_hook(VideoMessageEvent)(update, context)
|
|
elif msg.contact:
|
|
await self._msg_hook(ContactMessageEvent)(update, context)
|
|
elif msg.location:
|
|
await self._msg_hook(LocationMessageEvent)(update, context)
|
|
elif msg.document:
|
|
await self._msg_hook(DocumentMessageEvent)(update, context)
|
|
elif msg.text:
|
|
if msg.text.startswith('/'):
|
|
await self._command_hook()(update, context)
|
|
else:
|
|
await self._msg_hook(TextMessageEvent)(update, context)
|
|
|
|
return hook
|
|
|
|
def _command_hook(self):
|
|
async def hook(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
msg = update.effective_message
|
|
if not (msg and msg.text):
|
|
return
|
|
|
|
m = re.match(r'\s*/([0-9a-zA-Z_-]+)\s*(.*)', msg.text)
|
|
if not m:
|
|
self.logger.warning('Invalid command: %s', msg.text)
|
|
return
|
|
|
|
cmd = m.group(1).lower()
|
|
args = [arg for arg in re.split(r'\s+', m.group(2)) if len(arg)]
|
|
|
|
try:
|
|
await self._authorize(msg, context)
|
|
self.bus.post(
|
|
CommandMessageEvent(
|
|
chat_id=(
|
|
update.effective_chat.id if update.effective_chat else None
|
|
),
|
|
command=cmd,
|
|
cmdargs=args,
|
|
message=dump_msg(msg),
|
|
user=(
|
|
dump_user(update.effective_user)
|
|
if update.effective_user
|
|
else None
|
|
),
|
|
)
|
|
)
|
|
except PermissionError:
|
|
pass
|
|
|
|
return hook
|
|
|
|
def _exec(
|
|
self,
|
|
method: Callable[..., Coroutine],
|
|
*args,
|
|
timeout: Optional[float] = None,
|
|
**kwargs,
|
|
):
|
|
fut = aio.run_coroutine_threadsafe(method(*args, **kwargs), self._loop)
|
|
return fut.result(timeout=timeout)
|
|
|
|
def exec(
|
|
self,
|
|
cmd: str,
|
|
*args,
|
|
timeout: Optional[float] = None,
|
|
resource: Optional[Resource] = None,
|
|
resource_attr: Optional[str] = None,
|
|
**kwargs,
|
|
):
|
|
method = getattr(self._app.bot, cmd, None)
|
|
assert method, f"Method {cmd} not found"
|
|
|
|
if resource:
|
|
assert resource_attr, f"Resource attribute not specified for command {cmd}"
|
|
with resource as file:
|
|
kwargs[resource_attr] = file
|
|
return self._exec(
|
|
method,
|
|
*args,
|
|
timeout=timeout,
|
|
**kwargs,
|
|
)
|
|
|
|
return self._exec(method, *args, timeout=timeout, **kwargs)
|
|
|
|
def _run(self):
|
|
self._app.run_polling()
|
|
|
|
def run(self):
|
|
super().run()
|
|
self._service_running.set()
|
|
self._service_stopped.clear()
|
|
aio.set_event_loop(self._loop)
|
|
self._cmd_bridge.start()
|
|
|
|
try:
|
|
self._run()
|
|
except Exception as e:
|
|
self.logger.error("Telegram polling error: %s", e, exc_info=True)
|
|
finally:
|
|
self._service_running.clear()
|
|
self._service_stopped.set()
|
|
self._cmd_queue.put_nowait(END_OF_SERVICE)
|
|
|
|
def stop(self):
|
|
self._cmd_queue.put_nowait(END_OF_SERVICE)
|
|
self._app.stop_running()
|
|
|
|
if self.is_alive() and self.pid != os.getpid():
|
|
self.terminate()
|
|
self.join(timeout=5)
|
|
|
|
if self.is_alive():
|
|
self.kill()
|
|
|
|
@property
|
|
def bus(self):
|
|
return get_bus()
|
|
|
|
@property
|
|
def cmd_queue(self):
|
|
return self._cmd_queue
|
|
|
|
@property
|
|
def result_queue(self):
|
|
return self._result_queue
|
|
|
|
@property
|
|
def stop_event(self):
|
|
return self._service_stopped
|
|
|
|
def is_running(self):
|
|
return self._service_running.is_set()
|
|
|
|
@property
|
|
def run_event(self):
|
|
return self._service_running
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|