from __future__ import annotations import pytest from cerbero_mcp.exchanges.ibkr.key_rotation import KeyRotationManager @pytest.mark.asyncio async def test_start_generates_new_keypair_files(tmp_path): sig_path = tmp_path / "sig.pem" enc_path = tmp_path / "enc.pem" sig_path.write_bytes(b"old-sig") enc_path.write_bytes(b"old-enc") mgr = KeyRotationManager( signature_key_path=str(sig_path), encryption_key_path=str(enc_path), ) out = await mgr.start() assert "sig" in out["fingerprints"] assert "enc" in out["fingerprints"] assert (tmp_path / "sig.pem.new").exists() assert (tmp_path / "enc.pem.new").exists() @pytest.mark.asyncio async def test_confirm_swap_and_validate_ok(tmp_path): sig_path = tmp_path / "sig.pem" enc_path = tmp_path / "enc.pem" sig_path.write_bytes(b"old-sig") enc_path.write_bytes(b"old-enc") mgr = KeyRotationManager( signature_key_path=str(sig_path), encryption_key_path=str(enc_path), ) await mgr.start() async def fake_validate() -> bool: return True out = await mgr.confirm(validate=fake_validate) assert "rotated_at" in out assert (tmp_path / ".archive").exists() @pytest.mark.asyncio async def test_confirm_validate_fail_rollbacks(tmp_path): sig_path = tmp_path / "sig.pem" enc_path = tmp_path / "enc.pem" sig_path.write_bytes(b"old-sig") enc_path.write_bytes(b"old-enc") mgr = KeyRotationManager( signature_key_path=str(sig_path), encryption_key_path=str(enc_path), ) await mgr.start() async def fake_validate() -> bool: return False with pytest.raises(RuntimeError, match="IBKR_ROTATION_VALIDATION_FAILED"): await mgr.confirm(validate=fake_validate) assert sig_path.read_bytes() == b"old-sig" assert enc_path.read_bytes() == b"old-enc" @pytest.mark.asyncio async def test_abort_cleans_new_files(tmp_path): sig_path = tmp_path / "sig.pem" enc_path = tmp_path / "enc.pem" sig_path.write_bytes(b"old-sig") enc_path.write_bytes(b"old-enc") mgr = KeyRotationManager( signature_key_path=str(sig_path), encryption_key_path=str(enc_path), ) await mgr.start() await mgr.abort() assert not (tmp_path / "sig.pem.new").exists() assert not (tmp_path / "enc.pem.new").exists()