Proper implementation for filesystem monitor filters

The logic Watchdog applies to filter events based
on `ignore_directories`, `ignore_patterns` and
`ignore_regexes` isn't really sophisticated, and
it doesn't check whether a partial directory/file
name is used in one of the `ignore_*` patterns.
The `file.monitor` backend should therefore implement
this logic on its side.
This commit is contained in:
Fabio Manganiello 2021-12-20 00:58:41 +01:00
parent e94d338de5
commit 0a3fd4065a
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,4 +1,5 @@
import os
import re
from watchdog.events import FileSystemEventHandler, PatternMatchingEventHandler, RegexMatchingEventHandler
@ -16,14 +17,50 @@ class EventHandler(FileSystemEventHandler):
resource.path = os.path.expanduser(resource.path)
self.resource = resource
def _should_ignore_event(self, event) -> bool:
ignore_dirs = [
os.path.expanduser(
_dir if os.path.expanduser(_dir).strip('/').startswith(self.resource.path)
else os.path.join(self.resource.path, _dir)
)
for _dir in getattr(self.resource, 'ignore_directories', [])
]
ignore_patterns = getattr(self.resource, 'ignore_patterns', None)
ignore_regexes = getattr(self.resource, 'ignore_regexes', None)
if ignore_dirs and any(
event.src_path.startswith(ignore_dir) for ignore_dir in ignore_dirs
):
return True
if ignore_patterns and any(
re.match(r'^{}$'.format(pattern.replace('*', '.*')), event.src_path)
for pattern in ignore_patterns
):
return True
if ignore_regexes and any(
re.match(regex, event.src_path)
for regex in ignore_patterns
):
return True
return False
def _on_event(self, event, output_event_type):
if self._should_ignore_event(event):
return
get_bus().post(output_event_type(path=event.src_path, is_directory=event.is_directory))
def on_created(self, event):
get_bus().post(FileSystemCreateEvent(path=event.src_path, is_directory=event.is_directory))
self._on_event(event, FileSystemCreateEvent)
def on_deleted(self, event):
get_bus().post(FileSystemDeleteEvent(path=event.src_path, is_directory=event.is_directory))
self._on_event(event, FileSystemDeleteEvent)
def on_modified(self, event):
get_bus().post(FileSystemModifyEvent(path=event.src_path, is_directory=event.is_directory))
self._on_event(event, FileSystemModifyEvent)
def on_moved(self, event):
pass