diff --git a/docs/source/backends.rst b/docs/source/backends.rst index e532c7b3..bb5b3da0 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -35,6 +35,7 @@ Backends platypush/backend/kafka.rst platypush/backend/light.hue.rst platypush/backend/linode.rst + platypush/backend/log.http.rst platypush/backend/mail.rst platypush/backend/midi.rst platypush/backend/mqtt.rst diff --git a/docs/source/events.rst b/docs/source/events.rst index fd550753..15da3934 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -35,6 +35,7 @@ Events platypush/events/kafka.rst platypush/events/light.rst platypush/events/linode.rst + platypush/events/log.http.rst platypush/events/mail.rst platypush/events/media.rst platypush/events/midi.rst diff --git a/docs/source/platypush/backend/log.http.rst b/docs/source/platypush/backend/log.http.rst new file mode 100644 index 00000000..d016cb6b --- /dev/null +++ b/docs/source/platypush/backend/log.http.rst @@ -0,0 +1,5 @@ +``platypush.backend.log.http`` +============================== + +.. automodule:: platypush.backend.log.http + :members: diff --git a/docs/source/platypush/events/log.http.rst b/docs/source/platypush/events/log.http.rst new file mode 100644 index 00000000..3cc28e04 --- /dev/null +++ b/docs/source/platypush/events/log.http.rst @@ -0,0 +1,5 @@ +``platypush.message.event.log.http`` +==================================== + +.. automodule:: platypush.message.event.log.http + :members: diff --git a/platypush/backend/file/monitor/__init__.py b/platypush/backend/file/monitor/__init__.py index a2718bfb..3a9e1552 100644 --- a/platypush/backend/file/monitor/__init__.py +++ b/platypush/backend/file/monitor/__init__.py @@ -1,10 +1,10 @@ -from typing import List, Dict, Union, Any +from typing import Iterable, Dict, Union, Any from watchdog.observers import Observer from platypush.backend import Backend -from .entities.handlers import EventHandlerFactory -from .entities.resources import MonitoredResource +from .entities.handlers import EventHandler +from .entities.resources import MonitoredResource, MonitoredPattern, MonitoredRegex class FileMonitorBackend(Backend): @@ -23,7 +23,26 @@ class FileMonitorBackend(Backend): """ - def __init__(self, paths: List[Union[str, Dict[str, Any], MonitoredResource]], **kwargs): + class EventHandlerFactory: + """ + Create a file system event handler from a string, dictionary or ``MonitoredResource`` resource. + """ + + @staticmethod + def from_resource(resource: Union[str, Dict[str, Any], MonitoredResource]) -> EventHandler: + if isinstance(resource, str): + resource = MonitoredResource(resource) + elif isinstance(resource, dict): + if 'patterns' in resource or 'ignore_patterns' in resource: + resource = MonitoredPattern(**resource) + elif 'regexes' in resource or 'ignore_regexes' in resource: + resource = MonitoredRegex(**resource) + else: + resource = MonitoredResource(**resource) + + return EventHandler.from_resource(resource) + + def __init__(self, paths: Iterable[Union[str, Dict[str, Any], MonitoredResource]], **kwargs): """ :param paths: List of paths to monitor. Paths can either be expressed in any of the following ways: @@ -81,7 +100,7 @@ class FileMonitorBackend(Backend): recursive: True case_sensitive: False regexes: - - '.*\.jpe?g$' + - '.*\\.jpe?g$' ignore_patterns: - '^tmp_.*' ignore_directories: @@ -93,7 +112,7 @@ class FileMonitorBackend(Backend): self._observer = Observer() for path in paths: - handler = EventHandlerFactory.from_resource(path) + handler = self.EventHandlerFactory.from_resource(path) self._observer.schedule(handler, handler.resource.path, recursive=handler.resource.recursive) def run(self): diff --git a/platypush/backend/file/monitor/entities/handlers.py b/platypush/backend/file/monitor/entities/handlers.py index 7fb8ab25..76d9f50c 100644 --- a/platypush/backend/file/monitor/entities/handlers.py +++ b/platypush/backend/file/monitor/entities/handlers.py @@ -1,5 +1,4 @@ import os -from typing import Dict, Union, Any from watchdog.events import FileSystemEventHandler, PatternMatchingEventHandler, RegexMatchingEventHandler @@ -57,22 +56,3 @@ class RegexEventHandler(EventHandler, RegexMatchingEventHandler): ignore_regexes=resource.ignore_regexes, ignore_directories=resource.ignore_directories, case_sensitive=resource.case_sensitive) - - -class EventHandlerFactory: - """ - Create a file system event handler from a string, dictionary or ``MonitoredResource`` resource. - """ - @staticmethod - def from_resource(resource: Union[str, Dict[str, Any], MonitoredResource]) -> EventHandler: - if isinstance(resource, str): - resource = MonitoredResource(resource) - elif isinstance(resource, dict): - if 'patterns' in resource or 'ignore_patterns' in resource: - resource = MonitoredPattern(**resource) - elif 'regexes' in resource or 'ignore_regexes' in resource: - resource = MonitoredRegex(**resource) - else: - resource = MonitoredResource(**resource) - - return EventHandler.from_resource(resource) diff --git a/platypush/backend/log/__init__.py b/platypush/backend/log/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/platypush/backend/log/http.py b/platypush/backend/log/http.py new file mode 100644 index 00000000..f1af9a44 --- /dev/null +++ b/platypush/backend/log/http.py @@ -0,0 +1,154 @@ +import datetime +import os +import re + +from dataclasses import dataclass +from logging import getLogger +from threading import RLock +from typing import List, Optional, Iterable + +from platypush.backend.file.monitor import FileMonitorBackend, EventHandler, MonitoredResource +from platypush.context import get_bus +from platypush.message.event.log.http import HttpLogEvent + +logger = getLogger(__name__) + + +class LogEventHandler(EventHandler): + http_line_regex = re.compile(r'^([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+\[([^]]+)]\s+"([^"]+)"\s+([\d]+)\s+' + r'([\d]+)\s*("([^"\s]+)")?\s*("([^"]+)")?$') + + @dataclass + class FileResource: + path: str + pos: int = 0 + lock: RLock = RLock() + last_timestamp: Optional[datetime.datetime] = None + + def __init__(self, *args, monitored_files: Optional[Iterable[str]] = None, **kwargs): + super().__init__(*args, **kwargs) + self._monitored_files = {} + self.monitor_files(monitored_files or []) + + def monitor_files(self, files: Iterable[str]): + self._monitored_files.update({ + f: self.FileResource(path=f, pos=self._get_size(f)) + for f in files + }) + + @staticmethod + def _get_size(file: str) -> int: + try: + return os.path.getsize(file) + except Exception as e: + logger.warning('Could not open {}: {}'.format(file, str(e))) + return 0 + + def on_created(self, event): + self._reset_file(event.src_path) + + def on_deleted(self, event): + self._reset_file(event.src_path) + + def _reset_file(self, path: str): + file_info = self._monitored_files.get(path) + if not file_info: + return + + file_info.pos = 0 + + def on_modified(self, event): + file_info = self._monitored_files.get(event.src_path) + if not file_info: + return + + try: + file_size = os.path.getsize(event.src_path) + except OSError as e: + logger.warning('Could not get the size of {}: {}'.format(event.src_path, str(e))) + return + + if file_info.pos > file_size: + logger.warning('The size of {} been unexpectedly decreased from {} to {} bytes'.format( + event.src_path, file_info.pos, file_size)) + file_info.pos = 0 + + try: + with file_info.lock, open(event.src_path, 'r') as f: + f.seek(file_info.pos) + for line in f.readlines(): + evt = self._build_event(file=event.src_path, line=line) + if evt and (not file_info.last_timestamp or evt.args['timestamp'] >= file_info.last_timestamp): + get_bus().post(evt) + file_info.last_timestamp = evt.args['timestamp'] + + file_info.pos = f.tell() + except OSError as e: + logger.warning('Error while reading from {}: {}'.format(self.resource.path, str(e))) + + @classmethod + def _build_event(cls, file: str, line: str) -> Optional[HttpLogEvent]: + line = line.strip() + if not line: + return + + m = cls.http_line_regex.match(line.strip()) + if not m: + logger.warning('Could not parse log line from {}: {}'.format(file, line)) + return + + info = { + 'address': m.group(1), + 'user_identifier': m.group(2), + 'user_id': m.group(3), + 'time': datetime.datetime.strptime(m.group(4), '%d/%b/%Y:%H:%M:%S %z'), + 'method': m.group(5).split(' ')[0], + 'url': m.group(5).split(' ')[1], + 'http_version': m.group(5).split(' ')[2].split('/')[1], + 'status': int(m.group(6)), + 'size': int(m.group(7)), + 'referrer': m.group(9), + 'user_agent': m.group(11), + } + + for attr, value in info.items(): + if value == '-': + info[attr] = None + + return HttpLogEvent(logfile=file, **info) + + +class LogHttpBackend(FileMonitorBackend): + """ + This backend can be used to monitor one or more HTTP log files (tested on Apache and Nginx) and trigger events + whenever a new log line is added. + + Triggers: + + * :class:`platypush.message.event.log.http.HttpLogEvent` when a new log line is created. + + Requires: + + * **watchdog** (``pip install watchdog``) + + """ + + class EventHandlerFactory: + @staticmethod + def from_resource(resource: str) -> LogEventHandler: + resource = MonitoredResource(resource) + return LogEventHandler.from_resource(resource) + + def __init__(self, log_files: List[str], **kwargs): + """ + :param log_files: List of log files to be monitored. + """ + self.log_files = {os.path.expanduser(log) for log in log_files} + directories = {os.path.dirname(log) for log in self.log_files} + super().__init__(paths=directories, **kwargs) + + # noinspection PyProtectedMember + handlers = self._observer._handlers + for hndls in handlers.values(): + for hndl in hndls: + hndl.monitor_files(self.log_files) diff --git a/platypush/message/event/log/__init__.py b/platypush/message/event/log/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/platypush/message/event/log/http.py b/platypush/message/event/log/http.py new file mode 100644 index 00000000..6ecf6dd4 --- /dev/null +++ b/platypush/message/event/log/http.py @@ -0,0 +1,16 @@ +from datetime import datetime +from typing import Optional + +from platypush.message.event import Event + + +class HttpLogEvent(Event): + """ + Event triggered when a new HTTP log entry is created. + """ + def __init__(self, logfile: str, address: str, time: datetime, method: str, url: str, status: int, size: int, + http_version: str = '1.0', user_id: Optional[str] = None, user_identifier: Optional[str] = None, + referrer: Optional[str] = None, user_agent: Optional[str] = None, **kwargs): + super().__init__(logfile=logfile, address=address, time=time, method=method, url=url, status=status, size=size, + http_version=http_version, user_id=user_id, user_identifier=user_identifier, referrer=referrer, + user_agent=user_agent, **kwargs)