Migrated websocket service.

The websocket service is no longer provided by a different service,
controlled by a different thread running on another port.

Instead, it's now exposed directly over Flask routes, using
WSGI+eventlet+simple_websocket.

Also, the SSL context options have been removed from `backend.http`, for
sake of simplicity. If you want to enable SSL, you can serve Platypush
through a reverse proxy like nginx.
This commit is contained in:
Fabio Manganiello 2023-05-07 12:08:28 +02:00
parent 3aefc9607d
commit f9b0bc905e
Signed by: blacklight
GPG key ID: D90FBA7F76362774
11 changed files with 269 additions and 283 deletions

View file

@ -302,7 +302,7 @@ autodoc_mock_imports = [
'bleak', 'bleak',
'bluetooth_numbers', 'bluetooth_numbers',
'TheengsDecoder', 'TheengsDecoder',
'waitress', 'simple_websocket',
] ]
sys.path.insert(0, os.path.abspath('../..')) sys.path.insert(0, os.path.abspath('../..'))

View file

@ -109,8 +109,6 @@ calendar:
backend.http: backend.http:
# Listening port # Listening port
port: 8008 port: 8008
# Websocket port
websocket_port: 8009
# Through resource_dirs you can specify external folders whose content can be accessed on # Through resource_dirs you can specify external folders whose content can be accessed on
# the web server through a custom URL. In the case below we have a Dropbox folder containing # the web server through a custom URL. In the case below we have a Dropbox folder containing
@ -165,10 +163,6 @@ backend.mqtt:
#backend.tcp: #backend.tcp:
# port: 3333 # port: 3333
# Websocket backend. Install required dependencies through 'pip install "platypush[http]"'
#backend.websocket:
# port: 8765
## -- ## --
## Assistant configuration examples ## Assistant configuration examples
## -- ## --

View file

@ -4,20 +4,15 @@ import secrets
import threading import threading
from multiprocessing import Process, cpu_count from multiprocessing import Process, cpu_count
from typing import Mapping, Optional
try:
from websockets.exceptions import ConnectionClosed # type: ignore
from websockets import serve as websocket_serve # type: ignore
except ImportError:
from websockets import ConnectionClosed, serve as websocket_serve # type: ignore
from platypush.backend import Backend from platypush.backend import Backend
from platypush.backend.http.app import application from platypush.backend.http.app import application
from platypush.backend.http.ws import events_redis_topic
from platypush.backend.http.wsgi import WSGIApplicationWrapper from platypush.backend.http.wsgi import WSGIApplicationWrapper
from platypush.bus.redis import RedisBus from platypush.bus.redis import RedisBus
from platypush.config import Config from platypush.config import Config
from platypush.context import get_or_create_event_loop from platypush.utils import get_redis
from platypush.utils import get_ssl_server_context
class HttpBackend(Backend): class HttpBackend(Backend):
@ -31,8 +26,6 @@ class HttpBackend(Backend):
backend.http: backend.http:
# Default HTTP listen port # Default HTTP listen port
port: 8008 port: 8008
# Default websocket port
websocket_port: 8009
# External folders that will be exposed over `/resources/<name>` # External folders that will be exposed over `/resources/<name>`
resource_dirs: resource_dirs:
photos: /mnt/hd/photos photos: /mnt/hd/photos
@ -158,70 +151,30 @@ class HttpBackend(Backend):
""" """
_DEFAULT_HTTP_PORT = 8008 _DEFAULT_HTTP_PORT = 8008
_DEFAULT_WEBSOCKET_PORT = 8009
def __init__( def __init__(
self, self,
port=_DEFAULT_HTTP_PORT, port: int = _DEFAULT_HTTP_PORT,
websocket_port=_DEFAULT_WEBSOCKET_PORT, bind_address: str = '0.0.0.0',
bind_address='0.0.0.0', resource_dirs: Optional[Mapping[str, str]] = None,
disable_websocket=False, secret_key_file: Optional[str] = None,
resource_dirs=None,
ssl_cert=None,
ssl_key=None,
ssl_cafile=None,
ssl_capath=None,
maps=None,
secret_key_file=None,
**kwargs, **kwargs,
): ):
""" """
:param port: Listen port for the web server (default: 8008) :param port: Listen port for the web server (default: 8008)
:type port: int
:param websocket_port: Listen port for the websocket server (default: 8009)
:type websocket_port: int
:param bind_address: Address/interface to bind to (default: 0.0.0.0, accept connection from any IP) :param bind_address: Address/interface to bind to (default: 0.0.0.0, accept connection from any IP)
:type bind_address: str
:param disable_websocket: Disable the websocket interface (default: False)
:type disable_websocket: bool
:param ssl_cert: Set it to the path of your certificate file if you want to enable HTTPS (default: None)
:type ssl_cert: str
:param ssl_key: Set it to the path of your key file if you want to enable HTTPS (default: None)
:type ssl_key: str
:param ssl_cafile: Set it to the path of your certificate authority file if you want to enable HTTPS
(default: None)
:type ssl_cafile: str
:param ssl_capath: Set it to the path of your certificate authority directory if you want to enable HTTPS
(default: None)
:type ssl_capath: str
:param resource_dirs: Static resources directories that will be :param resource_dirs: Static resources directories that will be
accessible through ``/resources/<path>``. It is expressed as a map accessible through ``/resources/<path>``. It is expressed as a map
where the key is the relative path under ``/resources`` to expose and where the key is the relative path under ``/resources`` to expose and
the value is the absolute path to expose. the value is the absolute path to expose.
:type resource_dirs: dict[str, str]
:param secret_key_file: Path to the file containing the secret key that will be used by Flask :param secret_key_file: Path to the file containing the secret key that will be used by Flask
(default: ``~/.local/share/platypush/flask.secret.key``). (default: ``~/.local/share/platypush/flask.secret.key``).
:type secret_key_file: str
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.port = port self.port = port
self.websocket_port = websocket_port
self.maps = maps or {}
self.server_proc = None self.server_proc = None
self.disable_websocket = disable_websocket
self.websocket_thread = None
self._websocket_loop = None
self._service_registry_thread = None self._service_registry_thread = None
self.bind_address = bind_address self.bind_address = bind_address
@ -233,30 +186,11 @@ class HttpBackend(Backend):
else: else:
self.resource_dirs = {} self.resource_dirs = {}
self.active_websockets = set()
self.ssl_context = (
get_ssl_server_context(
ssl_cert=ssl_cert,
ssl_key=ssl_key,
ssl_cafile=ssl_cafile,
ssl_capath=ssl_capath,
)
if ssl_cert
else None
)
self._workdir: str = Config.get('workdir') # type: ignore
assert self._workdir, 'The workdir is not set'
self.secret_key_file = os.path.expanduser( self.secret_key_file = os.path.expanduser(
secret_key_file secret_key_file
or os.path.join(self._workdir, 'flask.secret.key') # type: ignore or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore
) )
protocol = 'https' if ssl_cert else 'http' self.local_base_url = f'http://localhost:{self.port}'
self.local_base_url = f'{protocol}://localhost:{self.port}'
self._websocket_lock_timeout = 10
self._websocket_lock = threading.RLock()
self._websocket_locks = {}
def send_message(self, *_, **__): def send_message(self, *_, **__):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
@ -278,119 +212,13 @@ class HttpBackend(Backend):
else: else:
self.logger.info('HTTP server process terminated') self.logger.info('HTTP server process terminated')
if (
self.websocket_thread
and self.websocket_thread.is_alive()
and self._websocket_loop
):
self._websocket_loop.stop()
self.logger.info('HTTP websocket service terminated')
if self._service_registry_thread and self._service_registry_thread.is_alive(): if self._service_registry_thread and self._service_registry_thread.is_alive():
self._service_registry_thread.join(timeout=5) self._service_registry_thread.join(timeout=5)
self._service_registry_thread = None self._service_registry_thread = None
def _acquire_websocket_lock(self, ws):
try:
acquire_ok = self._websocket_lock.acquire(
timeout=self._websocket_lock_timeout
)
if not acquire_ok:
raise TimeoutError('Websocket lock acquire timeout')
addr = ws.remote_address
if addr not in self._websocket_locks:
self._websocket_locks[addr] = threading.RLock()
finally:
self._websocket_lock.release()
acquire_ok = self._websocket_locks[addr].acquire(
timeout=self._websocket_lock_timeout
)
if not acquire_ok:
raise TimeoutError(f'Websocket on address {addr} not ready to receive data')
def _release_websocket_lock(self, ws):
try:
acquire_ok = self._websocket_lock.acquire(
timeout=self._websocket_lock_timeout
)
if not acquire_ok:
raise TimeoutError('Websocket lock acquire timeout')
addr = ws.remote_address
if addr in self._websocket_locks:
self._websocket_locks[addr].release()
except Exception as e:
self.logger.warning(
'Unhandled exception while releasing websocket lock: %s', e
)
finally:
self._websocket_lock.release()
def notify_web_clients(self, event): def notify_web_clients(self, event):
"""Notify all the connected web clients (over websocket) of a new event""" """Notify all the connected web clients (over websocket) of a new event"""
get_redis().publish(events_redis_topic, str(event))
async def send_event(ws):
try:
self._acquire_websocket_lock(ws)
await ws.send(str(event))
except Exception as e:
self.logger.warning('Error on websocket send_event: %s', e)
finally:
self._release_websocket_lock(ws)
loop = get_or_create_event_loop()
wss = self.active_websockets.copy()
for _ws in wss:
try:
loop.run_until_complete(send_event(_ws))
except ConnectionClosed:
self.logger.warning(
'Websocket client %s connection lost', _ws.remote_address
)
self.active_websockets.remove(_ws)
if _ws.remote_address in self._websocket_locks:
del self._websocket_locks[_ws.remote_address]
def websocket(self):
"""Websocket main server"""
async def register_websocket(websocket, path):
address = (
websocket.remote_address
if websocket.remote_address
else '<unknown client>'
)
self.logger.info(
'New websocket connection from %s on path %s', address, path
)
self.active_websockets.add(websocket)
try:
await websocket.recv()
except ConnectionClosed:
self.logger.info('Websocket client %s closed connection', address)
self.active_websockets.remove(websocket)
if address in self._websocket_locks:
del self._websocket_locks[address]
websocket_args = {}
if self.ssl_context:
websocket_args['ssl'] = self.ssl_context
self._websocket_loop = get_or_create_event_loop()
self._websocket_loop.run_until_complete(
websocket_serve(
register_websocket,
self.bind_address,
self.websocket_port,
**websocket_args,
)
)
self._websocket_loop.run_forever()
def _get_secret_key(self, _create=False): def _get_secret_key(self, _create=False):
if _create: if _create:
@ -419,10 +247,13 @@ class HttpBackend(Backend):
), 'The HTTP backend only works if backed by a Redis bus' ), 'The HTTP backend only works if backed by a Redis bus'
application.config['redis_queue'] = self.bus.redis_queue application.config['redis_queue'] = self.bus.redis_queue
application.config['lifespan'] = 'on'
application.secret_key = self._get_secret_key() application.secret_key = self._get_secret_key()
kwargs = { kwargs = {
'bind': f'{self.bind_address}:{self.port}', 'bind': f'{self.bind_address}:{self.port}',
'workers': (cpu_count() * 2) + 1, 'workers': (cpu_count() * 2) + 1,
'worker_class': 'eventlet',
'timeout': 60,
} }
WSGIApplicationWrapper(f'{__package__}.app:application', kwargs).run() WSGIApplicationWrapper(f'{__package__}.app:application', kwargs).run()
@ -436,15 +267,6 @@ class HttpBackend(Backend):
self.logger.warning('Could not register the Zeroconf service') self.logger.warning('Could not register the Zeroconf service')
self.logger.exception(e) self.logger.exception(e)
def _start_websocket_server(self):
if not self.disable_websocket:
self.logger.info('Initializing websocket interface')
self.websocket_thread = threading.Thread(
target=self.websocket,
name='WebsocketServer',
)
self.websocket_thread.start()
def _start_zeroconf_service(self): def _start_zeroconf_service(self):
self._service_registry_thread = threading.Thread( self._service_registry_thread = threading.Thread(
target=self._register_service, target=self._register_service,
@ -460,7 +282,6 @@ class HttpBackend(Backend):
def run(self): def run(self):
super().run() super().run()
self._start_websocket_server()
self._start_zeroconf_service() self._start_zeroconf_service()
self._run_web_server() self._run_web_server()

View file

@ -0,0 +1,65 @@
from logging import getLogger
from flask import Blueprint, request
from simple_websocket import ConnectionClosed, Server
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate
from platypush.backend.http.ws import events_redis_topic
from platypush.message.event import Event
from platypush.utils import get_redis
ws = Blueprint('ws', __name__, template_folder=template_folder)
__routes__ = [ws]
logger = getLogger(__name__)
@ws.route('/ws/events', websocket=True)
@authenticate(json=True)
def ws_events_route():
"""
A websocket endpoint to asynchronously receive events generated from the
application.
This endpoint is mainly used by web clients to listen for the events
generated by the application.
"""
sock = Server(request.environ, ping_interval=25)
ws_key = (sock.environ['REMOTE_ADDR'], int(sock.environ['REMOTE_PORT']))
sub = get_redis().pubsub()
sub.subscribe(events_redis_topic)
logger.info('Started websocket connection with %s', ws_key)
try:
for msg in sub.listen():
if (
msg.get('type') != 'message'
and msg.get('channel').decode() != events_redis_topic
):
continue
try:
evt = Event.build(msg.get('data').decode())
except Exception as e:
logger.warning('Error parsing event: %s: %s', msg.get('data'), e)
continue
sock.send(str(evt))
except ConnectionClosed as e:
logger.info(
'Websocket connection to %s closed, reason=%s, message=%s',
ws_key,
e.reason,
e.message,
)
finally:
sub.unsubscribe(events_redis_topic)
return ''
# vim:sw=4:ts=4:et:

View file

@ -7,12 +7,6 @@ import { bus } from "@/bus";
export default { export default {
name: "Events", name: "Events",
props: {
wsPort: {
type: Number,
default: 8009,
}
},
data() { data() {
return { return {
@ -21,7 +15,9 @@ export default {
pending: false, pending: false,
opened: false, opened: false,
timeout: null, timeout: null,
reconnectMsecs: 30000, reconnectMsecs: 1000,
minReconnectMsecs: 1000,
maxReconnectMsecs: 30000,
handlers: {}, handlers: {},
handlerNameToEventTypes: {}, handlerNameToEventTypes: {},
} }
@ -30,6 +26,7 @@ export default {
methods: { methods: {
onWebsocketTimeout() { onWebsocketTimeout() {
console.log('Websocket reconnection timed out, retrying') console.log('Websocket reconnection timed out, retrying')
this.reconnectMsecs = Math.min(this.reconnectMsecs * 2, this.maxReconnectMsecs)
this.pending = false this.pending = false
if (this.ws) if (this.ws)
this.ws.close() this.ws.close()
@ -88,6 +85,7 @@ export default {
console.log('Websocket connection successful') console.log('Websocket connection successful')
this.opened = true this.opened = true
this.reconnectMsecs = this.minReconnectMsecs
if (this.pending) { if (this.pending) {
this.pending = false this.pending = false
@ -106,7 +104,10 @@ export default {
onClose(event) { onClose(event) {
if (event) { if (event) {
console.log('Websocket closed - code: ' + event.code + ' - reason: ' + event.reason) console.log(
`Websocket closed - code: ${event.code} - reason: ${event.reason}. ` +
`Retrying in ${this.reconnectMsecs / 1000}s`
)
} }
this.opened = false this.opened = false
@ -120,7 +121,7 @@ export default {
init() { init() {
try { try {
const protocol = location.protocol === 'https:' ? 'wss' : 'ws' const protocol = location.protocol === 'https:' ? 'wss' : 'ws'
const url = `${protocol}://${location.hostname}:${this.wsPort}` const url = `${protocol}://${location.host}/ws/events`
this.ws = new WebSocket(url) this.ws = new WebSocket(url)
} catch (err) { } catch (err) {
console.error('Websocket initialization error') console.error('Websocket initialization error')

View file

@ -18,6 +18,11 @@ module.exports = {
target: 'http://localhost:8008', target: 'http://localhost:8008',
changeOrigin: true changeOrigin: true
}, },
'/ws/*': {
target: 'http://localhost:8008',
ws: true,
changeOrigin: true
},
'/auth': { '/auth': {
target: 'http://localhost:8008', target: 'http://localhost:8008',
changeOrigin: true changeOrigin: true

View file

@ -0,0 +1,3 @@
from platypush.config import Config
events_redis_topic = f'__platypush/{Config.get("device_id")}/events' # type: ignore

View file

@ -7,7 +7,7 @@ from platypush.context import get_backend
from platypush.message.event import Event from platypush.message.event import Event
class EventProcessor(object): class EventProcessor:
"""Event processor class. Checks an event against the configured """Event processor class. Checks an event against the configured
rules and executes any matching event hooks""" rules and executes any matching event hooks"""
@ -21,7 +21,7 @@ class EventProcessor(object):
hooks = Config.get_event_hooks() hooks = Config.get_event_hooks()
self.hooks = [] self.hooks = []
for (name, hook) in hooks.items(): for name, hook in hooks.items():
h = EventHook.build(name=name, hook=hook) h = EventHook.build(name=name, hook=hook)
self.hooks.append(h) self.hooks.append(h)
@ -35,10 +35,6 @@ class EventProcessor(object):
if backend: if backend:
backend.notify_web_clients(event) backend.notify_web_clients(event)
backend = get_backend('websocket')
if backend:
backend.notify_web_clients(event)
def process_event(self, event: Event): def process_event(self, event: Event):
"""Processes an event and runs the matched hooks with the highest score""" """Processes an event and runs the matched hooks with the highest score"""

View file

@ -23,8 +23,9 @@ import yaml
from platypush.config import Config from platypush.config import Config
from platypush.utils import manifest from platypush.utils import manifest
workdir = os.path.join(os.path.expanduser('~'), '.local', 'share', workdir = os.path.join(
'platypush', 'platydock') os.path.expanduser('~'), '.local', 'share', 'platypush', 'platydock'
)
class Action(enum.Enum): class Action(enum.Enum):
@ -52,8 +53,12 @@ def _parse_deps(cls):
def generate_dockerfile(deps, ports, cfgfile, device_dir, python_version): def generate_dockerfile(deps, ports, cfgfile, device_dir, python_version):
device_id = Config.get('device_id') device_id = Config.get('device_id')
if not device_id: if not device_id:
raise RuntimeError(('You need to specify a device_id in {} - Docker ' + raise RuntimeError(
'containers cannot rely on hostname').format(cfgfile)) (
'You need to specify a device_id in {} - Docker '
+ 'containers cannot rely on hostname'
).format(cfgfile)
)
os.makedirs(device_dir, exist_ok=True) os.makedirs(device_dir, exist_ok=True)
content = textwrap.dedent( content = textwrap.dedent(
@ -63,7 +68,10 @@ def generate_dockerfile(deps, ports, cfgfile, device_dir, python_version):
RUN mkdir -p /app RUN mkdir -p /app
RUN mkdir -p /etc/platypush RUN mkdir -p /etc/platypush
RUN mkdir -p /usr/local/share/platypush\n RUN mkdir -p /usr/local/share/platypush\n
'''.format(python_version=python_version)).lstrip() '''.format(
python_version=python_version
)
).lstrip()
srcdir = os.path.dirname(cfgfile) srcdir = os.path.dirname(cfgfile)
cfgfile_copy = os.path.join(device_dir, 'config.yaml') cfgfile_copy = os.path.join(device_dir, 'config.yaml')
@ -81,9 +89,15 @@ def generate_dockerfile(deps, ports, cfgfile, device_dir, python_version):
} }
with open(cfgfile_copy, 'a') as f: with open(cfgfile_copy, 'a') as f:
f.write('\n# Automatically added by platydock, do not remove\n' + yaml.dump({ f.write(
'\n# Automatically added by platydock, do not remove\n'
+ yaml.dump(
{
'backend.redis': backend_config['redis'], 'backend.redis': backend_config['redis'],
}) + '\n') }
)
+ '\n'
)
# Main database configuration # Main database configuration
has_main_db = False has_main_db = False
@ -95,11 +109,17 @@ def generate_dockerfile(deps, ports, cfgfile, device_dir, python_version):
if not has_main_db: if not has_main_db:
with open(cfgfile_copy, 'a') as f: with open(cfgfile_copy, 'a') as f:
f.write('\n# Automatically added by platydock, do not remove\n' + yaml.dump({ f.write(
'\n# Automatically added by platydock, do not remove\n'
+ yaml.dump(
{
'main.db': { 'main.db': {
'engine': 'sqlite:////platypush.db', 'engine': 'sqlite:////platypush.db',
} }
}) + '\n') }
)
+ '\n'
)
# Copy included files # Copy included files
# noinspection PyProtectedMember # noinspection PyProtectedMember
@ -109,22 +129,33 @@ def generate_dockerfile(deps, ports, cfgfile, device_dir, python_version):
pathlib.Path(destdir).mkdir(parents=True, exist_ok=True) pathlib.Path(destdir).mkdir(parents=True, exist_ok=True)
shutil.copy(include, destdir, follow_symlinks=True) shutil.copy(include, destdir, follow_symlinks=True)
content += 'RUN mkdir -p /etc/platypush/' + incdir + '\n' content += 'RUN mkdir -p /etc/platypush/' + incdir + '\n'
content += 'COPY ' + os.path.relpath(include, srcdir) + \ content += (
' /etc/platypush/' + incdir + '\n' 'COPY '
+ os.path.relpath(include, srcdir)
+ ' /etc/platypush/'
+ incdir
+ '\n'
)
# Copy script files # Copy script files
scripts_dir = os.path.join(os.path.dirname(cfgfile), 'scripts') scripts_dir = os.path.join(os.path.dirname(cfgfile), 'scripts')
if os.path.isdir(scripts_dir): if os.path.isdir(scripts_dir):
local_scripts_dir = os.path.join(device_dir, 'scripts') local_scripts_dir = os.path.join(device_dir, 'scripts')
remote_scripts_dir = '/etc/platypush/scripts' remote_scripts_dir = '/etc/platypush/scripts'
shutil.copytree(scripts_dir, local_scripts_dir, symlinks=True, dirs_exist_ok=True) shutil.copytree(
scripts_dir, local_scripts_dir, symlinks=True, dirs_exist_ok=True
)
content += f'RUN mkdir -p {remote_scripts_dir}\n' content += f'RUN mkdir -p {remote_scripts_dir}\n'
content += f'COPY scripts/ {remote_scripts_dir}\n' content += f'COPY scripts/ {remote_scripts_dir}\n'
packages = deps.pop('packages', None) packages = deps.pop('packages', None)
pip = deps.pop('pip', None) pip = deps.pop('pip', None)
exec_cmds = deps.pop('exec', None) exec_cmds = deps.pop('exec', None)
pkg_cmd = f'\n\t&& apt-get install --no-install-recommends -y {" ".join(packages)} \\' if packages else '' pkg_cmd = (
f'\n\t&& apt-get install --no-install-recommends -y {" ".join(packages)} \\'
if packages
else ''
)
pip_cmd = f'\n\t&& pip install {" ".join(pip)} \\' if pip else '' pip_cmd = f'\n\t&& pip install {" ".join(pip)} \\' if pip else ''
content += f''' content += f'''
RUN dpkg --configure -a \\ RUN dpkg --configure -a \\
@ -171,7 +202,8 @@ RUN apt-get remove -y git \\
ENV PYTHONPATH /app:$PYTHONPATH ENV PYTHONPATH /app:$PYTHONPATH
CMD ["python", "-m", "platypush"] CMD ["python", "-m", "platypush"]
''') '''
)
dockerfile = os.path.join(device_dir, 'Dockerfile') dockerfile = os.path.join(device_dir, 'Dockerfile')
print('Generating Dockerfile {}'.format(dockerfile)) print('Generating Dockerfile {}'.format(dockerfile))
@ -185,19 +217,30 @@ def build(args):
ports = set() ports = set()
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='platydock build', prog='platydock build', description='Build a Platypush image from a config.yaml'
description='Build a Platypush image from a config.yaml'
) )
parser.add_argument('-c', '--config', type=str, required=True, parser.add_argument(
help='Path to the platypush configuration file') '-c',
parser.add_argument('-p', '--python-version', type=str, default='3.9', '--config',
help='Python version to be used') type=str,
required=True,
help='Path to the platypush configuration file',
)
parser.add_argument(
'-p',
'--python-version',
type=str,
default='3.9',
help='Python version to be used',
)
opts, args = parser.parse_known_args(args) opts, args = parser.parse_known_args(args)
cfgfile = os.path.abspath(os.path.expanduser(opts.config)) cfgfile = os.path.abspath(os.path.expanduser(opts.config))
manifest._available_package_manager = 'apt' # Force apt for Debian-based Docker images manifest._available_package_manager = (
'apt' # Force apt for Debian-based Docker images
)
install_cmds = manifest.get_dependencies_from_conf(cfgfile) install_cmds = manifest.get_dependencies_from_conf(cfgfile)
python_version = opts.python_version python_version = opts.python_version
backend_config = Config.get_backends() backend_config = Config.get_backends()
@ -205,45 +248,75 @@ def build(args):
# Container exposed ports # Container exposed ports
if backend_config.get('http'): if backend_config.get('http'):
from platypush.backend.http import HttpBackend from platypush.backend.http import HttpBackend
# noinspection PyProtectedMember # noinspection PyProtectedMember
ports.add(backend_config['http'].get('port', HttpBackend._DEFAULT_HTTP_PORT)) ports.add(backend_config['http'].get('port', HttpBackend._DEFAULT_HTTP_PORT))
# noinspection PyProtectedMember # noinspection PyProtectedMember
ports.add(backend_config['http'].get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT)) ports.add(
backend_config['http'].get(
'websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT
)
)
if backend_config.get('tcp'): if backend_config.get('tcp'):
ports.add(backend_config['tcp']['port']) ports.add(backend_config['tcp']['port'])
if backend_config.get('websocket'): if backend_config.get('websocket'):
from platypush.backend.websocket import WebsocketBackend from platypush.backend.websocket import WebsocketBackend
# noinspection PyProtectedMember # noinspection PyProtectedMember
ports.add(backend_config['websocket'].get('port', WebsocketBackend._default_websocket_port)) ports.add(
backend_config['websocket'].get(
'port', WebsocketBackend._default_websocket_port
)
)
dev_dir = os.path.join(workdir, Config.get('device_id')) dev_dir = os.path.join(workdir, Config.get('device_id'))
generate_dockerfile( generate_dockerfile(
deps=dict(install_cmds), ports=ports, cfgfile=cfgfile, device_dir=dev_dir, python_version=python_version deps=dict(install_cmds),
ports=ports,
cfgfile=cfgfile,
device_dir=dev_dir,
python_version=python_version,
) )
subprocess.call( subprocess.call(
['docker', 'build', '-t', 'platypush-{}'.format(Config.get('device_id')), dev_dir] [
'docker',
'build',
'-t',
'platypush-{}'.format(Config.get('device_id')),
dev_dir,
]
) )
def start(args): def start(args):
global workdir global workdir
parser = argparse.ArgumentParser(prog='platydock start', parser = argparse.ArgumentParser(
prog='platydock start',
description='Start a Platypush container', description='Start a Platypush container',
epilog=textwrap.dedent(''' epilog=textwrap.dedent(
'''
You can append additional options that You can append additional options that
will be passed to the docker container. will be passed to the docker container.
Example: Example:
--add-host='myhost:192.168.1.1' --add-host='myhost:192.168.1.1'
''')) '''
),
)
parser.add_argument('image', type=str, help='Platypush image to start') parser.add_argument('image', type=str, help='Platypush image to start')
parser.add_argument('-p', '--publish', action='append', nargs='*', default=[], parser.add_argument(
help=textwrap.dedent(''' '-p',
'--publish',
action='append',
nargs='*',
default=[],
help=textwrap.dedent(
'''
Container's ports to expose to the host. Container's ports to expose to the host.
Note that the default exposed ports from Note that the default exposed ports from
the container service will be exposed unless the container service will be exposed unless
@ -253,13 +326,22 @@ def start(args):
Example: Example:
-p 18008:8008 -p 18009:8009 -p 18008:8008
''')) '''
),
)
parser.add_argument('-a', '--attach', action='store_true', default=False, parser.add_argument(
help=textwrap.dedent(''' '-a',
'--attach',
action='store_true',
default=False,
help=textwrap.dedent(
'''
If set, then attach to the container after starting it up (default: false). If set, then attach to the container after starting it up (default: false).
''')) '''
),
)
opts, args = parser.parse_known_args(args) opts, args = parser.parse_known_args(args)
ports = {} ports = {}
@ -277,11 +359,20 @@ def start(args):
print('Preparing Redis support container') print('Preparing Redis support container')
subprocess.call(['docker', 'pull', 'redis']) subprocess.call(['docker', 'pull', 'redis'])
subprocess.call(['docker', 'run', '--rm', '--name', 'redis-' + opts.image, subprocess.call(
'-d', 'redis']) ['docker', 'run', '--rm', '--name', 'redis-' + opts.image, '-d', 'redis']
)
docker_cmd = ['docker', 'run', '--rm', '--name', opts.image, '-it', docker_cmd = [
'--link', 'redis-' + opts.image + ':redis'] 'docker',
'run',
'--rm',
'--name',
opts.image,
'-it',
'--link',
'redis-' + opts.image + ':redis',
]
for container_port, host_port in ports.items(): for container_port, host_port in ports.items():
docker_cmd += ['-p', host_port + ':' + container_port] docker_cmd += ['-p', host_port + ':' + container_port]
@ -297,8 +388,9 @@ def start(args):
def stop(args): def stop(args):
parser = argparse.ArgumentParser(prog='platydock stop', parser = argparse.ArgumentParser(
description='Stop a Platypush container') prog='platydock stop', description='Stop a Platypush container'
)
parser.add_argument('container', type=str, help='Platypush container to stop') parser.add_argument('container', type=str, help='Platypush container to stop')
opts, args = parser.parse_known_args(args) opts, args = parser.parse_known_args(args)
@ -313,11 +405,13 @@ def stop(args):
def rm(args): def rm(args):
global workdir global workdir
parser = argparse.ArgumentParser(prog='platydock rm', parser = argparse.ArgumentParser(
description='Remove a Platypush image. ' + prog='platydock rm',
'NOTE: make sure that no container is ' + description='Remove a Platypush image. '
'running nor linked to the image before ' + + 'NOTE: make sure that no container is '
'removing it') + 'running nor linked to the image before '
+ 'removing it',
)
parser.add_argument('image', type=str, help='Platypush image to remove') parser.add_argument('image', type=str, help='Platypush image to remove')
opts, args = parser.parse_known_args(args) opts, args = parser.parse_known_args(args)
@ -327,10 +421,10 @@ def rm(args):
def ls(args): def ls(args):
parser = argparse.ArgumentParser(prog='platydock ls', parser = argparse.ArgumentParser(
description='List available Platypush containers') prog='platydock ls', description='List available Platypush containers'
parser.add_argument('filter', type=str, nargs='?', )
help='Image name filter') parser.add_argument('filter', type=str, nargs='?', help='Image name filter')
opts, args = parser.parse_known_args(args) opts, args = parser.parse_known_args(args)
@ -340,8 +434,9 @@ def ls(args):
images = [] images = []
for line in output: for line in output:
if re.match(r'^platypush-(.+?)\s.*', line): if re.match(r'^platypush-(.+?)\s.*', line) and (
if not opts.filter or (opts.filter and opts.filter in line): not opts.filter or (opts.filter and opts.filter in line)
):
images.append(line) images.append(line)
if images: if images:
@ -352,14 +447,17 @@ def ls(args):
def main(): def main():
parser = argparse.ArgumentParser(prog='platydock', add_help=False, parser = argparse.ArgumentParser(
prog='platydock',
add_help=False,
description='Manage Platypush docker containers', description='Manage Platypush docker containers',
epilog='Use platydock <action> --help to ' + epilog='Use platydock <action> --help to ' + 'get additional help',
'get additional help') )
# noinspection PyTypeChecker # noinspection PyTypeChecker
parser.add_argument('action', nargs='?', type=Action, choices=list(Action), parser.add_argument(
help='Action to execute') 'action', nargs='?', type=Action, choices=list(Action), help='Action to execute'
)
parser.add_argument('-h', '--help', action='store_true', help='Show usage') parser.add_argument('-h', '--help', action='store_true', help='Show usage')
opts, args = parser.parse_known_args(sys.argv[1:]) opts, args = parser.parse_known_args(sys.argv[1:])

View file

@ -5,6 +5,7 @@
alembic alembic
bcrypt bcrypt
croniter croniter
eventlet
flask flask
frozendict frozendict
gunicorn gunicorn

View file

@ -63,6 +63,7 @@ setup(
'alembic', 'alembic',
'bcrypt', 'bcrypt',
'croniter', 'croniter',
'eventlet',
'flask', 'flask',
'frozendict', 'frozendict',
'gunicorn', 'gunicorn',
@ -74,11 +75,12 @@ setup(
'redis', 'redis',
'requests', 'requests',
'rsa', 'rsa',
'simple_websocket',
'sqlalchemy', 'sqlalchemy',
'tz', 'tz',
'websocket-client', 'websocket-client',
'websockets',
'wheel', 'wheel',
'wsproto',
'zeroconf>=0.27.0', 'zeroconf>=0.27.0',
], ],
extras_require={ extras_require={