Files
Cerbero-mcp/services/common/src/mcp_common/logging.py
T

82 lines
2.7 KiB
Python

from __future__ import annotations
import logging
import os
import re
import sys
# python-json-logger exposes JsonFormatter in the `.json` submodule (renamed from `.jsonlogger`); keep the old import path as a fallback for compatibility.
try:
from pythonjsonlogger.json import JsonFormatter as _JsonFormatter # noqa: N813
except ImportError:
from pythonjsonlogger.jsonlogger import JsonFormatter as _JsonFormatter # noqa: N813
# Ordered (compiled pattern, replacement) pairs applied to every rendered log
# message. Each replacement keeps enough context (scheme name, JSON key) for
# the log line to stay readable while the secret value itself becomes "***".
SECRET_PATTERNS = [
    # HTTP bearer credentials. The character class follows the RFC 6750
    # `b64token` grammar (ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/"
    # plus "=" padding) so base64-style tokens are masked in full rather than
    # only up to the first "+", "/" or "=".
    (re.compile(r"Bearer\s+[\w\-.~+/=]+", re.IGNORECASE), "Bearer ***"),
    # Double-quoted JSON fields: the key and both quotes are captured and
    # re-emitted, only the value between them is replaced.
    (re.compile(r'("api_key"\s*:\s*")[^"]+(")'), r'\1***\2'),
    (re.compile(r'("password"\s*:\s*")[^"]+(")'), r'\1***\2'),
    (re.compile(r'("private_key"\s*:\s*")[^"]+(")'), r'\1***\2'),
    (re.compile(r'("client_secret"\s*:\s*")[^"]+(")'), r'\1***\2'),
    # "sk-" followed by at least 20 word characters.
    # NOTE(review): keys with hyphenated prefixes (e.g. "sk-proj-...") are not
    # matched by this pattern -- confirm whether they need coverage.
    (re.compile(r"sk-[\w]{20,}"), "sk-***"),
]
class SecretsFilter(logging.Filter):
    """Logging filter that masks secrets in the rendered log message.

    Every (pattern, replacement) pair in SECRET_PATTERNS is applied, in order,
    to the fully formatted message. The record is rewritten in place, so the
    formatter only ever sees the scrubbed text.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Scrub ``record`` in place.

        Args:
            record: The log record about to be handled.

        Returns:
            Always True: this filter rewrites records, it never drops them.
        """
        try:
            msg = record.getMessage()
        except (TypeError, ValueError, KeyError):
            # A template/args mismatch (e.g. "%d" given a str) makes
            # getMessage() raise. Filters run in Handler.handle(), outside the
            # try/except that guards emit(), so the error would surface in the
            # application code that made the logging call. Fall back to the
            # raw template plus the repr of the args: the record is still
            # emitted and still goes through the scrubbing below.
            msg = f"{record.msg} {record.args!r}"
        for pattern, replacement in SECRET_PATTERNS:
            msg = pattern.sub(replacement, msg)
        record.msg = msg
        # The args are already merged into msg; clearing them stops the
        # formatter from applying %-formatting a second time.
        record.args = ()
        return True
def get_json_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return the logger called ``name``, set up for JSON output on stderr.

    A logger that already has at least one handler is treated as configured
    and returned untouched (``level`` is ignored in that case). Otherwise the
    logger gets the requested level and a single stderr handler with a JSON
    formatter and a SecretsFilter, and propagation to ancestor loggers is
    switched off.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        level: Level applied when the logger is configured by this call.

    Returns:
        The (possibly freshly configured) logger.
    """
    log = logging.getLogger(name)
    if not log.handlers:
        log.setLevel(level)
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(
            _JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        stderr_handler.addFilter(SecretsFilter())
        log.addHandler(stderr_handler)
        # Records are emitted by this handler only, not by ancestor handlers.
        log.propagate = False
    return log
def configure_root_logging(
    *,
    level: str | int | None = None,
    format_type: str | None = None,
) -> None:
    """CER-P5-009: configure the root logger with a JSON or text formatter.

    Environment overrides, used only when the matching argument is not given:
      - LOG_LEVEL (default INFO; an empty value also means INFO)
      - LOG_FORMAT=json|text (default json)

    Any format value other than "json" selects the plain-text formatter. A
    SecretsFilter is attached to the handler for both formats. All handlers
    previously attached to the root logger are removed.

    Args:
        level: Level name (case-insensitive), numeric string, or int. ``None``
            defers to LOG_LEVEL.
        format_type: "json" or "text" (case-insensitive). ``None`` or empty
            defers to LOG_FORMAT.

    Raises:
        ValueError: If the level is a string that is neither a registered
            level name nor a decimal integer. This is raised before the root
            logger is modified, so a bad value never leaves it half-configured.
    """
    lvl_raw = level if level is not None else (os.environ.get("LOG_LEVEL") or "INFO")
    if isinstance(lvl_raw, str):
        lvl_text = lvl_raw.strip()
        if lvl_text.isdecimal():
            # Numeric levels arrive as strings when they come from the environment.
            lvl = int(lvl_text)
        else:
            # getLevelName maps a registered name to its int and returns a
            # "Level X" string for anything it does not know.
            lvl = logging.getLevelName(lvl_text.upper())
            if not isinstance(lvl, int):
                raise ValueError(f"Unknown log level: {lvl_raw!r}")
    else:
        lvl = lvl_raw
    fmt = (format_type or os.environ.get("LOG_FORMAT") or "json").lower()

    root = logging.getLogger()
    # Drop the existing handlers so repeated configuration (or an earlier
    # basicConfig call) does not produce duplicated output.
    for h in list(root.handlers):
        root.removeHandler(h)

    handler = logging.StreamHandler(sys.stderr)
    if fmt == "json":
        handler.setFormatter(
            _JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
    else:
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
        )
    handler.addFilter(SecretsFilter())
    root.addHandler(handler)
    root.setLevel(lvl)