
717 lines
28 KiB
Raw Normal View History

2020-05-23 21:38:08 +02:00
import base64
import datetime
import errno
import getpass
import os
import threading
from binascii import hexlify
from typing import Optional, Dict, Tuple, List, Union, Any
from paramiko import DSSKey, RSAKey, SSHClient, WarningPolicy, SFTPClient
2020-05-23 21:38:08 +02:00
from paramiko.py3compat import u
from paramiko import GSS_AUTH_AVAILABLE
except ImportError:
from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
2020-05-23 21:38:08 +02:00
from platypush import Response
from platypush.message.response.ssh import SSHKeygenResponse
from platypush.plugins import Plugin, action
from platypush.plugins.ssh.tunnel.forward import forward_tunnel
from platypush.plugins.ssh.tunnel.reverse import reverse_tunnel, close_tunnel
class SshPlugin(Plugin):
SSH plugin.
2020-05-23 21:38:08 +02:00
* **paramiko** (``pip install paramiko``)
key_dispatch_table = {'dsa': DSSKey, 'rsa': RSAKey}
def __init__(self, key_file: Optional[str] = None, passphrase: Optional[str] = None, **kwargs):
    """
    :param key_file: Default key file (default: any "id_rsa", "id_dsa", "id_ecdsa", or "id_ed25519" key discoverable
        in ``~/.ssh/``).
    :param passphrase: Key file passphrase (default: None).
    :param kwargs: Extra arguments forwarded to the parent plugin constructor.
    """
    # NOTE(review): kwargs were accepted but never used, and ``self.logger`` (used by other methods)
    # is not set here - presumably the base Plugin constructor provides it; confirm against Plugin.
    super().__init__(**kwargs)

    # Default credentials, used whenever a call does not provide its own
    self.key_file = os.path.abspath(os.path.expanduser(key_file)) if key_file else None
    self.passphrase = passphrase

    # Active sessions, indexed by (host, port, user)
    self._sessions: Dict[Tuple[str, int, Optional[str]], SSHClient] = {}
    # Active forward/reverse tunnels, indexed by (port, remote_host, remote_port)
    self._fwd_tunnels: Dict[Tuple[int, str, int], dict] = {}
    self._rev_tunnels: Dict[Tuple[int, str, int], dict] = {}
def _get_key(self, key_file: Optional[str] = None, passphrase: Optional[str] = None):
key_file = key_file or self.key_file
return (os.path.abspath(os.path.expanduser(key_file)) if key_file else None,
passphrase or self.passphrase)
def _get_host_port_user(host: str, port: int = 22, user: Optional[str] = None, **_):
if host.find('@') >= 0:
user, host = host.split('@')
if host.find(':') >= 0:
host, port = host.split(':')
if not user:
user = getpass.getuser()
return host, int(port), user
# noinspection PyShadowingBuiltins
def keygen(self,
           filename: str,
           type: str = 'rsa',
           bits: int = 4096,
           comment: Optional[str] = None,
           passphrase: Optional[str] = None) -> SSHKeygenResponse:
    """
    Generate an SSH keypair.

    :param filename: Output file name for the private key (the public key will be stored in <filename>.pub).
    :param type: Encryption algorithm, either "rsa" or "dsa" (default: "rsa").
    :param bits: Key length in bits (default: 4096).
    :param comment: Key comment (default: None).
    :param passphrase: Key passphrase (default: None).
    :return: :class:`platypush.message.response.ssh.SSHKeygenResponse`.
    :raises AssertionError: If the key type is not supported or the key size is not valid for the type.
    """
    assert type != 'dsa' or bits <= 1024, 'DSA keys support a maximum of 1024 bits'
    # Render the supported types as a plain list rather than as "dict_keys([...])"
    assert type in self.key_dispatch_table, 'No such type: {}. Available types: {}'.format(
        type, ', '.join(self.key_dispatch_table))

    if filename:
        filename = os.path.abspath(os.path.expanduser(filename))

    key_class = self.key_dispatch_table[type]
    prv = key_class.generate(bits=bits)
    prv.write_private_key_file(filename=filename, password=passphrase)

    # Load the key back from the file that was just written to derive the public part
    pub = key_class(filename=filename, password=passphrase)
    pub_file = '{filename}.pub'.format(filename=filename)
    pub_line = '{name} {key}'.format(name=pub.get_name(), key=pub.get_base64())
    if comment:
        pub_line += ' ' + comment

    with open(pub_file, 'w') as f:
        # Newline-terminated so the file can be safely appended to an authorized_keys file
        f.write(pub_line + '\n')

    # hexlify() returns ASCII bytes; decode them directly instead of relying on
    # paramiko.py3compat.u, which is not available in paramiko 3.x
    fingerprint = hexlify(pub.get_fingerprint()).decode()
    return SSHKeygenResponse(fingerprint=fingerprint, key_file=filename, pub_key_file=pub_file)
def run(self, *args, **kwargs):
    """
    Delegate to the parent class' ``run`` and report any failure as an ``AssertionError``.

    :param args: Positional arguments forwarded to the parent ``run``.
    :param kwargs: Keyword arguments forwarded to the parent ``run``.
    :return: Whatever the parent ``run`` returns.
    :raises AssertionError: If the parent ``run`` raises any exception.
    """
    try:
        return super().run(*args, **kwargs)
    except Exception as e:
        # Keep the original exception as the cause so that its traceback isn't lost
        raise AssertionError(e) from e
def _connect(self,
host: str,
port: int = 22,
user: Optional[str] = None,
password: Optional[str] = None,
key_file: Optional[str] = None,
passphrase: Optional[str] = None,
compress: bool = False,
timeout: Optional[int] = None,
auth_timeout: Optional[int] = None) -> SSHClient:
host, port, user = self._get_host_port_user(host, port, user)
key = (host, port, user)
if key in self._sessions:'[Connect] The SSH session is already active: {user}@{host}:{port}'.format(
user=user, host=host, port=port))
return self._sessions[key]
key_file, passphrase = self._get_key(key_file, passphrase)
client = SSHClient()
args = {
'hostname': host,
'port': port,
'username': user,
'compress': compress,
'timeout': timeout,
'auth_timeout': auth_timeout,
if password:
args['password'] = password
elif key_file:
args['key_filename'] = key_file
args['passphrase'] = passphrase
args['gss_auth'] = GSS_AUTH_AVAILABLE
args['gss_kex'] = GSS_AUTH_AVAILABLE
self._sessions[key] = client
return client
except Exception as e:
raise AssertionError('Connection to {} failed: {}'.format(host, str(e)))
def connect(self,
            host: str,
            port: int = 22,
            user: Optional[str] = None,
            password: Optional[str] = None,
            key_file: Optional[str] = None,
            passphrase: Optional[str] = None,
            compress: bool = False,
            timeout: Optional[int] = None,
            auth_timeout: Optional[int] = None) -> None:
    """
    Open an SSH connection.

    :param host: Host name or IP. Can also be in the format ``[user]@<host>:[port]``.
    :param port: Remote port (default: 22).
    :param user: Username (default: None, same user name as the one running platypush).
    :param password: Password (default: None).
    :param key_file: Key file to use for authentication (default: None).
    :param passphrase: Passphrase for the key file (default: None).
    :param compress: Compress data on the connection (default: False).
    :param timeout: Data transfer timeout in seconds (default: None).
    :param auth_timeout: Authentication timeout in seconds (default: None).
    """
    # The client returned by _connect is deliberately discarded: this method returns nothing
    self._connect(
        host=host,
        port=port,
        user=user,
        password=password,
        key_file=key_file,
        passphrase=passphrase,
        compress=compress,
        timeout=timeout,
        auth_timeout=auth_timeout,
    )
def disconnect(self,
host: str,
port: int = 22,
user: Optional[str] = None) -> None:
Close a connection to a host.
:param host: Host name or IP. Can also be in the format ``[user]@<host>:[port]``.
:param port: Remote port (default: 22).
:param user: Username (default: None, same user name as the one running platypush).
host, port, user = self._get_host_port_user(host, port, user)
key = (host, port, user)
if key not in self._sessions:'[Disconnect] The SSH session is not active: {user}@{host}:{port}'.format(
user=user, host=host, port=port))
session = self._sessions[key]
except Exception as e:
del self._sessions[key]
def exec(self, cmd: str, keep_alive: bool = False, timeout: Optional[int] = None,
stdin: Optional[str] = None, env: Optional[Dict[str, str]] = None, **kwargs) -> Response:
Run a command on a host.
:param cmd: Command to run
:param keep_alive: Keep the connection active after running the command (default: False).
:param timeout: Communication timeout in seconds (default: None).
:param stdin: Optional string to pass on the stdin of the command.
:param env: Dictionary of environment variables to be used for the connection (default: None).
:param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
:return: The output of the executed command.
client = self._connect(**kwargs)
def decode(buf: bytes) -> str:
buf = buf.decode()
2021-04-05 00:58:44 +02:00
except (ValueError, TypeError):
2020-05-23 21:38:08 +02:00
buf = base64.encodebytes(buf).decode()
if buf.endswith('\n'):
buf = buf[:-1]
return buf
_in, _out, _err = client.exec_command(cmd, timeout=timeout, environment=env)
if stdin:
with _in:
resp = Response()
with _out:
resp.output = decode(
with _err:
err = decode(
if err:
resp.errors = [err]
return resp
if not keep_alive:
host, port, user = self._get_host_port_user(**kwargs)
self.disconnect(host=host, port=port, user=user)
def is_directory(sftp: SFTPClient, path: str) -> bool:
f = sftp.lstat(path)
return S_ISDIR(f.st_mode)
def sftp_walk(cls, sftp: SFTPClient, path: str):
files = []
folders = []
for f in sftp.listdir_attr(path):
if S_ISDIR(f.st_mode):
yield path, folders, files
for folder in folders:
new_path = os.path.join(path, folder)
for x in cls.sftp_walk(sftp, new_path):
yield x
def sftp_get(self, sftp: SFTPClient, remote_path: str, local_path: str, recursive: bool = False) -> None:
if self.is_directory(sftp, remote_path):
assert recursive, '{} is a directory on the server but recursive has been set to False'
local_path = os.path.join(local_path, os.path.basename(remote_path))
os.makedirs(local_path, mode=0o755, exist_ok=True)
for path, folders, files in self.sftp_walk(sftp, '.'):
new_local_path = os.path.join(local_path, path)
os.makedirs(new_local_path, mode=0o755, exist_ok=True)
for file in files:'Downloading file {} from {} to {}'.format(file, path, new_local_path))
os.path.join(remote_path, path, file),
os.path.join(new_local_path, file),
if os.path.isdir(local_path):
local_path = os.path.join(local_path, os.path.basename(remote_path))
sftp.get(remote_path, local_path)
def get(self, remote_path: str, local_path: str, recursive: bool = False, keep_alive: bool = False,
**kwargs) -> None:
Download a file or folder from an SSH server.
:param remote_path: Remote path (file or directory).
:param local_path: Local path (file or directory).
:param recursive: Set to True if you want to recursively download folders (default: False).
:param keep_alive: Keep the connection active after running the command (default: False).
:param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
local_path = os.path.abspath(os.path.expanduser(local_path))
kwargs['compress'] = True
client = self._connect(**kwargs)
sftp = client.open_sftp()
self.sftp_get(sftp, remote_path=remote_path, local_path=local_path, recursive=recursive)
if not keep_alive:
host, port, user = self._get_host_port_user(**kwargs)
self.disconnect(host=host, port=port, user=user)
def put(self, remote_path: str, local_path: str, recursive: bool = False, keep_alive: bool = False,
**kwargs) -> None:
Upload a file or folder to an SSH server.
:param remote_path: Remote path (file or directory).
:param local_path: Local path (file or directory).
:param recursive: Set to True if you want to recursively upload folders (default: False).
:param keep_alive: Keep the connection active after running the command (default: False).
:param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
local_path = os.path.abspath(os.path.expanduser(local_path))
assert os.path.exists(local_path), os.strerror(errno.ENOENT)
kwargs['compress'] = True
client = self._connect(**kwargs)
sftp = client.open_sftp()
if os.path.isdir(local_path):
2021-04-05 00:58:44 +02:00
except Exception as e:
self.logger.warning(f'mkdir {remote_path}: {e}')
2020-05-23 21:38:08 +02:00
assert recursive, '{} is a directory but recursive has been set to False'.format(local_path)
assert self.is_directory(sftp, remote_path), \
'{} is not a directory on the remote host'.format(remote_path)
for path, folders, files in os.walk('.'):
2021-04-05 00:58:44 +02:00
except Exception as e:
self.logger.warning(f'mkdir {remote_path}: {e}')
2020-05-23 21:38:08 +02:00
for file in files:
src = os.path.join(path, file)
dst = os.path.join(path, file)'Copying {} to {}'.format(src, dst))
sftp.put(src, dst)
if self.is_directory(sftp, remote_path):
remote_path = os.path.join(remote_path, os.path.basename(local_path))
sftp.put(local_path, remote_path)
if not keep_alive:
host, port, user = self._get_host_port_user(**kwargs)
self.disconnect(host=host, port=port, user=user)
def ls(self, path: str = '.', attrs: bool = False, keep_alive: bool = False, **kwargs) \
        -> Union[List[str], Dict[str, Any]]:
    """
    Return the list of files in a path on a remote server.

    :param path: Remote path (default: current directory).
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param attrs: Set to True if you want to get the full information of each file (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    :return: A list of filenames if ``attrs=False``, otherwise a dictionary ``filename -> {attributes}`` if
        ``attrs=True``.
    """
    # Imported locally so that the method doesn't depend on a module-level ``stat`` import
    import stat

    # Checked in order; the first matching predicate wins
    file_types = (
        (stat.S_ISDIR, 'directory'),
        (stat.S_ISBLK, 'block'),
        (stat.S_ISCHR, 'device'),
        (stat.S_ISDOOR, 'door'),
        (stat.S_ISREG, 'file'),
        (stat.S_ISLNK, 'link'),
        (stat.S_ISFIFO, 'fifo'),
        (stat.S_ISSOCK, 'sock'),
    )

    def get_file_type(st_mode: int) -> str:
        for matches, type_name in file_types:
            if matches(st_mode):
                return type_name
        return 'unknown'

    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        if not attrs:
            return sftp.listdir(path)

        return {
            f.filename: {
                'filename': f.filename,
                'longname': f.longname,
                'attributes': f.attr,
                'type': get_file_type(f.st_mode),
                'access_time': datetime.datetime.fromtimestamp(f.st_atime),
                'modify_time': datetime.datetime.fromtimestamp(f.st_mtime),
                'uid': f.st_uid,
                'gid': f.st_gid,
                'size': f.st_size,
            }
            for f in sftp.listdir_attr(path)
        }
    finally:
        # Runs on both return paths and on failure, so the session is never leaked
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def rm(self, path: str, keep_alive: bool = False, **kwargs) -> None:
    """
    Remove a file from the server.

    :param path: Remote path to remove.
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.remove(path)
    finally:
        # Release the session even if the removal fails
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def mv(self, path: str, new_path: str, keep_alive: bool = False, **kwargs) -> None:
    """
    Move/rename a file.

    :param path: Remote path to move/rename.
    :param new_path: Destination path.
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.posix_rename(path, new_path)
    finally:
        # Release the session even if the rename fails
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def mkdir(self, path: str, mode: int = 0o777, keep_alive: bool = False, **kwargs) -> None:
    """
    Create a directory.

    :param path: Path to be created.
    :param mode: Access permissions (default: 0777).
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.mkdir(path, mode=mode)
    finally:
        # Release the session even if the directory can't be created
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def rmdir(self, path: str, keep_alive: bool = False, **kwargs) -> None:
    """
    Remove a directory.

    :param path: Path to be removed.
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.rmdir(path)
    finally:
        # Release the session even if the removal fails
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def ln(self, src: str, dest: str, keep_alive: bool = False, **kwargs) -> None:
    """
    Create a symbolic link.

    :param src: Source path.
    :param dest: Destination path.
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.symlink(src, dest)
    finally:
        # Release the session even if the link can't be created
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def chmod(self, path: str, mode: int, keep_alive: bool = False, **kwargs) -> None:
    """
    Change the access rights of a path.

    :param path: Path to be modified.
    :param mode: Access permissions (in octal mode).
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.chmod(path, mode=mode)
    finally:
        # Release the session even if the permissions can't be changed
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def chown(self, path: str, uid: int, gid: int, keep_alive: bool = False, **kwargs) -> None:
    """
    Change the owner of a path.

    :param path: Path to be modified.
    :param uid: New user ID.
    :param gid: New group ID.
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        sftp.chown(path, uid=uid, gid=gid)
    finally:
        # Release the session even if the owner can't be changed
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def chdir(self, path: str, keep_alive: bool = False, **kwargs) -> None:
    """
    Change directory to the specified path.

    :param path: Destination path.
    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        # NOTE(review): the directory is set on an SFTP client created for this call only -
        # confirm whether later calls are expected to see it
        sftp.chdir(path)
    finally:
        # Release the session even if the directory change fails
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def getcwd(self, keep_alive: bool = False, **kwargs) -> str:
    """
    Get the current working directory.

    :param keep_alive: Keep the connection active after running the command (default: False).
    :param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
    :return: The value reported by the SFTP client's ``getcwd``.
    """
    client = self._connect(**kwargs)

    try:
        sftp = client.open_sftp()
        return sftp.getcwd()
    finally:
        # The cleanup must live in ``finally``: placed after the ``return`` it would never run
        if not keep_alive:
            host, port, user = self._get_host_port_user(**kwargs)
            self.disconnect(host=host, port=port, user=user)
def start_forward_tunnel(self, local_port: int, remote_host: str, remote_port: int, bind_addr: Optional[str] = '',
Start an SSH forward tunnel, tunnelling <local_port> to <remote_host>:<remote_port>.
:param local_port: Local port.
:param remote_host: Remote host.
:param remote_port: Remote port.
:param bind_addr: If set, the `local_port` will be bound to this address/subnet (default: '', or any).
:param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
key = local_port, remote_host, remote_port
if key in self._fwd_tunnels:'The tunnel {}:{}:{}:{} is already active'.format(
bind_addr, local_port, remote_host, remote_port))
client = self._connect(**kwargs)
server = forward_tunnel(local_port, remote_host, remote_port, client.get_transport(), bind_addr=bind_addr)
threading.Thread(target=server.serve_forever, name='sshfwdtun').start()
self._fwd_tunnels[key] = {
'client': client,
'server': server,
'args': kwargs,
def stop_forward_tunnel(self, local_port: int, remote_host: str, remote_port: int):
Stop an active SSH forward tunnel.
:param local_port: Local port.
:param remote_host: Remote host.
:param remote_port: Remote port.
key = (local_port, remote_host, remote_port)
if key not in self._fwd_tunnels:
self.logger.warning('No such forward tunnel: {}:{}:{}'.format(local_port, remote_host, remote_port))
server = self._fwd_tunnels[key]['server']
args = self._fwd_tunnels[key]['args']
host, port, user = self._get_host_port_user(**args)
self.disconnect(host=host, port=port, user=user)
def start_reverse_tunnel(self, server_port: int, remote_host: str, remote_port: int, bind_addr: Optional[str] = '',
Start an SSH reversed tunnel. <server_port> on the SSH server is forwarded across an SSH session back to the
local machine, and out to a <remote_host>:<remote_port> reachable from this network.
:param server_port: Server port.
:param remote_host: Remote host.
:param remote_port: Remote port.
:param bind_addr: If set, the `server_port` will be bound to this address/subnet (default: '', or any).
:param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`.
key = server_port, remote_host, remote_port
if key in self._fwd_tunnels:'The tunnel {}:{}:{}:{} is already active'.format(
bind_addr, server_port, remote_host, remote_port))
client = self._connect(**kwargs)
server = reverse_tunnel(server_port, remote_host, remote_port, transport=client.get_transport(),
threading.Thread(target=server, name='sshrevtun').start()
self._rev_tunnels[key] = {
'client': client,
'server': server,
'args': kwargs,
def stop_reverse_tunnel(self, server_port: int, remote_host: str, remote_port: int):
Stop an active SSH reversed tunnel.
:param server_port: Server port.
:param remote_host: Remote host.
:param remote_port: Remote port.
key = (server_port, remote_host, remote_port)
if key not in self._rev_tunnels:
self.logger.warning('No such reversed tunnel: {}:{}:{}'.format(server_port, remote_host, remote_port))
args = self._rev_tunnels[key]['args']
host, port, user = self._get_host_port_user(**args)
self.disconnect(host=host, port=port, user=user)
# vim:sw=4:ts=4:et: