From 1e9418b0725b34c7bc5054a015fcab321574d3c0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 21 Aug 2024 00:18:07 +0200 Subject: [PATCH] [file] `file.list`, `file.is_binary` and `file.get_user_home` actions. - Added `sort` and `reverse` arguments to `file.list`. - Added `file.is_binary` and `file.get_user_home` actions. --- platypush/plugins/file/__init__.py | 106 ++++++++++++++++++++++------- platypush/utils/__init__.py | 16 +++++ 2 files changed, 97 insertions(+), 25 deletions(-) diff --git a/platypush/plugins/file/__init__.py b/platypush/plugins/file/__init__.py index 5dc66b42dd..afbfe0bdb6 100644 --- a/platypush/plugins/file/__init__.py +++ b/platypush/plugins/file/__init__.py @@ -4,10 +4,10 @@ import pathlib import stat from functools import lru_cache from multiprocessing import RLock -from typing import Iterable, List, Dict, Optional, Set +from typing import Any, Iterable, List, Dict, Optional, Set from platypush.plugins import Plugin, action -from platypush.utils import get_mime_type +from platypush.utils import get_mime_type, is_binary class FilePlugin(Plugin): @@ -175,13 +175,24 @@ class FilePlugin(Plugin): pathlib.Path(self._get_path(file)).unlink() @action - def list(self, path: Optional[str] = None) -> List[Dict[str, str]]: + def list( + self, + path: Optional[str] = None, + sort: str = 'name', + reverse: bool = False, + ) -> List[Dict[str, Any]]: """ List a file or all the files in a directory. :param path: File or directory (default: root directory). - :return: List of files in the specified path, or absolute path of the specified path if ``path`` is a file and - it exists. Each item will contain the fields ``type`` (``file`` or ``directory``) and ``path``. + :param sort: Sort the files by ``name``, ``size``, ``last_modified`` + or ``created`` time (default: ``name``). + :param reverse: If set, the files will be sorted in descending order + according to the specified ``sort`` field (default: False). + :return: List of files in the specified path, or absolute path of the + specified path if ``path`` is a file and it exists. Each item will + contain the fields ``type`` (``file`` or ``directory``) and + ``path``. """ path = self._get_path(path or '/') assert path and os.path.exists(path), f'No such file or directory: {path}' @@ -192,6 +203,7 @@ class FilePlugin(Plugin): 'type': 'file', 'path': path, 'name': os.path.basename(path), + **self._get_file_info(path), } ] @@ -203,12 +215,47 @@ class FilePlugin(Plugin): ), 'path': os.path.join(path, f), 'name': os.path.basename(f), + **self._get_file_info(os.path.join(path, f)), } - for f in sorted(os.listdir(path)) + for f in os.listdir(path) ], - key=lambda f: (f.get('type'), f.get('name')), + key=lambda f: (f.get('type'), (f.get(sort) or 0)), + reverse=reverse, ) + @staticmethod + def _get_file_info(file: str) -> Dict[str, Any]: + ret: dict = {'path': file} + + try: + ret['size'] = os.path.getsize(file) + except Exception: + ret['size'] = None + + try: + ret['last_modified'] = os.path.getmtime(file) + except Exception: + ret['last_modified'] = None + + try: + ret['created'] = os.path.getctime(file) + except Exception: + ret['created'] = None + + try: + stat_info = os.stat(file) + ret.update( + { + 'permissions': stat.filemode(stat_info.st_mode), + 'owner': stat_info.st_uid, + 'group': stat_info.st_gid, + } + ) + except Exception: + ret.update({'permissions': None, 'owner': None, 'group': None}) + + return ret + @action def info(self, files: Iterable[str]) -> Dict[str, Dict[str, str]]: """ @@ -234,25 +281,18 @@ class FilePlugin(Plugin): } """ - ret = {} - for file in files: - file = self._get_path(file) - if not os.path.exists(file): - self.logger.warning('File not found: %s', file) - continue + with self._mime_types_lock: + ret = {} + for file in files: + file = self._get_path(file) + if not os.path.exists(file): + self.logger.warning('File not found: %s', file) + continue - stat_info = os.stat(file) - ret[file] = { - 'path': file, - 'name': os.path.basename(file), - 'size': stat_info.st_size, - 'type': 'directory' if os.path.isdir(file) else 'file', - 'mime_type': get_mime_type(file), - 'last_modified': stat_info.st_mtime, - 'permissions': stat.filemode(stat_info.st_mode), - 'owner': stat_info.st_uid, - 'group': stat_info.st_gid, - } + ret[file] = { + **self._get_file_info(file), + 'mime_type': get_mime_type(file), + } return ret @@ -283,6 +323,22 @@ class FilePlugin(Plugin): with self._mime_types_lock: return self._get_mime_types(files, filter_types) + @action + def is_binary(self, file: str) -> bool: + """ + :file: File path. + :return: True if the file is binary, False otherwise. + """ + with open(self._get_path(file), 'rb') as f: + return is_binary(f.read(1024)) + + @action + def get_user_home(self) -> str: + """ + :return: The current user's home directory. + """ + return str(pathlib.Path.home()) + def _get_mime_types( self, files: Iterable[str], filter_types: Set[str] ) -> Dict[str, str]: diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index e43f4358e7..3851353f09 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -865,4 +865,20 @@ def utcnow(): return datetime.datetime.now(datetime.timezone.utc) +def is_binary(data: Union[str, bytes]) -> bool: + """ + Check if the given data is binary. + + :param data: The data to be checked. + :return: True if the data is binary. + """ + if isinstance(data, str): + return False + + # From https://stackoverflow.com/questions/898669/how-can-i-detect-if-a-file-is-binary-non-text-in-python + assert isinstance(data, bytes), f"Invalid data type: {type(data)}" + textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7F}) + return bool(data.translate(None, textchars)) + + # vim:sw=4:ts=4:et: