From f8564c19cd93f80ae6fde207af5347fa1005116f Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <info@fabiomanganiello.com>
Date: Tue, 16 Mar 2021 00:03:32 +0100
Subject: [PATCH] Added log.http backend to monitor HTTP logs [closes #167]

---
 docs/source/backends.rst                      |   1 +
 docs/source/events.rst                        |   1 +
 docs/source/platypush/backend/log.http.rst    |   5 +
 docs/source/platypush/events/log.http.rst     |   5 +
 platypush/backend/file/monitor/__init__.py    |  31 +++-
 .../backend/file/monitor/entities/handlers.py |  20 ---
 platypush/backend/log/__init__.py             |   0
 platypush/backend/log/http.py                 | 154 ++++++++++++++++++
 platypush/message/event/log/__init__.py       |   0
 platypush/message/event/log/http.py           |  16 ++
 10 files changed, 207 insertions(+), 26 deletions(-)
 create mode 100644 docs/source/platypush/backend/log.http.rst
 create mode 100644 docs/source/platypush/events/log.http.rst
 create mode 100644 platypush/backend/log/__init__.py
 create mode 100644 platypush/backend/log/http.py
 create mode 100644 platypush/message/event/log/__init__.py
 create mode 100644 platypush/message/event/log/http.py

diff --git a/docs/source/backends.rst b/docs/source/backends.rst
index e532c7b3..bb5b3da0 100644
--- a/docs/source/backends.rst
+++ b/docs/source/backends.rst
@@ -35,6 +35,7 @@ Backends
     platypush/backend/kafka.rst
     platypush/backend/light.hue.rst
     platypush/backend/linode.rst
+    platypush/backend/log.http.rst
     platypush/backend/mail.rst
     platypush/backend/midi.rst
     platypush/backend/mqtt.rst
diff --git a/docs/source/events.rst b/docs/source/events.rst
index fd550753..15da3934 100644
--- a/docs/source/events.rst
+++ b/docs/source/events.rst
@@ -35,6 +35,7 @@ Events
     platypush/events/kafka.rst
     platypush/events/light.rst
     platypush/events/linode.rst
+    platypush/events/log.http.rst
     platypush/events/mail.rst
     platypush/events/media.rst
     platypush/events/midi.rst
diff --git a/docs/source/platypush/backend/log.http.rst b/docs/source/platypush/backend/log.http.rst
new file mode 100644
index 00000000..d016cb6b
--- /dev/null
+++ b/docs/source/platypush/backend/log.http.rst
@@ -0,0 +1,5 @@
+``platypush.backend.log.http``
+==============================
+
+.. automodule:: platypush.backend.log.http
+    :members:
diff --git a/docs/source/platypush/events/log.http.rst b/docs/source/platypush/events/log.http.rst
new file mode 100644
index 00000000..3cc28e04
--- /dev/null
+++ b/docs/source/platypush/events/log.http.rst
@@ -0,0 +1,5 @@
+``platypush.message.event.log.http``
+====================================
+
+.. automodule:: platypush.message.event.log.http
+    :members:
diff --git a/platypush/backend/file/monitor/__init__.py b/platypush/backend/file/monitor/__init__.py
index a2718bfb..3a9e1552 100644
--- a/platypush/backend/file/monitor/__init__.py
+++ b/platypush/backend/file/monitor/__init__.py
@@ -1,10 +1,10 @@
-from typing import List, Dict, Union, Any
+from typing import Iterable, Dict, Union, Any
 
 from watchdog.observers import Observer
 
 from platypush.backend import Backend
-from .entities.handlers import EventHandlerFactory
-from .entities.resources import MonitoredResource
+from .entities.handlers import EventHandler
+from .entities.resources import MonitoredResource, MonitoredPattern, MonitoredRegex
 
 
 class FileMonitorBackend(Backend):
@@ -23,7 +23,26 @@ class FileMonitorBackend(Backend):
 
     """
 
-    def __init__(self, paths: List[Union[str, Dict[str, Any], MonitoredResource]], **kwargs):
+    class EventHandlerFactory:
+        """
+        Create a file system event handler from a string, dictionary or ``MonitoredResource`` resource.
+        """
+
+        @staticmethod
+        def from_resource(resource: Union[str, Dict[str, Any], MonitoredResource]) -> EventHandler:
+            if isinstance(resource, str):
+                resource = MonitoredResource(resource)
+            elif isinstance(resource, dict):
+                if 'patterns' in resource or 'ignore_patterns' in resource:
+                    resource = MonitoredPattern(**resource)
+                elif 'regexes' in resource or 'ignore_regexes' in resource:
+                    resource = MonitoredRegex(**resource)
+                else:
+                    resource = MonitoredResource(**resource)
+
+            return EventHandler.from_resource(resource)
+
+    def __init__(self, paths: Iterable[Union[str, Dict[str, Any], MonitoredResource]], **kwargs):
         """
         :param paths: List of paths to monitor. Paths can either be expressed in any of the following ways:
 
@@ -81,7 +100,7 @@ class FileMonitorBackend(Backend):
                               recursive: True
                               case_sensitive: False
                               regexes:
-                                - '.*\.jpe?g$'
+                                - '.*\\.jpe?g$'
                               ignore_patterns:
                                 - '^tmp_.*'
                               ignore_directories:
@@ -93,7 +112,7 @@ class FileMonitorBackend(Backend):
         self._observer = Observer()
 
         for path in paths:
-            handler = EventHandlerFactory.from_resource(path)
+            handler = self.EventHandlerFactory.from_resource(path)
             self._observer.schedule(handler, handler.resource.path, recursive=handler.resource.recursive)
 
     def run(self):
diff --git a/platypush/backend/file/monitor/entities/handlers.py b/platypush/backend/file/monitor/entities/handlers.py
index 7fb8ab25..76d9f50c 100644
--- a/platypush/backend/file/monitor/entities/handlers.py
+++ b/platypush/backend/file/monitor/entities/handlers.py
@@ -1,5 +1,4 @@
 import os
-from typing import Dict, Union, Any
 
 from watchdog.events import FileSystemEventHandler, PatternMatchingEventHandler, RegexMatchingEventHandler
 
@@ -57,22 +56,3 @@ class RegexEventHandler(EventHandler, RegexMatchingEventHandler):
                          ignore_regexes=resource.ignore_regexes,
                          ignore_directories=resource.ignore_directories,
                          case_sensitive=resource.case_sensitive)
-
-
-class EventHandlerFactory:
-    """
-    Create a file system event handler from a string, dictionary or ``MonitoredResource`` resource.
-    """
-    @staticmethod
-    def from_resource(resource: Union[str, Dict[str, Any], MonitoredResource]) -> EventHandler:
-        if isinstance(resource, str):
-            resource = MonitoredResource(resource)
-        elif isinstance(resource, dict):
-            if 'patterns' in resource or 'ignore_patterns' in resource:
-                resource = MonitoredPattern(**resource)
-            elif 'regexes' in resource or 'ignore_regexes' in resource:
-                resource = MonitoredRegex(**resource)
-            else:
-                resource = MonitoredResource(**resource)
-
-        return EventHandler.from_resource(resource)
diff --git a/platypush/backend/log/__init__.py b/platypush/backend/log/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/platypush/backend/log/http.py b/platypush/backend/log/http.py
new file mode 100644
index 00000000..f1af9a44
--- /dev/null
+++ b/platypush/backend/log/http.py
@@ -0,0 +1,154 @@
+import datetime
+import os
+import re
+
+from dataclasses import dataclass
+from logging import getLogger
+from threading import RLock
+from typing import List, Optional, Iterable
+
+from platypush.backend.file.monitor import FileMonitorBackend, EventHandler, MonitoredResource
+from platypush.context import get_bus
+from platypush.message.event.log.http import HttpLogEvent
+
+logger = getLogger(__name__)
+
+
+class LogEventHandler(EventHandler):
+    http_line_regex = re.compile(r'^([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+\[([^]]+)]\s+"([^"]+)"\s+([\d]+)\s+'
+                                 r'([\d]+)\s*("([^"\s]+)")?\s*("([^"]+)")?$')
+
+    @dataclass
+    class FileResource:
+        path: str
+        pos: int = 0
+        lock: RLock = RLock()
+        last_timestamp: Optional[datetime.datetime] = None
+
+    def __init__(self, *args, monitored_files: Optional[Iterable[str]] = None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._monitored_files = {}
+        self.monitor_files(monitored_files or [])
+
+    def monitor_files(self, files: Iterable[str]):
+        self._monitored_files.update({
+            f: self.FileResource(path=f, pos=self._get_size(f))
+            for f in files
+        })
+
+    @staticmethod
+    def _get_size(file: str) -> int:
+        try:
+            return os.path.getsize(file)
+        except Exception as e:
+            logger.warning('Could not open {}: {}'.format(file, str(e)))
+            return 0
+
+    def on_created(self, event):
+        self._reset_file(event.src_path)
+
+    def on_deleted(self, event):
+        self._reset_file(event.src_path)
+
+    def _reset_file(self, path: str):
+        file_info = self._monitored_files.get(path)
+        if not file_info:
+            return
+
+        file_info.pos = 0
+
+    def on_modified(self, event):
+        file_info = self._monitored_files.get(event.src_path)
+        if not file_info:
+            return
+
+        try:
+            file_size = os.path.getsize(event.src_path)
+        except OSError as e:
+            logger.warning('Could not get the size of {}: {}'.format(event.src_path, str(e)))
+            return
+
+        if file_info.pos > file_size:
+            logger.warning('The size of {} been unexpectedly decreased from {} to {} bytes'.format(
+                event.src_path, file_info.pos, file_size))
+            file_info.pos = 0
+
+        try:
+            with file_info.lock, open(event.src_path, 'r') as f:
+                f.seek(file_info.pos)
+                for line in f.readlines():
+                    evt = self._build_event(file=event.src_path, line=line)
+                    if evt and (not file_info.last_timestamp or evt.args['timestamp'] >= file_info.last_timestamp):
+                        get_bus().post(evt)
+                        file_info.last_timestamp = evt.args['timestamp']
+
+                file_info.pos = f.tell()
+        except OSError as e:
+            logger.warning('Error while reading from {}: {}'.format(self.resource.path, str(e)))
+
+    @classmethod
+    def _build_event(cls, file: str, line: str) -> Optional[HttpLogEvent]:
+        line = line.strip()
+        if not line:
+            return
+
+        m = cls.http_line_regex.match(line.strip())
+        if not m:
+            logger.warning('Could not parse log line from {}: {}'.format(file, line))
+            return
+
+        info = {
+            'address': m.group(1),
+            'user_identifier': m.group(2),
+            'user_id': m.group(3),
+            'time': datetime.datetime.strptime(m.group(4), '%d/%b/%Y:%H:%M:%S %z'),
+            'method': m.group(5).split(' ')[0],
+            'url': m.group(5).split(' ')[1],
+            'http_version': m.group(5).split(' ')[2].split('/')[1],
+            'status': int(m.group(6)),
+            'size': int(m.group(7)),
+            'referrer': m.group(9),
+            'user_agent': m.group(11),
+        }
+
+        for attr, value in info.items():
+            if value == '-':
+                info[attr] = None
+
+        return HttpLogEvent(logfile=file, **info)
+
+
+class LogHttpBackend(FileMonitorBackend):
+    """
+    This backend can be used to monitor one or more HTTP log files (tested on Apache and Nginx) and trigger events
+    whenever a new log line is added.
+
+    Triggers:
+
+        * :class:`platypush.message.event.log.http.HttpLogEvent` when a new log line is created.
+
+    Requires:
+
+        * **watchdog** (``pip install watchdog``)
+
+    """
+
+    class EventHandlerFactory:
+        @staticmethod
+        def from_resource(resource: str) -> LogEventHandler:
+            resource = MonitoredResource(resource)
+            return LogEventHandler.from_resource(resource)
+
+    def __init__(self, log_files: List[str], **kwargs):
+        """
+        :param log_files: List of log files to be monitored.
+        """
+        self.log_files = {os.path.expanduser(log) for log in log_files}
+        directories = {os.path.dirname(log) for log in self.log_files}
+        super().__init__(paths=directories, **kwargs)
+
+        # noinspection PyProtectedMember
+        handlers = self._observer._handlers
+        for hndls in handlers.values():
+            for hndl in hndls:
+                hndl.monitor_files(self.log_files)
diff --git a/platypush/message/event/log/__init__.py b/platypush/message/event/log/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/platypush/message/event/log/http.py b/platypush/message/event/log/http.py
new file mode 100644
index 00000000..6ecf6dd4
--- /dev/null
+++ b/platypush/message/event/log/http.py
@@ -0,0 +1,16 @@
+from datetime import datetime
+from typing import Optional
+
+from platypush.message.event import Event
+
+
+class HttpLogEvent(Event):
+    """
+    Event triggered when a new HTTP log entry is created.
+    """
+    def __init__(self, logfile: str, address: str, time: datetime, method: str, url: str, status: int, size: int,
+                 http_version: str = '1.0', user_id: Optional[str] = None, user_identifier: Optional[str] = None,
+                 referrer: Optional[str] = None, user_agent: Optional[str] = None, **kwargs):
+        super().__init__(logfile=logfile, address=address, time=time, method=method, url=url, status=status, size=size,
+                         http_version=http_version, user_id=user_id, user_identifier=user_identifier, referrer=referrer,
+                         user_agent=user_agent, **kwargs)