import os
from typing import Optional, Union, Callable
from platypush.context import get_bus
from platypush.message.event.ngrok import (
NgrokProcessStartedEvent,
NgrokTunnelStartedEvent,
NgrokTunnelStoppedEvent,
NgrokProcessStoppedEvent,
)
from platypush.plugins import Plugin, action
from platypush.schemas.ngrok import NgrokTunnelSchema
class NgrokPlugin(Plugin):
    """
    Plugin to dynamically create and manage network tunnels using `ngrok <https://ngrok.com/>`_.
    """

    def __init__(
        self,
        auth_token: Optional[str] = None,
        ngrok_bin: Optional[str] = None,
        region: Optional[str] = None,
        **kwargs,
    ):
        """
        :param auth_token: Specify the ``ngrok`` auth token, enabling authenticated features (e.g. more concurrent
            tunnels, custom subdomains, etc.).
        :param ngrok_bin: By default ``pyngrok`` manages its own version of the ``ngrok`` binary, but you can specify
            this option if you want to use a different binary installed on the system.
        :param region: ISO code of the region/country that should host the ``ngrok`` tunnel (default: ``us``).
        """
        from pyngrok import conf, ngrok

        super().__init__(**kwargs)

        # Registry of the tunnels reported as started by the ngrok log stream,
        # keyed by public URL. Initialized before the log callback is
        # registered, so the callback never sees a half-built instance.
        self._active_tunnels_by_url = {}
        conf.get_default().log_event_callback = self._get_event_callback()

        if auth_token:
            ngrok.set_auth_token(auth_token)
        if ngrok_bin:
            conf.get_default().ngrok_path = os.path.expanduser(ngrok_bin)
        if region:
            conf.get_default().region = region

    @property
    def _active_tunnels_by_name(self) -> dict:
        """
        Name -> tunnel view of the registry.

        This is a *new* dictionary built on every access: mutating it does not
        affect ``_active_tunnels_by_url``, which is the actual registry.
        """
        return {
            tunnel['name']: tunnel for tunnel in self._active_tunnels_by_url.values()
        }

    def _pop_tunnel(self, tunnel_id: str) -> Optional[dict]:
        """
        Remove a tunnel from the registry given either its name or its public URL.

        :param tunnel_id: Tunnel name or public URL.
        :return: The removed tunnel (``name``, ``url``, ``protocol``), or
            ``None`` if no tunnel matches.
        """
        # Names take precedence over URLs. The lookup by name only resolves the
        # URL: the removal must happen on the URL-indexed registry, since the
        # by-name mapping is a throw-away copy.
        tunnel = self._active_tunnels_by_name.get(tunnel_id)
        url = tunnel['url'] if tunnel else tunnel_id
        return self._active_tunnels_by_url.pop(url, None)

    def _get_event_callback(self) -> Callable:
        """
        Build the callback that translates ``ngrok`` log records into
        platypush events and keeps the tunnels registry up to date.
        """
        from pyngrok.process import NgrokLog

        def callback(log: NgrokLog):
            if log.msg == 'client session established':
                get_bus().post(NgrokProcessStartedEvent())
            elif log.msg == 'started tunnel':
                tunnel = {
                    'name': log.name,
                    'url': log.url,
                    # The URL scheme is used as the tunnel protocol
                    'protocol': log.url.split(':')[0],
                }

                self._active_tunnels_by_url[tunnel['url']] = tunnel
                get_bus().post(NgrokTunnelStartedEvent(**tunnel))
            elif (
                log.msg == 'end'
                and int(getattr(log, 'status', 0)) == 204
                and getattr(log, 'pg', '').startswith('/api/tunnels')
            ):
                # A request on /api/tunnels/<id> that ended with a 204 is
                # treated as a tunnel removal. The last path segment is
                # presumably the tunnel name, but URLs are matched as well.
                tunnel = self._pop_tunnel(log.pg.split('/')[-1])
                if tunnel:
                    get_bus().post(NgrokTunnelStoppedEvent(**tunnel))
            elif log.msg == 'received stop request':
                get_bus().post(NgrokProcessStoppedEvent())

        return callback

    @action
    def create_tunnel(
        self,
        resource: Union[int, str] = 80,
        protocol: str = 'tcp',
        name: Optional[str] = None,
        auth: Optional[str] = None,
        **kwargs,
    ) -> dict:
        """
        Create an ``ngrok`` tunnel to the specified localhost port/protocol.

        :param resource: This can be any of the following:

            - A TCP or UDP port exposed on localhost.
            - A local network address (or ``address:port``) to expose.
            - The absolute path (starting with ``file://``) to a local folder - in such case, the specified directory
              will be served over HTTP through an ``ngrok`` endpoint (see https://ngrok.com/docs#http-file-urls).

            Default: localhost port 80.

        :param protocol: Network protocol (default: ``tcp``).
        :param name: Optional tunnel name.
        :param auth: HTTP basic authentication credentials associated with the tunnel, in the format of
            ``username:password``.
        :param kwargs: Extra arguments supported by the ``ngrok`` tunnel, such as ``hostname``, ``subdomain`` or
            ``remote_addr`` - see the `ngrok documentation <https://ngrok.com/docs>`_ for a full
            list.
        :return: .. schema:: ngrok.NgrokTunnelSchema
        """
        from pyngrok import ngrok

        # No explicit protocol is passed for file:// resources: the requested
        # protocol is ignored in that case.
        proto: Optional[str] = protocol
        if isinstance(resource, str) and resource.startswith('file://'):
            proto = None

        tunnel = ngrok.connect(resource, proto=proto, name=name, auth=auth, **kwargs)
        return NgrokTunnelSchema().dump(tunnel)

    @action
    def close_tunnel(self, tunnel: str):
        """
        Close an ``ngrok`` tunnel.

        :param tunnel: Name or public URL of the tunnel to be closed.
        :raises AssertionError: If no active tunnel matches the given name or URL.
        """
        from pyngrok import ngrok

        # Resolve a tunnel name to its public URL; build the by-name view once
        tunnels_by_name = self._active_tunnels_by_name
        if tunnel in tunnels_by_name:
            tunnel = tunnels_by_name[tunnel]['url']

        assert (
            tunnel in self._active_tunnels_by_url
        ), f'No such tunnel URL or name: {tunnel}'

        # The registry entry is removed by the log callback once ngrok
        # confirms the removal.
        ngrok.disconnect(tunnel)

    @action
    def get_tunnels(self):
        """
        Get the list of active ``ngrok`` tunnels.

        :return: .. schema:: ngrok.NgrokTunnelSchema(many=True)
        """
        from pyngrok import ngrok

        tunnels = ngrok.get_tunnels()
        return NgrokTunnelSchema().dump(tunnels, many=True)

    @action
    def kill_process(self):
        """
        The first created tunnel instance also starts the ``ngrok`` process.
        The process will stay alive until the Python interpreter is stopped or this action is invoked.

        :raises AssertionError: If the ``ngrok`` process is not running.
        """
        from pyngrok import ngrok

        proc = ngrok.get_ngrok_process()
        assert proc and proc.proc, 'The ngrok process is not running'
        proc.proc.kill()
        # Post the event explicitly after killing the process
        get_bus().post(NgrokProcessStoppedEvent())
# vim:sw=4:ts=4:et: