homelab_automation/app/services/console_log_service.py

219 lines
7.6 KiB
Python

"""
Service de capture des logs console (stdout/stderr).
Capture les logs de l'application en temps réel pour les afficher dans l'UI.
"""
import sys
import io
import re
import logging
import threading
from datetime import datetime, timezone
from collections import deque
from typing import List, Optional
from dataclasses import dataclass, asdict
@dataclass
class ConsoleLogEntry:
"""Entrée de log console."""
id: int
timestamp: str
level: str
message: str
source: str = "console"
def to_dict(self):
return asdict(self)
class ConsoleLogCapture:
"""
Capture les logs console (stdout/stderr) et les stocke en mémoire.
Utilise un buffer circulaire pour limiter l'utilisation mémoire.
"""
def __init__(self, max_entries: int = 2000):
self.max_entries = max_entries
self._logs: deque = deque(maxlen=max_entries)
self._lock = threading.Lock()
self._id_counter = 0
self._original_stdout = sys.stdout
self._original_stderr = sys.stderr
self._capturing = False
# Patterns pour détecter le niveau de log
self._level_patterns = [
(re.compile(r'\bERROR\b', re.IGNORECASE), 'ERROR'),
(re.compile(r'\bWARN(?:ING)?\b', re.IGNORECASE), 'WARN'),
(re.compile(r'\bDEBUG\b', re.IGNORECASE), 'DEBUG'),
(re.compile(r'\b(INFO|Started|Waiting|Application)\b', re.IGNORECASE), 'INFO'),
(re.compile(r'[✅🚀📋📦⏰🔔]'), 'INFO'),
(re.compile(r'[⚠️❌]'), 'WARN'),
]
def _detect_level(self, message: str) -> str:
"""Détecte le niveau de log à partir du message."""
for pattern, level in self._level_patterns:
if pattern.search(message):
return level
return 'INFO'
def add_log(self, message: str, level: Optional[str] = None, source: str = "console"):
"""Ajoute un log au buffer."""
if not message or not message.strip():
return
message = message.strip()
if not level:
level = self._detect_level(message)
with self._lock:
# Éviter les doublons consécutifs (même message dans les 2 dernières entrées)
if len(self._logs) > 0:
recent = list(self._logs)[-2:] if len(self._logs) >= 2 else list(self._logs)
for recent_log in recent:
if recent_log.message == message and recent_log.source == source:
return # Doublon, ignorer
self._id_counter += 1
entry = ConsoleLogEntry(
id=self._id_counter,
timestamp=datetime.now(timezone.utc).isoformat(),
level=level,
message=message,
source=source
)
self._logs.append(entry)
def get_logs(self, limit: int = 500, offset: int = 0, level: Optional[str] = None) -> List[dict]:
"""Récupère les logs avec pagination."""
with self._lock:
logs = list(self._logs)
# Filtrer par niveau si spécifié
if level:
logs = [l for l in logs if l.level.upper() == level.upper()]
# Trier par ID décroissant (plus récent en premier)
logs = sorted(logs, key=lambda x: x.id, reverse=True)
# Pagination
start = offset
end = offset + limit
paginated = logs[start:end]
return [l.to_dict() for l in paginated]
def get_count(self) -> int:
"""Retourne le nombre total de logs."""
with self._lock:
return len(self._logs)
def clear(self):
"""Vide le buffer de logs."""
with self._lock:
self._logs.clear()
def start_capture(self):
"""Démarre la capture des logs stdout/stderr et uvicorn."""
if self._capturing:
return
self._capturing = True
log_service = self
# Wrapper pour stdout
class StdoutWrapper:
def __init__(wrapper_self, original):
wrapper_self._original = original
wrapper_self._buffer = ""
def write(wrapper_self, text):
wrapper_self._original.write(text)
wrapper_self._original.flush()
# Accumuler et traiter les lignes complètes
wrapper_self._buffer += text
while '\n' in wrapper_self._buffer:
line, wrapper_self._buffer = wrapper_self._buffer.split('\n', 1)
if line.strip():
log_service.add_log(line, source="stdout")
return len(text)
def flush(wrapper_self):
wrapper_self._original.flush()
def __getattr__(wrapper_self, name):
return getattr(wrapper_self._original, name)
# Wrapper pour stderr
class StderrWrapper:
def __init__(wrapper_self, original):
wrapper_self._original = original
wrapper_self._buffer = ""
def write(wrapper_self, text):
wrapper_self._original.write(text)
wrapper_self._original.flush()
wrapper_self._buffer += text
while '\n' in wrapper_self._buffer:
line, wrapper_self._buffer = wrapper_self._buffer.split('\n', 1)
if line.strip():
log_service.add_log(line, source="stderr")
return len(text)
def flush(wrapper_self):
wrapper_self._original.flush()
def __getattr__(wrapper_self, name):
return getattr(wrapper_self._original, name)
sys.stdout = StdoutWrapper(self._original_stdout)
sys.stderr = StderrWrapper(self._original_stderr)
# Handler pour capturer les logs uvicorn/logging
class LogCaptureHandler(logging.Handler):
def emit(handler_self, record):
try:
msg = handler_self.format(record)
level_map = {
logging.DEBUG: 'DEBUG',
logging.INFO: 'INFO',
logging.WARNING: 'WARN',
logging.ERROR: 'ERROR',
logging.CRITICAL: 'ERROR',
}
level = level_map.get(record.levelno, 'INFO')
log_service.add_log(msg, level=level, source=record.name)
except Exception:
pass
# Ajouter le handler aux loggers uvicorn
self._log_handler = LogCaptureHandler()
self._log_handler.setFormatter(logging.Formatter('%(message)s'))
for logger_name in ['uvicorn', 'uvicorn.access', 'uvicorn.error']:
logger = logging.getLogger(logger_name)
logger.addHandler(self._log_handler)
def stop_capture(self):
"""Arrête la capture des logs."""
if not self._capturing:
return
self._capturing = False
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
# Retirer le handler des loggers uvicorn
if hasattr(self, '_log_handler'):
for logger_name in ['uvicorn', 'uvicorn.access', 'uvicorn.error']:
logger = logging.getLogger(logger_name)
logger.removeHandler(self._log_handler)
# Instance globale
console_log_service = ConsoleLogCapture()