"""Endpoint admin: query audit log con filtri.""" from __future__ import annotations import json import os from datetime import datetime from pathlib import Path from typing import Any, Literal from fastapi import APIRouter, HTTPException, Query, Request from pydantic import BaseModel, SecretStr from cerbero_mcp.exchanges.ibkr.key_rotation import KeyRotationManager MAX_RECORDS = 10000 DEFAULT_LIMIT = 1000 class _IBKRRotateConfirmReq(BaseModel): new_consumer_key: str new_access_token: str new_access_token_secret: str def _parse_iso(value: str | None) -> datetime | None: if not value: return None try: # supporta sia "2026-05-01" sia "2026-05-01T12:34:56Z" return datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError as e: raise HTTPException(400, f"invalid datetime: {value}") from e def _record_timestamp(rec: dict[str, Any]) -> datetime | None: """Estrae il timestamp da un record audit. JsonFormatter mette 'asctime' in formato '2026-05-01 12:34:56,789'. Lo parsiamo come UTC. """ ts = rec.get("asctime") or rec.get("timestamp") if not ts: return None try: # asctime format default: 'YYYY-MM-DD HH:MM:SS,mmm' ts_clean = ts.replace(",", ".") return datetime.fromisoformat(ts_clean) except ValueError: return None def _matches_filters( rec: dict[str, Any], *, from_dt: datetime | None, to_dt: datetime | None, actor: str | None, exchange: str | None, action: str | None, bot_tag: str | None, ) -> bool: if rec.get("audit_event") != "write_op": return False if actor is not None and rec.get("actor") != actor: return False if exchange is not None and rec.get("exchange") != exchange: return False if action is not None and rec.get("action") != action: return False if bot_tag is not None and rec.get("bot_tag") != bot_tag: return False if from_dt is not None or to_dt is not None: rec_ts = _record_timestamp(rec) if rec_ts is None: return False if from_dt is not None and rec_ts < from_dt: return False if to_dt is not None and rec_ts > to_dt: return False return True def _read_audit_records(file_path: Path) -> list[dict[str, Any]]: if not file_path.exists(): return [] out: list[dict[str, Any]] = [] with file_path.open("r", encoding="utf-8") as f: for line in f: stripped = line.strip() if not stripped: continue try: out.append(json.loads(stripped)) except json.JSONDecodeError: continue return out def make_admin_router() -> APIRouter: r = APIRouter(prefix="/admin", tags=["admin"]) @r.get("/audit") async def query_audit( request: Request, from_: str | None = Query(None, alias="from"), to: str | None = Query(None), actor: Literal["testnet", "mainnet"] | None = Query(None), exchange: str | None = Query(None), action: str | None = Query(None), bot_tag: str | None = Query(None), limit: int = Query(DEFAULT_LIMIT, ge=1, le=MAX_RECORDS), ) -> dict[str, Any]: """Restituisce i record audit_write_op filtrati. Param query (tutti opzionali): - from / to: ISO 8601 datetime (es. 2026-05-01 oppure 2026-05-01T12:34:56) - actor: testnet | mainnet - exchange: deribit | bybit | hyperliquid | alpaca - action: nome del tool (es. place_order) - bot_tag: identificatore bot - limit: max record da ritornare (default 1000, max 10000) Source: AUDIT_LOG_FILE (env var). Se non settata, ritorna lista vuota con warning. """ from_dt = _parse_iso(from_) to_dt = _parse_iso(to) file_str = os.environ.get("AUDIT_LOG_FILE", "").strip() if not file_str: return { "records": [], "count": 0, "warning": "AUDIT_LOG_FILE not configured; no persistent audit log to query", "from": from_, "to": to, } file_path = Path(file_str) all_records = _read_audit_records(file_path) filtered = [ rec for rec in all_records if _matches_filters( rec, from_dt=from_dt, to_dt=to_dt, actor=actor, exchange=exchange, action=action, bot_tag=bot_tag, ) ] # sort desc per timestamp (ultimi prima) + limit filtered.sort( key=lambda rec: _record_timestamp(rec) or datetime.min, reverse=True, ) if len(filtered) > limit: filtered = filtered[:limit] return { "records": filtered, "count": len(filtered), "from": from_, "to": to, "filters": { "actor": actor, "exchange": exchange, "action": action, "bot_tag": bot_tag, }, } @r.post("/ibkr/rotate-keys/start") async def _ibkr_rotate_start(env: str, request: Request): if env not in ("testnet", "mainnet"): raise HTTPException(400, detail={"error": "invalid env"}) settings = request.app.state.settings creds = settings.ibkr.credentials(env) mgr = KeyRotationManager( signature_key_path=creds["signature_key_path"], encryption_key_path=creds["encryption_key_path"], ) rotations = getattr(request.app.state, "ibkr_rotations", None) if rotations is None: rotations = {} request.app.state.ibkr_rotations = rotations rotations[env] = mgr return await mgr.start() @r.post("/ibkr/rotate-keys/confirm") async def _ibkr_rotate_confirm( env: str, body: _IBKRRotateConfirmReq, request: Request, ): if env not in ("testnet", "mainnet"): raise HTTPException(400, detail={"error": "invalid env"}) rotations = getattr(request.app.state, "ibkr_rotations", {}) or {} mgr = rotations.get(env) if mgr is None: raise HTTPException(409, detail={"error": "rotation not started"}) settings = request.app.state.settings if env == "testnet": settings.ibkr.consumer_key_testnet = body.new_consumer_key settings.ibkr.access_token_testnet = body.new_access_token settings.ibkr.access_token_secret_testnet = SecretStr(body.new_access_token_secret) else: settings.ibkr.consumer_key_live = body.new_consumer_key settings.ibkr.access_token_live = body.new_access_token settings.ibkr.access_token_secret_live = SecretStr(body.new_access_token_secret) registry = request.app.state.registry registry._clients.pop(("ibkr", env), None) async def _validate() -> bool: try: client = await registry.get("ibkr", env) await client._request("GET", "/iserver/auth/status", skip_tickle=True) return True except Exception: return False try: return await mgr.confirm(validate=_validate) finally: rotations.pop(env, None) @r.post("/ibkr/rotate-keys/abort") async def _ibkr_rotate_abort(env: str, request: Request): rotations = getattr(request.app.state, "ibkr_rotations", {}) or {} mgr = rotations.pop(env, None) if mgr is None: return {"aborted": False, "reason": "no rotation in progress"} return await mgr.abort() @r.post("/ibkr/health") async def _ibkr_health(request: Request): registry = request.app.state.registry out: dict[str, Any] = {} for env in ("testnet", "mainnet"): try: client = await registry.get("ibkr", env) status = await client._request( "GET", "/iserver/auth/status", skip_tickle=True ) out[env] = {"healthy": True, "status": status} except Exception as e: out[env] = {"healthy": False, "error": str(e)[:200]} return out return r