70 lines
2.7 KiB
Python
70 lines
2.7 KiB
Python
from typing import List, Optional
|
|
|
|
from platypush.backend import Backend
|
|
from platypush.context import get_plugin
|
|
from platypush.message.event.chat.telegram import MessageEvent, CommandMessageEvent
|
|
from platypush.plugins.chat.telegram import ChatTelegramPlugin
|
|
|
|
|
|
class ChatTelegramBackend(Backend):
    """
    Telegram bot that listens for messages and updates.

    Triggers:

        * :class:`platypush.message.event.chat.telegram.MessageEvent` when a message is received.
        * :class:`platypush.message.event.chat.telegram.CommandMessageEvent` when a command message is received.

    Requires:

        * The :class:`platypush.plugins.chat.telegram.ChatTelegramPlugin` plugin configured

    """

    def __init__(self, commands: Optional[List[str]] = None, **kwargs):
        """
        :param commands: Optional list of commands to be registered on the bot (e.g. 'start', 'stop', 'help' etc.).
            When you send e.g. '/start' to the bot conversation then a
            :class:`platypush.message.event.chat.telegram.CommandMessageEvent` will be triggered instead of a
            :class:`platypush.message.event.chat.telegram.MessageEvent` event.
        """
        super().__init__(**kwargs)
        self.commands = commands or []
        self._plugin: ChatTelegramPlugin = get_plugin('chat.telegram')

    def _msg_hook(self):
        """
        Build the callback for generic (non-command) messages.

        :return: A ``hook(update, context)`` callable that posts a
            :class:`platypush.message.event.chat.telegram.MessageEvent` on the bus.
        """
        # noinspection PyUnusedLocal
        def hook(update, context):
            self.bus.post(MessageEvent(chat_id=update.effective_chat.id,
                                       message=self._plugin.parse_msg(update.effective_message).output,
                                       user=self._plugin.parse_user(update.effective_user).output))

        return hook

    def _command_hook(self, cmd):
        """
        Build the callback for one registered command.

        ``cmd`` is bound through this factory call, so every returned hook
        reports its own command name.

        :param cmd: Command name, as listed in ``commands``.
        :return: A ``hook(update, context)`` callable that posts a
            :class:`platypush.message.event.chat.telegram.CommandMessageEvent` on the bus.
        """
        # noinspection PyUnusedLocal
        def hook(update, context):
            self.bus.post(CommandMessageEvent(command=cmd,
                                              chat_id=update.effective_chat.id,
                                              message=self._plugin.parse_msg(update.effective_message).output,
                                              user=self._plugin.parse_user(update.effective_user).output))

        return hook

    def run(self):
        """
        Register the bot handlers on the plugin's Telegram dispatcher and start polling.
        """
        # noinspection PyPackageRequirements
        from telegram.ext import CommandHandler, MessageHandler, Filters

        super().run()
        telegram = self._plugin.get_telegram()
        dispatcher = telegram.dispatcher

        # Within a handler group only the first handler that matches an update
        # is used, in registration order. The command handlers therefore have
        # to be added before the catch-all message handler, otherwise
        # ``Filters.all`` would swallow every '/command' message and no
        # CommandMessageEvent would ever be posted.
        for cmd in self.commands:
            dispatcher.add_handler(CommandHandler(cmd, self._command_hook(cmd)))

        # Catch-all: anything not claimed by a registered command handler.
        dispatcher.add_handler(MessageHandler(Filters.all, self._msg_hook()))

        self.logger.info('Initialized Telegram backend')
        telegram.start_polling()
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|