728 lines
26 KiB
Python
728 lines
26 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.testclient import TestClient
|
|
|
|
import app_runtime.core.runtime as runtime_module
|
|
from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
|
|
from app_runtime.control.http_app import HttpControlAppFactory
|
|
from app_runtime.contracts.trace import TraceLogRecord, TraceLogView
|
|
from app_runtime.core.runtime import RuntimeManager
|
|
from app_runtime.tracing.reader import MySqlTraceLogReader
|
|
|
|
|
|
def _trace_record(
|
|
*,
|
|
row_id: int,
|
|
level: str,
|
|
message: str,
|
|
step: str = "process",
|
|
status: str = "failed",
|
|
attrs_json: object | None = None,
|
|
) -> TraceLogRecord:
|
|
return TraceLogRecord(
|
|
id=row_id,
|
|
trace_id="trace-1",
|
|
event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
|
step=step,
|
|
status=status,
|
|
level=level, # type: ignore[arg-type]
|
|
message=message,
|
|
attrs_json=attrs_json if attrs_json is not None else {},
|
|
)
|
|
|
|
|
|
def _build_client(trace_provider=None) -> TestClient:
|
|
async def health_provider():
|
|
return {"status": "ok"}
|
|
|
|
async def action_provider(_action: str, _client_source: str, _request: ControlActionRequest) -> JSONResponse:
|
|
return JSONResponse(content={"status": "ok"})
|
|
|
|
app = HttpControlAppFactory().create(health_provider, action_provider, trace_provider)
|
|
return TestClient(app)
|
|
|
|
|
|
def test_trace_endpoint_returns_html_by_default() -> None:
|
|
captured: list[tuple[str, TraceQueryRequest]] = []
|
|
|
|
async def trace_provider(trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
|
captured.append((trace_id, request))
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="root-trace",
|
|
child_ids=("child-1", "child-2"),
|
|
records=(
|
|
_trace_record(row_id=1, level="ERROR", message="first error"),
|
|
_trace_record(row_id=2, level="WARNING", message="second warning"),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.headers["content-type"].startswith("text/html")
|
|
assert "trace_id:" in response.text
|
|
assert "first error" in response.text
|
|
assert "second warning" in response.text
|
|
assert captured == [
|
|
(
|
|
"trace-1",
|
|
TraceQueryRequest(
|
|
levels=("ERROR", "WARNING", "INFO"),
|
|
include_attrs_json=False,
|
|
response_format="html",
|
|
ancestor_depth=0,
|
|
),
|
|
)
|
|
]
|
|
|
|
|
|
def test_trace_endpoint_returns_text_when_requested() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="root-trace",
|
|
child_ids=("child-1", "child-2"),
|
|
records=(
|
|
_trace_record(row_id=1, level="ERROR", message="first error"),
|
|
_trace_record(row_id=2, level="WARNING", message="second warning"),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=text")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.text == (
|
|
"trace_id: trace-1\n"
|
|
"parent_id: root-trace\n"
|
|
"child_ids:\n"
|
|
" - child-1\n"
|
|
" - child-2\n"
|
|
"------------------------------\n"
|
|
"step: process\n"
|
|
"first error\n"
|
|
"second warning"
|
|
)
|
|
|
|
|
|
def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id=None,
|
|
child_ids=(),
|
|
records=(
|
|
_trace_record(row_id=1, level="ERROR", message="failure", attrs_json={"attempt": 2, "source": "crm"}),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=text&attrs_json=true")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.text == (
|
|
"trace_id: trace-1\n"
|
|
"parent_id: \n"
|
|
"child_ids:\n"
|
|
"------------------------------\n"
|
|
"step: process\n"
|
|
'failure, {"attempt":2,"source":"crm"}'
|
|
)
|
|
|
|
|
|
def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id=None,
|
|
child_ids=(),
|
|
records=(
|
|
_trace_record(row_id=1, level="INFO", message="load first", step="load_stocks"),
|
|
_trace_record(row_id=2, level="INFO", message="load second", step="load_stocks"),
|
|
_trace_record(row_id=3, level="INFO", message="filter first", step="filter_stocks"),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=text")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.text == (
|
|
"trace_id: trace-1\n"
|
|
"parent_id: \n"
|
|
"child_ids:\n"
|
|
"------------------------------\n"
|
|
"step: load_stocks\n"
|
|
"load first\n"
|
|
"load second\n"
|
|
"------------------------------\n"
|
|
"step: filter_stocks\n"
|
|
"filter first"
|
|
)
|
|
|
|
|
|
def test_trace_endpoint_returns_json_payload() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="parent-1",
|
|
child_ids=("child-1",),
|
|
records=(
|
|
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=json&attrs_json=true&levels=info")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.json() == {
|
|
"trace_id": "trace-1",
|
|
"parent_id": "parent-1",
|
|
"child_ids": ["child-1"],
|
|
"messages": [
|
|
{
|
|
"id": 3,
|
|
"trace_id": "trace-1",
|
|
"event_time": "2026-04-28T10:11:12+00:00",
|
|
"step": "process",
|
|
"status": "failed",
|
|
"level": "INFO",
|
|
"message": "done",
|
|
"attrs_json": {"batch": 7},
|
|
}
|
|
],
|
|
"ancestors": [],
|
|
}
|
|
|
|
|
|
def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="parent-1",
|
|
child_ids=("child-1",),
|
|
records=(
|
|
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
|
|
),
|
|
ancestors=(
|
|
TraceLogView(
|
|
trace_id="root-1",
|
|
parent_id=None,
|
|
child_ids=("parent-1",),
|
|
records=(
|
|
_trace_record(row_id=4, level="INFO", message="root info"),
|
|
),
|
|
),
|
|
TraceLogView(
|
|
trace_id="parent-1",
|
|
parent_id="root-1",
|
|
child_ids=("trace-1", "sibling-1"),
|
|
records=(
|
|
_trace_record(row_id=5, level="WARNING", message="parent warning"),
|
|
),
|
|
),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.json()["ancestors"] == [
|
|
{
|
|
"trace_id": "root-1",
|
|
"parent_id": "",
|
|
"child_ids": ["parent-1"],
|
|
"messages": [
|
|
{
|
|
"id": 4,
|
|
"trace_id": "trace-1",
|
|
"event_time": "2026-04-28T10:11:12+00:00",
|
|
"step": "process",
|
|
"status": "failed",
|
|
"level": "INFO",
|
|
"message": "root info",
|
|
}
|
|
],
|
|
},
|
|
{
|
|
"trace_id": "parent-1",
|
|
"parent_id": "root-1",
|
|
"child_ids": ["trace-1", "sibling-1"],
|
|
"messages": [
|
|
{
|
|
"id": 5,
|
|
"trace_id": "trace-1",
|
|
"event_time": "2026-04-28T10:11:12+00:00",
|
|
"step": "process",
|
|
"status": "failed",
|
|
"level": "WARNING",
|
|
"message": "parent warning",
|
|
}
|
|
],
|
|
}
|
|
]
|
|
|
|
|
|
def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="parent-1",
|
|
child_ids=("child-1", "child-2"),
|
|
records=(
|
|
_trace_record(row_id=1, level="INFO", message="loaded prices", step="load_stocks", status="ok"),
|
|
_trace_record(row_id=2, level="WARNING", message="filtered suspicious ticker", step="filter_stocks", status="degraded"),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=html&attrs_json=true")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.headers["content-type"].startswith("text/html")
|
|
assert "background: var(--bg);" in response.text
|
|
assert "--bg: #000000;" in response.text
|
|
assert "--fg: #ececec;" in response.text
|
|
assert "--step: #ffffff;" in response.text
|
|
assert "--info: #d6d7d9;" in response.text
|
|
assert "--warning: #e9ebec;" in response.text
|
|
assert "--error: #ff817d;" in response.text
|
|
assert "--other: #ececec;" in response.text
|
|
assert 'font: 13px/1.1 "SFMono-Regular", monospace;' in response.text
|
|
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">trace-1</a></div>' in response.text
|
|
assert '<div class="line">parent_id: <a href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">parent-1</a></div>' in response.text
|
|
assert '<div class="line">child_ids:</div>' in response.text
|
|
assert '<div class="line"> - <a href="/traces/child-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-1</a></div>' in response.text
|
|
assert '<div class="line"> - <a href="/traces/child-2?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-2</a></div>' in response.text
|
|
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
|
|
assert '<div class="line">------------------------------</div>' in response.text
|
|
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
|
|
assert "loaded prices" in response.text
|
|
assert "filtered suspicious ticker" in response.text
|
|
assert "2026-04-28T10:11:12+00:00 | INFO | ok" not in response.text
|
|
assert "2026-04-28T10:11:12+00:00 | WARNING | degraded" not in response.text
|
|
assert "Related Traces" not in response.text
|
|
|
|
|
|
def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="parent-1",
|
|
child_ids=(),
|
|
records=(_trace_record(row_id=1, level="INFO", message="child message"),),
|
|
ancestors=(
|
|
TraceLogView(
|
|
trace_id="root-1",
|
|
parent_id=None,
|
|
child_ids=("parent-1",),
|
|
records=(_trace_record(row_id=2, level="INFO", message="root message"),),
|
|
),
|
|
TraceLogView(
|
|
trace_id="parent-1",
|
|
parent_id="root-1",
|
|
child_ids=("trace-1",),
|
|
records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
|
|
),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.text == (
|
|
"trace_id: root-1\n"
|
|
"parent_id: \n"
|
|
"child_ids:\n"
|
|
" - parent-1\n"
|
|
"------------------------------\n"
|
|
"step: process\n"
|
|
"root message\n"
|
|
"\n"
|
|
"trace_id: parent-1\n"
|
|
"parent_id: root-1\n"
|
|
"child_ids:\n"
|
|
" - trace-1\n"
|
|
"------------------------------\n"
|
|
"step: process\n"
|
|
"parent message\n"
|
|
"\n"
|
|
"trace_id: trace-1\n"
|
|
"parent_id: parent-1\n"
|
|
"child_ids:\n"
|
|
"------------------------------\n"
|
|
"step: process\n"
|
|
"child message"
|
|
)
|
|
|
|
|
|
def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
|
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
|
return TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="parent-1",
|
|
child_ids=("child-1",),
|
|
records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
|
|
ancestors=(
|
|
TraceLogView(
|
|
trace_id="root-1",
|
|
parent_id=None,
|
|
child_ids=("parent-1",),
|
|
records=(_trace_record(row_id=2, level="INFO", message="root info"),),
|
|
),
|
|
TraceLogView(
|
|
trace_id="parent-1",
|
|
parent_id="root-1",
|
|
child_ids=("trace-1",),
|
|
records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
|
|
),
|
|
),
|
|
)
|
|
|
|
client = _build_client(trace_provider)
|
|
try:
|
|
response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
|
|
finally:
|
|
client.close()
|
|
|
|
assert response.status_code == 200
|
|
assert response.text.index('href="/traces/root-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"') < response.text.index('href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"') < response.text.index('href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"')
|
|
assert 'href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
|
assert 'href="/traces/root-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
|
assert 'href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
|
assert "root info" in response.text
|
|
assert "parent warning" in response.text
|
|
|
|
|
|
def test_trace_endpoint_validates_query_params() -> None:
|
|
client = _build_client(lambda _trace_id, _request: None)
|
|
try:
|
|
invalid_level = client.get("/traces/trace-1?levels=error,fatal")
|
|
invalid_format = client.get("/traces/trace-1?format=xml")
|
|
invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
|
|
invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
|
|
finally:
|
|
client.close()
|
|
|
|
assert invalid_level.status_code == 400
|
|
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
|
|
assert invalid_format.status_code == 400
|
|
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
|
|
assert invalid_ancestor_depth.status_code == 400
|
|
assert invalid_ancestor_depth.json() == {
|
|
"status": "error",
|
|
"detail": "query parameter must be >= 0: ancestor_depth=-1",
|
|
}
|
|
assert invalid_ancestor_type.status_code == 400
|
|
assert invalid_ancestor_type.json() == {
|
|
"status": "error",
|
|
"detail": "invalid ancestor depth query parameter: ancestor_depth=up",
|
|
}
|
|
|
|
|
|
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
|
expected = TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="root",
|
|
child_ids=("child-1",),
|
|
records=(_trace_record(row_id=1, level="ERROR", message="boom"),),
|
|
)
|
|
|
|
class StubReader:
|
|
def read_trace(
|
|
self,
|
|
trace_id: str,
|
|
levels: tuple[str, ...],
|
|
ancestor_depth: int | None = 0,
|
|
) -> TraceLogView | None:
|
|
assert trace_id == "trace-1"
|
|
assert levels == ("ERROR",)
|
|
assert ancestor_depth is None
|
|
return expected
|
|
|
|
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
|
|
runtime = RuntimeManager()
|
|
|
|
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))
|
|
|
|
assert result == expected
|
|
|
|
|
|
def test_mysql_trace_log_reader_maps_db_rows() -> None:
|
|
class FakeCursor:
|
|
def __init__(self) -> None:
|
|
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
|
self._current_query = ""
|
|
|
|
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
|
self.executed.append((query, params))
|
|
self._current_query = query
|
|
|
|
def fetchone(self) -> dict[str, object] | None:
|
|
if self.executed[-1][1] == ("trace-1",):
|
|
return {"parent_id": "root-77"}
|
|
if self.executed[-1][1] == ("root-77",):
|
|
return {"parent_id": None}
|
|
return None
|
|
|
|
def fetchall(self) -> list[dict[str, object]]:
|
|
if "WHERE parent_id = %s" in self._current_query:
|
|
return [{"trace_id": "child-1"}, {"trace_id": "child-2"}]
|
|
return [
|
|
{
|
|
"id": 8,
|
|
"trace_id": "trace-1",
|
|
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
|
"step": "parse",
|
|
"status": "failed",
|
|
"level": "ERROR",
|
|
"message": "broken",
|
|
"attrs_json": '{"attempt":1}',
|
|
}
|
|
]
|
|
|
|
def __enter__(self) -> FakeCursor:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
return None
|
|
|
|
class FakeConnection:
|
|
def __init__(self, cursor: FakeCursor) -> None:
|
|
self._cursor = cursor
|
|
|
|
def cursor(self) -> FakeCursor:
|
|
return self._cursor
|
|
|
|
def __enter__(self) -> FakeConnection:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
return None
|
|
|
|
class FakeConnectionFactory:
|
|
def __init__(self) -> None:
|
|
self.cursor = FakeCursor()
|
|
|
|
def connect(self) -> FakeConnection:
|
|
return FakeConnection(self.cursor)
|
|
|
|
factory = FakeConnectionFactory()
|
|
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
|
|
|
view = reader.read_trace("trace-1", ("ERROR", "WARNING"))
|
|
|
|
assert view == TraceLogView(
|
|
trace_id="trace-1",
|
|
parent_id="root-77",
|
|
child_ids=("child-1", "child-2"),
|
|
records=(
|
|
TraceLogRecord(
|
|
id=8,
|
|
trace_id="trace-1",
|
|
event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
|
step="parse",
|
|
status="failed",
|
|
level="ERROR",
|
|
message="broken",
|
|
attrs_json={"attempt": 1},
|
|
),
|
|
),
|
|
ancestors=(),
|
|
)
|
|
assert len(factory.cursor.executed) == 3
|
|
assert factory.cursor.executed[1][1] == ("trace-1",)
|
|
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
|
|
|
|
|
|
def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
|
|
class FakeCursor:
|
|
def __init__(self) -> None:
|
|
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
|
self._current_query = ""
|
|
|
|
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
|
self.executed.append((query, params))
|
|
self._current_query = query
|
|
|
|
def fetchone(self) -> dict[str, object] | None:
|
|
if self.executed[-1][1] == ("trace-1",):
|
|
return {"parent_id": "parent-1"}
|
|
if self.executed[-1][1] == ("parent-1",):
|
|
return {"parent_id": "root-1"}
|
|
if self.executed[-1][1] == ("root-1",):
|
|
return {"parent_id": None}
|
|
return None
|
|
|
|
def fetchall(self) -> list[dict[str, object]]:
|
|
if "WHERE parent_id = %s" in self._current_query:
|
|
parent_id = self.executed[-1][1][0]
|
|
if parent_id == "trace-1":
|
|
return []
|
|
if parent_id == "parent-1":
|
|
return [{"trace_id": "trace-1"}]
|
|
if parent_id == "root-1":
|
|
return [{"trace_id": "parent-1"}]
|
|
return []
|
|
trace_id = self.executed[-1][1][0]
|
|
return [
|
|
{
|
|
"id": 8 if trace_id == "trace-1" else 9,
|
|
"trace_id": trace_id,
|
|
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
|
"step": "parse",
|
|
"status": "failed",
|
|
"level": "ERROR",
|
|
"message": f"broken:{trace_id}",
|
|
"attrs_json": '{"attempt":1}',
|
|
}
|
|
]
|
|
|
|
def __enter__(self) -> FakeCursor:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
return None
|
|
|
|
class FakeConnection:
|
|
def __init__(self, cursor: FakeCursor) -> None:
|
|
self._cursor = cursor
|
|
|
|
def cursor(self) -> FakeCursor:
|
|
return self._cursor
|
|
|
|
def __enter__(self) -> FakeConnection:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
return None
|
|
|
|
class FakeConnectionFactory:
|
|
def __init__(self) -> None:
|
|
self.cursor = FakeCursor()
|
|
|
|
def connect(self) -> FakeConnection:
|
|
return FakeConnection(self.cursor)
|
|
|
|
factory = FakeConnectionFactory()
|
|
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
|
|
|
view = reader.read_trace("trace-1", ("ERROR",), 1)
|
|
|
|
assert view is not None
|
|
assert view.trace_id == "trace-1"
|
|
assert view.parent_id == "parent-1"
|
|
assert len(view.ancestors) == 1
|
|
assert view.ancestors[0].trace_id == "parent-1"
|
|
assert view.ancestors[0].parent_id == "root-1"
|
|
assert view.ancestors[0].child_ids == ("trace-1",)
|
|
|
|
|
|
def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
|
|
class FakeCursor:
|
|
def __init__(self) -> None:
|
|
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
|
self._current_query = ""
|
|
|
|
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
|
self.executed.append((query, params))
|
|
self._current_query = query
|
|
|
|
def fetchone(self) -> dict[str, object] | None:
|
|
if self.executed[-1][1] == ("trace-1",):
|
|
return {"parent_id": "parent-1"}
|
|
if self.executed[-1][1] == ("parent-1",):
|
|
return {"parent_id": "root-1"}
|
|
if self.executed[-1][1] == ("root-1",):
|
|
return {"parent_id": None}
|
|
return None
|
|
|
|
def fetchall(self) -> list[dict[str, object]]:
|
|
if "WHERE parent_id = %s" in self._current_query:
|
|
parent_id = self.executed[-1][1][0]
|
|
if parent_id == "root-1":
|
|
return [{"trace_id": "parent-1"}]
|
|
if parent_id == "parent-1":
|
|
return [{"trace_id": "trace-1"}]
|
|
return []
|
|
trace_id = self.executed[-1][1][0]
|
|
return [
|
|
{
|
|
"id": 8,
|
|
"trace_id": trace_id,
|
|
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
|
"step": "parse",
|
|
"status": "failed",
|
|
"level": "ERROR",
|
|
"message": f"broken:{trace_id}",
|
|
"attrs_json": '{"attempt":1}',
|
|
}
|
|
]
|
|
|
|
def __enter__(self) -> FakeCursor:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
return None
|
|
|
|
class FakeConnection:
|
|
def __init__(self, cursor: FakeCursor) -> None:
|
|
self._cursor = cursor
|
|
|
|
def cursor(self) -> FakeCursor:
|
|
return self._cursor
|
|
|
|
def __enter__(self) -> FakeConnection:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
return None
|
|
|
|
class FakeConnectionFactory:
|
|
def __init__(self) -> None:
|
|
self.cursor = FakeCursor()
|
|
|
|
def connect(self) -> FakeConnection:
|
|
return FakeConnection(self.cursor)
|
|
|
|
factory = FakeConnectionFactory()
|
|
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
|
|
|
view = reader.read_trace("trace-1", ("ERROR",), 2)
|
|
|
|
assert view is not None
|
|
assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")
|