171 lines
5.9 KiB
Python
171 lines
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from contextlib import contextmanager
|
|
from typing import Any, Iterator
|
|
from uuid import uuid4
|
|
|
|
from app_runtime.contracts.trace import (
|
|
TraceContext,
|
|
TraceContextFactory,
|
|
TraceContextRecord,
|
|
TraceLevel,
|
|
TraceLogMessage,
|
|
TraceTransport,
|
|
utc_now,
|
|
)
|
|
from app_runtime.tracing.store import TraceContextStore
|
|
from app_runtime.tracing.transport import NoOpTraceTransport
|
|
|
|
|
|
class TraceRecordWriter:
    """Best-effort forwarder of trace records to a transport.

    An exception raised while handing a record to the transport is logged
    (with traceback) through this module's logger and is not re-raised.
    """

    def __init__(self, transport: TraceTransport) -> None:
        """Remember the target transport and obtain this module's logger."""
        self._transport = transport
        self._logger = logging.getLogger(__name__)

    def write_context(self, record: TraceContextRecord) -> None:
        """Pass a context record to the transport, logging any failure."""
        try:
            self._transport.write_context(record)
        except Exception:
            # Tracing is auxiliary: report the failure and let the caller continue.
            self._logger.exception("Trace transport failed to write context %s", record.trace_id)

    def write_message(self, record: TraceLogMessage) -> None:
        """Pass a log-message record to the transport, logging any failure."""
        try:
            self._transport.write_message(record)
        except Exception:
            # Same policy as write_context: never propagate transport errors.
            self._logger.exception("Trace transport failed to write message for %s", record.trace_id)
|
|
|
|
|
|
class TraceService(TraceContextFactory):
    """Create trace contexts, track them in a store and emit trace log messages.

    Context records are pushed onto ``store`` and forwarded to ``transport``
    through a ``TraceRecordWriter``; log messages are attributed to whatever
    context ``store.current()`` returns.
    """

    def __init__(self, transport: TraceTransport | None = None, store: TraceContextStore | None = None) -> None:
        """Wire the service to a transport and a context store.

        Args:
            transport: Destination for records; ``NoOpTraceTransport()`` is
                used when ``None``.
            store: Holder of the contexts; a new ``TraceContextStore()`` is
                used when ``None``.
        """
        # Test against None explicitly: an injected object that happens to be
        # falsy (e.g. it defines __len__ or __bool__) must not be silently
        # swapped for a default instance.
        self.transport = transport if transport is not None else NoOpTraceTransport()
        self.store = store if store is not None else TraceContextStore()
        self._writer = TraceRecordWriter(self.transport)

    def create_context(
        self,
        *,
        alias: str,
        parent_id: str | None = None,
        kind: str | None = None,
        attrs: dict[str, Any] | None = None,
    ) -> str:
        """Build a new context record, push it onto the store and write it out.

        Args:
            alias: Name of the context; a falsy value becomes ``""``.
            parent_id: Identifier stored as the record's ``parent_id``.
            kind: Value stored in the record's ``type`` field.
            attrs: Extra attributes; copied, so the caller's dict is not shared.

        Returns:
            The newly generated trace id (``uuid4().hex``).
        """
        record = TraceContextRecord(
            trace_id=uuid4().hex,
            alias=str(alias or ""),
            parent_id=parent_id,
            type=kind,
            event_time=utc_now(),
            attrs=dict(attrs or {}),
        )
        self.store.push(record)
        self._writer.write_context(record)
        return record.trace_id

    @contextmanager
    def open_context(
        self,
        *,
        alias: str,
        parent_id: str | None = None,
        kind: str | None = None,
        attrs: dict[str, Any] | None = None,
    ) -> Iterator[str]:
        """Create a context for the duration of a ``with`` block.

        Accepts the same arguments as ``create_context``. Yields the new trace
        id and pops the store on exit, even when the block raises.
        """
        trace_id = self.create_context(alias=alias, parent_id=parent_id, kind=kind, attrs=attrs)
        try:
            yield trace_id
        finally:
            self.store.pop()

    def current_trace_id(self) -> str | None:
        """Return whatever the store reports as the current trace id."""
        return self.store.current_trace_id()

    def close_context(self) -> str | None:
        """Pop the store and return the popped context's trace id.

        Returns ``None`` when the popped value is falsy.
        """
        previous = self.store.pop()
        return previous.record.trace_id if previous else None

    def step(self, name: str) -> None:
        """Set the current step name on the store (a falsy name becomes ``""``)."""
        self.store.set_step(str(name or ""))

    def info(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None:
        """Write ``message`` at level ``"INFO"``."""
        self._write_message("INFO", message, status, attrs)

    def debug(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None:
        """Write ``message`` at level ``"DEBUG"``."""
        self._write_message("DEBUG", message, status, attrs)

    def warning(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None:
        """Write ``message`` at level ``"WARNING"``."""
        self._write_message("WARNING", message, status, attrs)

    def error(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None:
        """Write ``message`` at level ``"ERROR"``."""
        self._write_message("ERROR", message, status, attrs)

    def exception(self, message: str, *, status: str = "failed", attrs: dict[str, Any] | None = None) -> None:
        """Write ``message`` at level ``"ERROR"`` with status defaulting to ``"failed"``.

        Only the given message is recorded; no exception information is
        captured by this method itself.
        """
        self._write_message("ERROR", message, status, attrs)

    def new_root(self, operation: str) -> TraceContext:
        """Create an ``"operation"`` context and return its ``TraceContext``.

        The returned context uses the new trace id as both ``trace_id`` and
        ``span_id``.
        """
        trace_id = self.create_context(alias=operation, kind="operation", attrs={"operation": operation})
        return TraceContext(trace_id=trace_id, span_id=trace_id, attributes={"operation": operation})

    def child_of(self, parent: TraceContext, operation: str) -> TraceContext:
        """Create a ``"worker"`` context whose record points at ``parent.trace_id``.

        The returned context uses the new trace id as both ``trace_id`` and
        ``span_id`` and carries ``parent.span_id`` as ``parent_span_id``.
        """
        trace_id = self.create_context(
            alias=operation,
            parent_id=parent.trace_id,
            kind="worker",
            attrs={"operation": operation},
        )
        return TraceContext(
            trace_id=trace_id,
            span_id=trace_id,
            parent_span_id=parent.span_id,
            attributes={"operation": operation},
        )

    def attach(self, metadata: dict[str, object], context: TraceContext) -> dict[str, object]:
        """Return a copy of ``metadata`` with the context's ids added.

        Sets the ``trace_id``, ``span_id`` and ``parent_span_id`` keys; the
        input mapping is left unmodified.
        """
        updated = dict(metadata)
        updated["trace_id"] = context.trace_id
        updated["span_id"] = context.span_id
        updated["parent_span_id"] = context.parent_span_id
        return updated

    def resume(self, metadata: dict[str, object], operation: str) -> TraceContext:
        """Rebuild a ``TraceContext`` from ``metadata`` and create a ``"worker"`` context.

        Missing or falsy ids fall back as follows: ``trace_id`` to a fresh
        ``uuid4().hex``, ``span_id`` to the trace id, ``parent_span_id`` to
        ``None``.
        """
        trace_id = str(metadata.get("trace_id") or uuid4().hex)
        span_id = str(metadata.get("span_id") or trace_id)
        parent_id = metadata.get("parent_span_id")
        # NOTE(review): create_context() generates its own trace id for the
        # stored record and its return value is discarded here, so the record
        # in the store does not carry the resumed ``trace_id`` returned below.
        # Confirm this is intended.
        self.create_context(
            alias=operation,
            parent_id=str(parent_id) if parent_id else None,
            kind="worker",
            attrs=dict(metadata),
        )
        return TraceContext(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=str(parent_id) if parent_id else None,
            attributes={"operation": operation},
        )

    def _write_message(
        self,
        level: TraceLevel,
        message: str,
        status: str | None,
        attrs: dict[str, Any] | None,
    ) -> None:
        """Build a ``TraceLogMessage`` for the current context and write it.

        Args:
            level: Level stored on the message.
            message: Message text; a falsy value becomes ``""``.
            status: Status text; a falsy value becomes ``""``.
            attrs: Extra attributes; copied before being stored.

        Raises:
            RuntimeError: If ``store.current()`` returns ``None``.
        """
        active = self.store.current()
        if active is None:
            raise RuntimeError("Trace context is not bound. Call create_context() first.")
        record = TraceLogMessage(
            trace_id=active.record.trace_id,
            step=active.step,
            status=str(status or ""),
            message=str(message or ""),
            level=level,
            event_time=utc_now(),
            attrs=dict(attrs or {}),
        )
        self._writer.write_message(record)
|
|
|
|
|
|
class TraceManager(TraceService):
    """Backward-compatible name for ``TraceService``.

    Adds no behaviour of its own; kept as a compatibility alias during the
    transition from ConfigManager-shaped naming.
    """
|