Added log.http backend to monitor HTTP logs [closes #167]

This commit is contained in:
Fabio Manganiello 2021-03-16 00:03:32 +01:00
parent bf519babb0
commit f8564c19cd
10 changed files with 207 additions and 26 deletions

View file

@ -35,6 +35,7 @@ Backends
platypush/backend/kafka.rst
platypush/backend/light.hue.rst
platypush/backend/linode.rst
platypush/backend/log.http.rst
platypush/backend/mail.rst
platypush/backend/midi.rst
platypush/backend/mqtt.rst

View file

@ -35,6 +35,7 @@ Events
platypush/events/kafka.rst
platypush/events/light.rst
platypush/events/linode.rst
platypush/events/log.http.rst
platypush/events/mail.rst
platypush/events/media.rst
platypush/events/midi.rst

View file

@ -0,0 +1,5 @@
``platypush.backend.log.http``
==============================
.. automodule:: platypush.backend.log.http
:members:

View file

@ -0,0 +1,5 @@
``platypush.message.event.log.http``
====================================
.. automodule:: platypush.message.event.log.http
:members:

View file

@ -1,10 +1,10 @@
from typing import List, Dict, Union, Any
from typing import Iterable, Dict, Union, Any
from watchdog.observers import Observer
from platypush.backend import Backend
from .entities.handlers import EventHandlerFactory
from .entities.resources import MonitoredResource
from .entities.handlers import EventHandler
from .entities.resources import MonitoredResource, MonitoredPattern, MonitoredRegex
class FileMonitorBackend(Backend):
@ -23,7 +23,26 @@ class FileMonitorBackend(Backend):
"""
def __init__(self, paths: List[Union[str, Dict[str, Any], MonitoredResource]], **kwargs):
class EventHandlerFactory:
"""
Create a file system event handler from a string, dictionary or ``MonitoredResource`` resource.
"""
@staticmethod
def from_resource(resource: Union[str, Dict[str, Any], MonitoredResource]) -> EventHandler:
if isinstance(resource, str):
resource = MonitoredResource(resource)
elif isinstance(resource, dict):
if 'patterns' in resource or 'ignore_patterns' in resource:
resource = MonitoredPattern(**resource)
elif 'regexes' in resource or 'ignore_regexes' in resource:
resource = MonitoredRegex(**resource)
else:
resource = MonitoredResource(**resource)
return EventHandler.from_resource(resource)
def __init__(self, paths: Iterable[Union[str, Dict[str, Any], MonitoredResource]], **kwargs):
"""
:param paths: List of paths to monitor. Paths can either be expressed in any of the following ways:
@ -81,7 +100,7 @@ class FileMonitorBackend(Backend):
recursive: True
case_sensitive: False
regexes:
- '.*\.jpe?g$'
- '.*\\.jpe?g$'
ignore_patterns:
- '^tmp_.*'
ignore_directories:
@ -93,7 +112,7 @@ class FileMonitorBackend(Backend):
self._observer = Observer()
for path in paths:
handler = EventHandlerFactory.from_resource(path)
handler = self.EventHandlerFactory.from_resource(path)
self._observer.schedule(handler, handler.resource.path, recursive=handler.resource.recursive)
def run(self):

View file

@ -1,5 +1,4 @@
import os
from typing import Dict, Union, Any
from watchdog.events import FileSystemEventHandler, PatternMatchingEventHandler, RegexMatchingEventHandler
@ -57,22 +56,3 @@ class RegexEventHandler(EventHandler, RegexMatchingEventHandler):
ignore_regexes=resource.ignore_regexes,
ignore_directories=resource.ignore_directories,
case_sensitive=resource.case_sensitive)
class EventHandlerFactory:
"""
Create a file system event handler from a string, dictionary or ``MonitoredResource`` resource.
"""
@staticmethod
def from_resource(resource: Union[str, Dict[str, Any], MonitoredResource]) -> EventHandler:
if isinstance(resource, str):
resource = MonitoredResource(resource)
elif isinstance(resource, dict):
if 'patterns' in resource or 'ignore_patterns' in resource:
resource = MonitoredPattern(**resource)
elif 'regexes' in resource or 'ignore_regexes' in resource:
resource = MonitoredRegex(**resource)
else:
resource = MonitoredResource(**resource)
return EventHandler.from_resource(resource)

View file

View file

@ -0,0 +1,154 @@
import datetime
import os
import re
from dataclasses import dataclass
from logging import getLogger
from threading import RLock
from typing import List, Optional, Iterable
from platypush.backend.file.monitor import FileMonitorBackend, EventHandler, MonitoredResource
from platypush.context import get_bus
from platypush.message.event.log.http import HttpLogEvent
logger = getLogger(__name__)
class LogEventHandler(EventHandler):
http_line_regex = re.compile(r'^([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+\[([^]]+)]\s+"([^"]+)"\s+([\d]+)\s+'
r'([\d]+)\s*("([^"\s]+)")?\s*("([^"]+)")?$')
@dataclass
class FileResource:
path: str
pos: int = 0
lock: RLock = RLock()
last_timestamp: Optional[datetime.datetime] = None
def __init__(self, *args, monitored_files: Optional[Iterable[str]] = None, **kwargs):
super().__init__(*args, **kwargs)
self._monitored_files = {}
self.monitor_files(monitored_files or [])
def monitor_files(self, files: Iterable[str]):
self._monitored_files.update({
f: self.FileResource(path=f, pos=self._get_size(f))
for f in files
})
@staticmethod
def _get_size(file: str) -> int:
try:
return os.path.getsize(file)
except Exception as e:
logger.warning('Could not open {}: {}'.format(file, str(e)))
return 0
def on_created(self, event):
self._reset_file(event.src_path)
def on_deleted(self, event):
self._reset_file(event.src_path)
def _reset_file(self, path: str):
file_info = self._monitored_files.get(path)
if not file_info:
return
file_info.pos = 0
def on_modified(self, event):
file_info = self._monitored_files.get(event.src_path)
if not file_info:
return
try:
file_size = os.path.getsize(event.src_path)
except OSError as e:
logger.warning('Could not get the size of {}: {}'.format(event.src_path, str(e)))
return
if file_info.pos > file_size:
logger.warning('The size of {} been unexpectedly decreased from {} to {} bytes'.format(
event.src_path, file_info.pos, file_size))
file_info.pos = 0
try:
with file_info.lock, open(event.src_path, 'r') as f:
f.seek(file_info.pos)
for line in f.readlines():
evt = self._build_event(file=event.src_path, line=line)
if evt and (not file_info.last_timestamp or evt.args['timestamp'] >= file_info.last_timestamp):
get_bus().post(evt)
file_info.last_timestamp = evt.args['timestamp']
file_info.pos = f.tell()
except OSError as e:
logger.warning('Error while reading from {}: {}'.format(self.resource.path, str(e)))
@classmethod
def _build_event(cls, file: str, line: str) -> Optional[HttpLogEvent]:
line = line.strip()
if not line:
return
m = cls.http_line_regex.match(line.strip())
if not m:
logger.warning('Could not parse log line from {}: {}'.format(file, line))
return
info = {
'address': m.group(1),
'user_identifier': m.group(2),
'user_id': m.group(3),
'time': datetime.datetime.strptime(m.group(4), '%d/%b/%Y:%H:%M:%S %z'),
'method': m.group(5).split(' ')[0],
'url': m.group(5).split(' ')[1],
'http_version': m.group(5).split(' ')[2].split('/')[1],
'status': int(m.group(6)),
'size': int(m.group(7)),
'referrer': m.group(9),
'user_agent': m.group(11),
}
for attr, value in info.items():
if value == '-':
info[attr] = None
return HttpLogEvent(logfile=file, **info)
class LogHttpBackend(FileMonitorBackend):
"""
This backend can be used to monitor one or more HTTP log files (tested on Apache and Nginx) and trigger events
whenever a new log line is added.
Triggers:
* :class:`platypush.message.event.log.http.HttpLogEvent` when a new log line is created.
Requires:
* **watchdog** (``pip install watchdog``)
"""
class EventHandlerFactory:
@staticmethod
def from_resource(resource: str) -> LogEventHandler:
resource = MonitoredResource(resource)
return LogEventHandler.from_resource(resource)
def __init__(self, log_files: List[str], **kwargs):
"""
:param log_files: List of log files to be monitored.
"""
self.log_files = {os.path.expanduser(log) for log in log_files}
directories = {os.path.dirname(log) for log in self.log_files}
super().__init__(paths=directories, **kwargs)
# noinspection PyProtectedMember
handlers = self._observer._handlers
for hndls in handlers.values():
for hndl in hndls:
hndl.monitor_files(self.log_files)

View file

View file

@ -0,0 +1,16 @@
from datetime import datetime
from typing import Optional
from platypush.message.event import Event
class HttpLogEvent(Event):
"""
Event triggered when a new HTTP log entry is created.
"""
def __init__(self, logfile: str, address: str, time: datetime, method: str, url: str, status: int, size: int,
http_version: str = '1.0', user_id: Optional[str] = None, user_identifier: Optional[str] = None,
referrer: Optional[str] = None, user_agent: Optional[str] = None, **kwargs):
super().__init__(logfile=logfile, address=address, time=time, method=method, url=url, status=status, size=size,
http_version=http_version, user_id=user_id, user_identifier=user_identifier, referrer=referrer,
user_agent=user_agent, **kwargs)