From 2c481c54afe9f52f343079f48e5b6758c63d962a Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 24 Aug 2024 16:22:10 +0200 Subject: [PATCH] [file] Added POST/PUT /file endpoints. --- .../http/app/streaming/plugins/file.py | 86 ++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/platypush/backend/http/app/streaming/plugins/file.py b/platypush/backend/http/app/streaming/plugins/file.py index b841bfa57f..a92b62737b 100644 --- a/platypush/backend/http/app/streaming/plugins/file.py +++ b/platypush/backend/http/app/streaming/plugins/file.py @@ -1,7 +1,8 @@ import os +import pathlib from contextlib import contextmanager from datetime import datetime as dt -from typing import Optional, Tuple +from typing import IO, Optional, Tuple from tornado.web import stream_request_body @@ -17,6 +18,8 @@ class FileRoute(StreamingRoute): """ BUFSIZE = 1024 + _bytes_written = 0 + _out_f: Optional[IO[bytes]] = None @classmethod def path(cls) -> str: @@ -39,6 +42,10 @@ class FileRoute(StreamingRoute): def file_size(self) -> int: return os.path.getsize(self.file_path) + @property + def _content_length(self) -> int: + return int(self.request.headers.get('Content-Length', 0)) + @property def range(self) -> Tuple[Optional[int], Optional[int]]: range_hdr = self.request.headers.get('Range') @@ -105,6 +112,77 @@ class FileRoute(StreamingRoute): self.finish() + def on_finish(self) -> None: + if self._out_f: + try: + if not (self._out_f and self._out_f.closed): + self._out_f.close() + except Exception as e: + self.logger.warning('Error while closing the output file: %s', e) + + self._out_f = None + + return super().on_finish() + + def _validate_upload(self, force: bool = False) -> bool: + if not self.file_path: + self.write_error(400, 'Missing path argument') + return False + + if not self._out_f: + if not force and os.path.exists(self.file_path): + self.write_error(409, f'{self.file_path} already exists') + return False + + self._bytes_written = 0 + dir_path = os.path.dirname(self.file_path) + + try: + pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True) + self._out_f = open( # pylint: disable=consider-using-with + self.file_path, 'wb' + ) + except PermissionError: + self.write_error(403, 'Permission denied') + return False + + return True + + def finish(self, *args, **kwargs): # type: ignore + try: + return super().finish(*args, **kwargs) + except Exception as e: + self.logger.warning('Error while finishing the request: %s', e) + + def data_received(self, chunk: bytes): + # Ignore unless we're in POST/PUT mode + if self.request.method not in ('POST', 'PUT'): + return + + force = self.request.method == 'PUT' + if not self._validate_upload(force=force): + self.finish() + return + + if not chunk: + self.logger.debug('Received EOF from client') + self.finish() + return + + assert self._out_f + self._out_f.write(chunk) + self._out_f.flush() + self._bytes_written += len(chunk) + self.logger.debug( + 'Written chunk of size %d to %s, progress: %d/%d', + len(chunk), + self.file_path, + self._bytes_written, + self._content_length, + ) + + self.flush() + def get(self) -> None: with self._serve() as f: if f: @@ -119,3 +197,9 @@ class FileRoute(StreamingRoute): def head(self) -> None: with self._serve(): pass + + def post(self) -> None: + self.logger.info('Receiving file POST upload request for %r', self.file_path) + + def put(self) -> None: + self.logger.info('Receiving file PUT upload request for %r', self.file_path)