forked from platypush/platypush
[HTTP] Added authenticated /file?path=<path>
route.
This commit is contained in:
parent
9993e9b6b7
commit
ec050b2853
2 changed files with 95 additions and 16 deletions
68
platypush/backend/http/app/routes/file.py
Normal file
68
platypush/backend/http/app/routes/file.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import os
|
||||
import re
|
||||
from typing import Generator
|
||||
|
||||
from flask import Blueprint, abort, request
|
||||
from flask.wrappers import Response
|
||||
|
||||
from platypush.backend.http.app import template_folder
|
||||
from platypush.backend.http.app.utils import authenticate, logger
|
||||
from platypush.utils import get_mime_type
|
||||
|
||||
file = Blueprint('file', __name__, template_folder=template_folder)
|
||||
|
||||
# Declare routes list
|
||||
__routes__ = [
|
||||
file,
|
||||
]
|
||||
|
||||
|
||||
@file.route('/file', methods=['GET', 'HEAD'])
|
||||
@authenticate()
|
||||
def get_file_route():
|
||||
"""
|
||||
Endpoint to read the content of a file on the server.
|
||||
"""
|
||||
|
||||
def read_file(path: str) -> Generator[bytes, None, None]:
|
||||
with open(path, 'rb') as f:
|
||||
yield from iter(lambda: f.read(4096), b'')
|
||||
|
||||
path = os.sep + os.path.join(
|
||||
*[
|
||||
token
|
||||
for token in re.sub(
|
||||
r'^\.\./',
|
||||
'',
|
||||
re.sub(
|
||||
r'^\./',
|
||||
'',
|
||||
request.args.get('path', '').lstrip(os.sep).lstrip(' ') or '',
|
||||
),
|
||||
).split(os.sep)
|
||||
if token
|
||||
]
|
||||
)
|
||||
|
||||
logger().debug('Received file read request for %r', request.path)
|
||||
|
||||
if not os.path.isfile(path):
|
||||
logger().warning('File not found: %r', path)
|
||||
abort(404, 'File not found')
|
||||
|
||||
try:
|
||||
headers = {
|
||||
'Content-Length': str(os.path.getsize(path)),
|
||||
'Content-Type': (get_mime_type(path) or 'application/octet-stream'),
|
||||
}
|
||||
|
||||
if request.method == 'HEAD':
|
||||
return Response(status=200, headers=headers)
|
||||
|
||||
return read_file(path), 200, headers
|
||||
except PermissionError:
|
||||
logger().warning('Permission denied to read file %r', path)
|
||||
abort(403, 'Permission denied')
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
|
@ -1,7 +1,7 @@
|
|||
import json
|
||||
import os
|
||||
import pathlib
|
||||
from typing import List, Dict
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
from platypush.plugins import Plugin, action
|
||||
|
||||
|
@ -27,9 +27,14 @@ class FilePlugin(Plugin):
|
|||
"""
|
||||
Read and return the content of a (text) file.
|
||||
|
||||
Note that this method should only be used for small text files, as it
|
||||
reads the entire file in memory.
|
||||
|
||||
If you need to read large/binary files, consider using the
|
||||
``GET /file?path=<path>`` HTTP API endpoint instead.
|
||||
|
||||
:param file: Path of the file.
|
||||
"""
|
||||
|
||||
with open(self._get_path(file), 'r') as f:
|
||||
return f.read()
|
||||
|
||||
|
@ -84,7 +89,9 @@ class FilePlugin(Plugin):
|
|||
exist they will be created (analogous to mkdir -p) (default: True).
|
||||
:param mode: Access mode (default: 0755).
|
||||
"""
|
||||
pathlib.Path(self._get_path(directory)).mkdir(parents=parents, exist_ok=exist_ok, mode=mode)
|
||||
pathlib.Path(self._get_path(directory)).mkdir(
|
||||
parents=parents, exist_ok=exist_ok, mode=mode
|
||||
)
|
||||
|
||||
@action
|
||||
def rmdir(self, directory: str):
|
||||
|
@ -142,13 +149,13 @@ class FilePlugin(Plugin):
|
|||
:param symbolic: If True, then the target link will be a symbolic link. Otherwise,
|
||||
it will be a hard link (default: symbolic).
|
||||
"""
|
||||
file = pathlib.Path(self._get_path(file))
|
||||
path = pathlib.Path(self._get_path(file))
|
||||
target = self._get_path(target)
|
||||
|
||||
if symbolic:
|
||||
file.symlink_to(target)
|
||||
path.symlink_to(target)
|
||||
else:
|
||||
file.link_to(target)
|
||||
path.hardlink_to(target)
|
||||
|
||||
@action
|
||||
def unlink(self, file: str):
|
||||
|
@ -160,7 +167,7 @@ class FilePlugin(Plugin):
|
|||
pathlib.Path(self._get_path(file)).unlink()
|
||||
|
||||
@action
|
||||
def list(self, path: str = os.path.abspath(os.sep)) -> List[Dict[str, str]]:
|
||||
def list(self, path: Optional[str] = None) -> List[Dict[str, str]]:
|
||||
"""
|
||||
List a file or all the files in a directory.
|
||||
|
||||
|
@ -168,26 +175,30 @@ class FilePlugin(Plugin):
|
|||
:return: List of files in the specified path, or absolute path of the specified path if ``path`` is a file and
|
||||
it exists. Each item will contain the fields ``type`` (``file`` or ``directory``) and ``path``.
|
||||
"""
|
||||
path = self._get_path(path)
|
||||
assert os.path.exists(path), 'No such file or directory: {}'.format(path)
|
||||
path = self._get_path(path or '/')
|
||||
assert path and os.path.exists(path), f'No such file or directory: {path}'
|
||||
|
||||
if not os.path.isdir(path):
|
||||
return [{
|
||||
return [
|
||||
{
|
||||
'type': 'file',
|
||||
'path': path,
|
||||
'name': os.path.basename(path),
|
||||
}]
|
||||
}
|
||||
]
|
||||
|
||||
return sorted(
|
||||
[
|
||||
{
|
||||
'type': 'directory' if os.path.isdir(os.path.join(path, f)) else 'file',
|
||||
'type': (
|
||||
'directory' if os.path.isdir(os.path.join(path, f)) else 'file'
|
||||
),
|
||||
'path': os.path.join(path, f),
|
||||
'name': os.path.basename(f),
|
||||
}
|
||||
for f in sorted(os.listdir(path))
|
||||
],
|
||||
key=lambda f: (f.get('type'), f.get('name'))
|
||||
key=lambda f: (f.get('type'), f.get('name')),
|
||||
)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue