92 lines
2.7 KiB
Python
92 lines
2.7 KiB
Python
import logging
|
|
|
|
from threading import Thread
|
|
|
|
from ._model import Command
|
|
|
|
|
|
class CommandBridge(Thread):
|
|
"""
|
|
The command bridge is a thread that listens for commands on a command
|
|
queue, proxies them to the Telegram service and returns the result back to
|
|
the response queue.
|
|
|
|
This is required because the Telegram service runs in a separate process -
|
|
a requirement because of the Telegram bot API constraints, which requires
|
|
the asyncio event loop to run in the main thread.
|
|
"""
|
|
|
|
def __init__(self, service, *_, **__):
|
|
from ._service import TelegramService
|
|
|
|
super().__init__(name="telegram-service-bridge")
|
|
self.logger = logging.getLogger("platypush:telegram:bridge")
|
|
self._service: TelegramService = service
|
|
|
|
def _exec(self, cmd: Command):
|
|
try:
|
|
result = self._service.exec(
|
|
cmd.cmd, *cmd.args, **cmd.kwargs, timeout=cmd.timeout
|
|
)
|
|
except Exception as e:
|
|
result = e
|
|
|
|
self._service.result_queue.put_nowait((cmd, result))
|
|
|
|
def run(self):
|
|
super().run()
|
|
|
|
while self._service.is_running():
|
|
try:
|
|
cmd = self._service.cmd_queue.get()
|
|
except Exception as e:
|
|
self.logger.warning("Error while reading command queue: %s", e)
|
|
continue
|
|
|
|
if cmd is None or cmd.is_end_of_service():
|
|
break
|
|
|
|
self._exec(cmd)
|
|
|
|
|
|
class ResultBridge(Thread):
|
|
"""
|
|
The result bridge is a thread that listens for results on a result queue and
|
|
proxies them to the response queue of the Telegram service.
|
|
|
|
This is required because the Telegram service runs in a separate process -
|
|
a requirement because of the Telegram bot API constraints, which requires
|
|
the asyncio event loop to run in the main thread.
|
|
"""
|
|
|
|
def __init__(self, plugin, *_, **__):
|
|
from . import TelegramPlugin
|
|
|
|
super().__init__(name="telegram-service-result-bridge")
|
|
self.logger = logging.getLogger("platypush:telegram:result-bridge")
|
|
self._plugin: TelegramPlugin = plugin
|
|
|
|
def run(self):
|
|
super().run()
|
|
|
|
while not self._plugin.should_stop():
|
|
try:
|
|
ret = self._plugin.result_queue.get()
|
|
except Exception as e:
|
|
self.logger.warning("Error while reading result queue: %s", e)
|
|
continue
|
|
|
|
if not ret:
|
|
break
|
|
|
|
cmd, result = ret
|
|
if cmd is None or cmd.is_end_of_service():
|
|
break
|
|
|
|
q = self._plugin.response_queues.get(cmd.id)
|
|
if q:
|
|
q.put_nowait(result)
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|