from __future__ import annotations import asyncio from datetime import datetime, timezone from fastapi.responses import JSONResponse from fastapi.testclient import TestClient import app_runtime.core.runtime as runtime_module from app_runtime.control.base import ControlActionRequest, TraceQueryRequest from app_runtime.control.http_app import HttpControlAppFactory from app_runtime.contracts.trace import TraceLogRecord, TraceLogView from app_runtime.core.runtime import RuntimeManager from app_runtime.tracing.reader import MySqlTraceLogReader def _trace_record( *, row_id: int, level: str, message: str, step: str = "process", status: str = "failed", attrs_json: object | None = None, ) -> TraceLogRecord: return TraceLogRecord( id=row_id, trace_id="trace-1", event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc), step=step, status=status, level=level, # type: ignore[arg-type] message=message, attrs_json=attrs_json if attrs_json is not None else {}, ) def _build_client(trace_provider=None) -> TestClient: async def health_provider(): return {"status": "ok"} async def action_provider(_action: str, _client_source: str, _request: ControlActionRequest) -> JSONResponse: return JSONResponse(content={"status": "ok"}) app = HttpControlAppFactory().create(health_provider, action_provider, trace_provider) return TestClient(app) def test_trace_endpoint_returns_text_with_default_levels() -> None: captured: list[tuple[str, TraceQueryRequest]] = [] async def trace_provider(trace_id: str, request: TraceQueryRequest) -> TraceLogView: captured.append((trace_id, request)) return TraceLogView( trace_id="trace-1", parent_id="root-trace", child_ids=("child-1", "child-2"), records=( _trace_record(row_id=1, level="ERROR", message="first error"), _trace_record(row_id=2, level="WARNING", message="second warning"), ), ) client = _build_client(trace_provider) try: response = client.get("/traces/trace-1") finally: client.close() assert response.status_code == 200 assert response.text == ( "trace_id: trace-1\n" "parent_id: root-trace\n" "child_ids:\n" " - child-1\n" " - child-2\n" "--------------------------------------------------\n" "step: process\n" "first error\n" "second warning" ) assert captured == [("trace-1", TraceQueryRequest(levels=("ERROR", "WARNING", "INFO"), include_attrs_json=False, response_format="text"))] def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None: async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView: return TraceLogView( trace_id="trace-1", parent_id=None, child_ids=(), records=( _trace_record(row_id=1, level="ERROR", message="failure", attrs_json={"attempt": 2, "source": "crm"}), ), ) client = _build_client(trace_provider) try: response = client.get("/traces/trace-1?attrs_json=true") finally: client.close() assert response.status_code == 200 assert response.text == ( "trace_id: trace-1\n" "parent_id: \n" "child_ids:\n" "--------------------------------------------------\n" "step: process\n" 'failure, {"attempt":2,"source":"crm"}' ) def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None: async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView: return TraceLogView( trace_id="trace-1", parent_id=None, child_ids=(), records=( _trace_record(row_id=1, level="INFO", message="load first", step="load_stocks"), _trace_record(row_id=2, level="INFO", message="load second", step="load_stocks"), _trace_record(row_id=3, level="INFO", message="filter first", step="filter_stocks"), ), ) client = _build_client(trace_provider) try: response = client.get("/traces/trace-1") finally: client.close() assert response.status_code == 200 assert response.text == ( "trace_id: trace-1\n" "parent_id: \n" "child_ids:\n" "--------------------------------------------------\n" "step: load_stocks\n" "load first\n" "load second\n" "--------------------------------------------------\n" "step: filter_stocks\n" "filter first" ) def test_trace_endpoint_returns_json_payload() -> None: async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView: return TraceLogView( trace_id="trace-1", parent_id="parent-1", child_ids=("child-1",), records=( _trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}), ), ) client = _build_client(trace_provider) try: response = client.get("/traces/trace-1?format=json&attrs_json=true&levels=info") finally: client.close() assert response.status_code == 200 assert response.json() == { "trace_id": "trace-1", "parent_id": "parent-1", "child_ids": ["child-1"], "messages": [ { "id": 3, "trace_id": "trace-1", "event_time": "2026-04-28T10:11:12+00:00", "step": "process", "status": "failed", "level": "INFO", "message": "done", "attrs_json": {"batch": 7}, } ], } def test_trace_endpoint_returns_html_page_with_related_links() -> None: async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView: return TraceLogView( trace_id="trace-1", parent_id="parent-1", child_ids=("child-1", "child-2"), records=( _trace_record(row_id=1, level="INFO", message="loaded prices", step="load_stocks", status="ok"), _trace_record(row_id=2, level="WARNING", message="filtered suspicious ticker", step="filter_stocks", status="degraded"), ), ) client = _build_client(trace_provider) try: response = client.get("/traces/trace-1?format=html&attrs_json=true") finally: client.close() assert response.status_code == 200 assert response.headers["content-type"].startswith("text/html") assert "background: var(--bg);" in response.text assert "--bg: #000000;" in response.text assert "--fg: #ececec;" in response.text assert "--step: #ffffff;" in response.text assert "--info: #d6d7d9;" in response.text assert "--warning: #e9ebec;" in response.text assert "--error: #ff817d;" in response.text assert "--other: #ececec;" in response.text assert 'font: 13px/1.1 "SFMono-Regular", monospace;' in response.text assert '
trace_id: trace-1
' in response.text assert '
parent_id: parent-1
' in response.text assert '
child_ids:
' in response.text assert '
- child-1
' in response.text assert '
- child-2
' in response.text assert '
load_stocks
' in response.text assert '
--------------------------------------------------
' in response.text assert '
filter_stocks
' in response.text assert "loaded prices" in response.text assert "filtered suspicious ticker" in response.text assert "2026-04-28T10:11:12+00:00 | INFO | ok" not in response.text assert "2026-04-28T10:11:12+00:00 | WARNING | degraded" not in response.text assert "Related Traces" not in response.text def test_trace_endpoint_validates_query_params() -> None: client = _build_client(lambda _trace_id, _request: None) try: invalid_level = client.get("/traces/trace-1?levels=error,fatal") invalid_format = client.get("/traces/trace-1?format=xml") finally: client.close() assert invalid_level.status_code == 400 assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"} assert invalid_format.status_code == 400 assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"} def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None: expected = TraceLogView( trace_id="trace-1", parent_id="root", child_ids=("child-1",), records=(_trace_record(row_id=1, level="ERROR", message="boom"),), ) class StubReader: def read_trace(self, trace_id: str, levels: tuple[str, ...]) -> TraceLogView | None: assert trace_id == "trace-1" assert levels == ("ERROR",) return expected monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader()) runtime = RuntimeManager() result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",)))) assert result == expected def test_mysql_trace_log_reader_maps_db_rows() -> None: class FakeCursor: def __init__(self) -> None: self.executed: list[tuple[str, tuple[object, ...]]] = [] self._current_query = "" def execute(self, query: str, params: tuple[object, ...]) -> None: self.executed.append((query, params)) self._current_query = query def fetchone(self) -> dict[str, object] | None: return {"parent_id": "root-77"} def fetchall(self) -> list[dict[str, object]]: if "WHERE parent_id = %s" in self._current_query: return [{"trace_id": "child-1"}, {"trace_id": "child-2"}] return [ { "id": 8, "trace_id": "trace-1", "event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc), "step": "parse", "status": "failed", "level": "ERROR", "message": "broken", "attrs_json": '{"attempt":1}', } ] def __enter__(self) -> FakeCursor: return self def __exit__(self, exc_type, exc, tb) -> None: return None class FakeConnection: def __init__(self, cursor: FakeCursor) -> None: self._cursor = cursor def cursor(self) -> FakeCursor: return self._cursor def __enter__(self) -> FakeConnection: return self def __exit__(self, exc_type, exc, tb) -> None: return None class FakeConnectionFactory: def __init__(self) -> None: self.cursor = FakeCursor() def connect(self) -> FakeConnection: return FakeConnection(self.cursor) factory = FakeConnectionFactory() reader = MySqlTraceLogReader(factory) # type: ignore[arg-type] view = reader.read_trace("trace-1", ("ERROR", "WARNING")) assert view == TraceLogView( trace_id="trace-1", parent_id="root-77", child_ids=("child-1", "child-2"), records=( TraceLogRecord( id=8, trace_id="trace-1", event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc), step="parse", status="failed", level="ERROR", message="broken", attrs_json={"attempt": 1}, ), ), ) assert len(factory.cursor.executed) == 3 assert factory.cursor.executed[1][1] == ("trace-1",) assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")