From a6f32dd4d8d5a96d348d0699054a55bb6344a64d Mon Sep 17 00:00:00 2001 From: AdrianoDev Date: Sat, 9 May 2026 19:57:51 +0200 Subject: [PATCH] feat(llm): cost tracker with per-tier pricing and breakdown Co-Authored-By: Claude Opus 4.7 (1M context) --- src/multi_swarm/llm/cost_tracker.py | 73 +++++++++++++++++++++++++++++ tests/unit/test_cost_tracker.py | 32 +++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 src/multi_swarm/llm/cost_tracker.py create mode 100644 tests/unit/test_cost_tracker.py diff --git a/src/multi_swarm/llm/cost_tracker.py b/src/multi_swarm/llm/cost_tracker.py new file mode 100644 index 0000000..3b8c567 --- /dev/null +++ b/src/multi_swarm/llm/cost_tracker.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +from ..genome.hypothesis import ModelTier + +PRICE_PER_M_TOKENS: dict[ModelTier, dict[str, float]] = { + ModelTier.C: {"input": 0.40, "output": 0.40}, + ModelTier.B: {"input": 3.00, "output": 15.00}, +} + + +def estimate_cost(input_tokens: int, output_tokens: int, tier: ModelTier) -> float: + p = PRICE_PER_M_TOKENS[tier] + return (input_tokens / 1_000_000) * p["input"] + (output_tokens / 1_000_000) * p["output"] + + +@dataclass +class CostRecord: + ts: datetime + run_id: str + agent_id: str + tier: ModelTier + input_tokens: int + output_tokens: int + cost_usd: float + + +@dataclass +class CostTracker: + records: list[CostRecord] = field(default_factory=list) + + def record( + self, + input_tokens: int, + output_tokens: int, + tier: ModelTier, + run_id: str, + agent_id: str, + ) -> CostRecord: + cost = estimate_cost(input_tokens, output_tokens, tier) + rec = CostRecord( + ts=datetime.now(UTC), + run_id=run_id, + agent_id=agent_id, + tier=tier, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + ) + self.records.append(rec) + return rec + + def summary(self) -> dict[str, Any]: + by_tier: dict[str, dict[str, float]] = defaultdict( + lambda: {"calls": 0, "input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0} + ) + for r in self.records: + t = r.tier.value + by_tier[t]["calls"] += 1 + by_tier[t]["input_tokens"] += r.input_tokens + by_tier[t]["output_tokens"] += r.output_tokens + by_tier[t]["cost_usd"] += r.cost_usd + return { + "calls": len(self.records), + "input_tokens": sum(r.input_tokens for r in self.records), + "output_tokens": sum(r.output_tokens for r in self.records), + "cost_usd": sum(r.cost_usd for r in self.records), + "by_tier": dict(by_tier), + } diff --git a/tests/unit/test_cost_tracker.py b/tests/unit/test_cost_tracker.py new file mode 100644 index 0000000..08a417c --- /dev/null +++ b/tests/unit/test_cost_tracker.py @@ -0,0 +1,32 @@ +from multi_swarm.genome.hypothesis import ModelTier +from multi_swarm.llm.cost_tracker import CostTracker, estimate_cost + + +def test_estimate_cost_tier_c(): + cost = estimate_cost(input_tokens=1_000_000, output_tokens=1_000_000, tier=ModelTier.C) + assert cost == 0.40 + 0.40 + + +def test_estimate_cost_tier_b(): + cost = estimate_cost(input_tokens=1_000_000, output_tokens=1_000_000, tier=ModelTier.B) + assert cost == 3.00 + 15.00 + + +def test_tracker_accumulates(): + t = CostTracker() + t.record(input_tokens=10_000, output_tokens=20_000, tier=ModelTier.C, run_id="r", agent_id="a") + t.record(input_tokens=5_000, output_tokens=15_000, tier=ModelTier.C, run_id="r", agent_id="b") + summary = t.summary() + assert summary["calls"] == 2 + assert summary["input_tokens"] == 15_000 + assert summary["output_tokens"] == 35_000 + assert summary["cost_usd"] > 0 + + +def test_tracker_per_tier_breakdown(): + t = CostTracker() + t.record(input_tokens=10_000, output_tokens=10_000, tier=ModelTier.C, run_id="r", agent_id="a") + t.record(input_tokens=10_000, output_tokens=10_000, tier=ModelTier.B, run_id="r", agent_id="b") + summary = t.summary() + assert "C" in summary["by_tier"] + assert "B" in summary["by_tier"]