forked from platypush/platypush
[file] file.list
, file.is_binary
and file.get_user_home
actions.
- Added `sort` and `reverse` arguments to `file.list`. - Added `file.is_binary` and `file.get_user_home` actions.
This commit is contained in:
parent
213498318f
commit
1e9418b072
2 changed files with 97 additions and 25 deletions
|
@ -4,10 +4,10 @@ import pathlib
|
||||||
import stat
|
import stat
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from multiprocessing import RLock
|
from multiprocessing import RLock
|
||||||
from typing import Iterable, List, Dict, Optional, Set
|
from typing import Any, Iterable, List, Dict, Optional, Set
|
||||||
|
|
||||||
from platypush.plugins import Plugin, action
|
from platypush.plugins import Plugin, action
|
||||||
from platypush.utils import get_mime_type
|
from platypush.utils import get_mime_type, is_binary
|
||||||
|
|
||||||
|
|
||||||
class FilePlugin(Plugin):
|
class FilePlugin(Plugin):
|
||||||
|
@ -175,13 +175,24 @@ class FilePlugin(Plugin):
|
||||||
pathlib.Path(self._get_path(file)).unlink()
|
pathlib.Path(self._get_path(file)).unlink()
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def list(self, path: Optional[str] = None) -> List[Dict[str, str]]:
|
def list(
|
||||||
|
self,
|
||||||
|
path: Optional[str] = None,
|
||||||
|
sort: str = 'name',
|
||||||
|
reverse: bool = False,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
List a file or all the files in a directory.
|
List a file or all the files in a directory.
|
||||||
|
|
||||||
:param path: File or directory (default: root directory).
|
:param path: File or directory (default: root directory).
|
||||||
:return: List of files in the specified path, or absolute path of the specified path if ``path`` is a file and
|
:param sort: Sort the files by ``name``, ``size``, ``last_modified``
|
||||||
it exists. Each item will contain the fields ``type`` (``file`` or ``directory``) and ``path``.
|
or ``created`` time (default: ``name``).
|
||||||
|
:param reverse: If set, the files will be sorted in descending order
|
||||||
|
according to the specified ``sort`` field (default: False).
|
||||||
|
:return: List of files in the specified path, or absolute path of the
|
||||||
|
specified path if ``path`` is a file and it exists. Each item will
|
||||||
|
contain the fields ``type`` (``file`` or ``directory``) and
|
||||||
|
``path``.
|
||||||
"""
|
"""
|
||||||
path = self._get_path(path or '/')
|
path = self._get_path(path or '/')
|
||||||
assert path and os.path.exists(path), f'No such file or directory: {path}'
|
assert path and os.path.exists(path), f'No such file or directory: {path}'
|
||||||
|
@ -192,6 +203,7 @@ class FilePlugin(Plugin):
|
||||||
'type': 'file',
|
'type': 'file',
|
||||||
'path': path,
|
'path': path,
|
||||||
'name': os.path.basename(path),
|
'name': os.path.basename(path),
|
||||||
|
**self._get_file_info(path),
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -203,12 +215,47 @@ class FilePlugin(Plugin):
|
||||||
),
|
),
|
||||||
'path': os.path.join(path, f),
|
'path': os.path.join(path, f),
|
||||||
'name': os.path.basename(f),
|
'name': os.path.basename(f),
|
||||||
|
**self._get_file_info(os.path.join(path, f)),
|
||||||
}
|
}
|
||||||
for f in sorted(os.listdir(path))
|
for f in os.listdir(path)
|
||||||
],
|
],
|
||||||
key=lambda f: (f.get('type'), f.get('name')),
|
key=lambda f: (f.get('type'), (f.get(sort) or 0)),
|
||||||
|
reverse=reverse,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_file_info(file: str) -> Dict[str, Any]:
|
||||||
|
ret: dict = {'path': file}
|
||||||
|
|
||||||
|
try:
|
||||||
|
ret['size'] = os.path.getsize(file)
|
||||||
|
except Exception:
|
||||||
|
ret['size'] = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
ret['last_modified'] = os.path.getmtime(file)
|
||||||
|
except Exception:
|
||||||
|
ret['last_modified'] = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
ret['created'] = os.path.getctime(file)
|
||||||
|
except Exception:
|
||||||
|
ret['created'] = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
stat_info = os.stat(file)
|
||||||
|
ret.update(
|
||||||
|
{
|
||||||
|
'permissions': stat.filemode(stat_info.st_mode),
|
||||||
|
'owner': stat_info.st_uid,
|
||||||
|
'group': stat_info.st_gid,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
ret.update({'permissions': None, 'owner': None, 'group': None})
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def info(self, files: Iterable[str]) -> Dict[str, Dict[str, str]]:
|
def info(self, files: Iterable[str]) -> Dict[str, Dict[str, str]]:
|
||||||
"""
|
"""
|
||||||
|
@ -234,25 +281,18 @@ class FilePlugin(Plugin):
|
||||||
}
|
}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
ret = {}
|
with self._mime_types_lock:
|
||||||
for file in files:
|
ret = {}
|
||||||
file = self._get_path(file)
|
for file in files:
|
||||||
if not os.path.exists(file):
|
file = self._get_path(file)
|
||||||
self.logger.warning('File not found: %s', file)
|
if not os.path.exists(file):
|
||||||
continue
|
self.logger.warning('File not found: %s', file)
|
||||||
|
continue
|
||||||
|
|
||||||
stat_info = os.stat(file)
|
ret[file] = {
|
||||||
ret[file] = {
|
**self._get_file_info(file),
|
||||||
'path': file,
|
'mime_type': get_mime_type(file),
|
||||||
'name': os.path.basename(file),
|
}
|
||||||
'size': stat_info.st_size,
|
|
||||||
'type': 'directory' if os.path.isdir(file) else 'file',
|
|
||||||
'mime_type': get_mime_type(file),
|
|
||||||
'last_modified': stat_info.st_mtime,
|
|
||||||
'permissions': stat.filemode(stat_info.st_mode),
|
|
||||||
'owner': stat_info.st_uid,
|
|
||||||
'group': stat_info.st_gid,
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
@ -283,6 +323,22 @@ class FilePlugin(Plugin):
|
||||||
with self._mime_types_lock:
|
with self._mime_types_lock:
|
||||||
return self._get_mime_types(files, filter_types)
|
return self._get_mime_types(files, filter_types)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def is_binary(self, file: str) -> bool:
|
||||||
|
"""
|
||||||
|
:file: File path.
|
||||||
|
:return: True if the file is binary, False otherwise.
|
||||||
|
"""
|
||||||
|
with open(self._get_path(file), 'rb') as f:
|
||||||
|
return is_binary(f.read(1024))
|
||||||
|
|
||||||
|
@action
|
||||||
|
def get_user_home(self) -> str:
|
||||||
|
"""
|
||||||
|
:return: The current user's home directory.
|
||||||
|
"""
|
||||||
|
return str(pathlib.Path.home())
|
||||||
|
|
||||||
def _get_mime_types(
|
def _get_mime_types(
|
||||||
self, files: Iterable[str], filter_types: Set[str]
|
self, files: Iterable[str], filter_types: Set[str]
|
||||||
) -> Dict[str, str]:
|
) -> Dict[str, str]:
|
||||||
|
|
|
@ -865,4 +865,20 @@ def utcnow():
|
||||||
return datetime.datetime.now(datetime.timezone.utc)
|
return datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
|
def is_binary(data: Union[str, bytes]) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the given data is binary.
|
||||||
|
|
||||||
|
:param data: The data to be checked.
|
||||||
|
:return: True if the data is binary.
|
||||||
|
"""
|
||||||
|
if isinstance(data, str):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# From https://stackoverflow.com/questions/898669/how-can-i-detect-if-a-file-is-binary-non-text-in-python
|
||||||
|
assert isinstance(data, bytes), f"Invalid data type: {type(data)}"
|
||||||
|
textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7F})
|
||||||
|
return bool(data.translate(None, textchars))
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
Loading…
Reference in a new issue