This commit is contained in:
2026-03-04 10:01:49 +03:00
commit de787ce7ee
107 changed files with 2801 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
__all__: list[str] = []
Binary file not shown.
+4
View File
@@ -0,0 +1,4 @@
from app_runtime.config.file_loader import ConfigFileLoader
from app_runtime.config.providers import FileConfigProvider
__all__ = ["ConfigFileLoader", "FileConfigProvider"]
+48
View File
@@ -0,0 +1,48 @@
from __future__ import annotations
import hashlib
import json
from pathlib import Path
from typing import Any
import yaml
class ConfigFileLoader:
def __init__(self, path: str | Path) -> None:
self.path = Path(path)
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: str | None = None
def read_text(self) -> str:
return self.path.read_text(encoding="utf-8")
def parse(self, data: str) -> Any:
suffix = self.path.suffix.lower()
if suffix in {".yml", ".yaml"}:
return yaml.safe_load(data)
return json.loads(data)
def load_sync(self) -> Any:
data = self.read_text()
parsed = self.parse(data)
self.config = parsed
self.last_valid_config = parsed
self._last_seen_hash = self._calculate_hash(data)
return parsed
def load_if_changed(self) -> tuple[bool, Any]:
data = self.read_text()
current_hash = self._calculate_hash(data)
if current_hash == self._last_seen_hash:
return False, self.config
parsed = self.parse(data)
self.config = parsed
self.last_valid_config = parsed
self._last_seen_hash = current_hash
return True, parsed
@staticmethod
def _calculate_hash(data: str) -> str:
return hashlib.sha256(data.encode("utf-8")).hexdigest()
+22
View File
@@ -0,0 +1,22 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from app_runtime.config.file_loader import ConfigFileLoader
from app_runtime.contracts.config import ConfigProvider
class FileConfigProvider(ConfigProvider):
def __init__(self, path: str | Path) -> None:
self._loader = ConfigFileLoader(path)
@property
def loader(self) -> ConfigFileLoader:
return self._loader
def load(self) -> dict[str, Any]:
config = self._loader.load_sync()
if not isinstance(config, dict):
raise TypeError("Config root must be a mapping")
return dict(config)
+1
View File
@@ -0,0 +1 @@
__all__: list[str] = []
+16
View File
@@ -0,0 +1,16 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from app_runtime.core.registration import ModuleRegistry
class ApplicationModule(ABC):
@property
@abstractmethod
def name(self) -> str:
"""Module name used for runtime status and diagnostics."""
@abstractmethod
def register(self, registry: ModuleRegistry) -> None:
"""Register workers, queues, handlers, services, and health contributors."""
+10
View File
@@ -0,0 +1,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any
class ConfigProvider(ABC):
@abstractmethod
def load(self) -> dict[str, Any]:
"""Return a config fragment."""
+10
View File
@@ -0,0 +1,10 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from app_runtime.contracts.worker import WorkerHealth
class HealthContributor(ABC):
@abstractmethod
def health(self) -> WorkerHealth:
"""Return contributor health state."""
+28
View File
@@ -0,0 +1,28 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any
from app_runtime.contracts.tasks import Task
class TaskQueue(ABC):
@abstractmethod
def publish(self, task: Task) -> None:
"""Push a task into the queue."""
@abstractmethod
def consume(self, timeout: float = 0.1) -> Task | None:
"""Return the next available task or None."""
@abstractmethod
def ack(self, task: Task) -> None:
"""Confirm successful task processing."""
@abstractmethod
def nack(self, task: Task, retry_delay: float | None = None) -> None:
"""Signal failed task processing."""
@abstractmethod
def stats(self) -> dict[str, Any]:
"""Return transport-level queue statistics."""
+18
View File
@@ -0,0 +1,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class Task:
name: str
payload: dict[str, Any]
metadata: dict[str, Any] = field(default_factory=dict)
class TaskHandler(ABC):
@abstractmethod
def handle(self, task: Task) -> None:
"""Execute domain logic for a task."""
+57
View File
@@ -0,0 +1,57 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Protocol
def utc_now() -> datetime:
return datetime.now(timezone.utc)
@dataclass(slots=True)
class TraceContext:
trace_id: str
span_id: str
parent_span_id: str | None = None
attributes: dict[str, Any] | None = None
class TraceContextFactory(ABC):
@abstractmethod
def new_root(self, operation: str) -> TraceContext:
"""Create a new root trace context."""
@abstractmethod
def child_of(self, parent: TraceContext, operation: str) -> TraceContext:
"""Create a child trace context."""
@dataclass(frozen=True)
class TraceContextRecord:
trace_id: str
alias: str
parent_id: str | None = None
type: str | None = None
event_time: datetime = field(default_factory=utc_now)
attrs: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class TraceLogMessage:
trace_id: str
step: str
status: str
message: str
level: str
event_time: datetime = field(default_factory=utc_now)
attrs: dict[str, Any] = field(default_factory=dict)
class TraceTransport(Protocol):
def write_context(self, record: TraceContextRecord) -> None:
"""Persist trace context record."""
def write_message(self, record: TraceLogMessage) -> None:
"""Persist trace log message."""
+55
View File
@@ -0,0 +1,55 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Literal
HealthState = Literal["ok", "degraded", "unhealthy"]
WorkerState = Literal["starting", "idle", "busy", "stopping", "stopped"]
@dataclass(slots=True)
class WorkerHealth:
name: str
status: HealthState
critical: bool
detail: str | None = None
meta: dict[str, Any] = field(default_factory=dict)
@dataclass(slots=True)
class WorkerStatus:
name: str
state: WorkerState
in_flight: int = 0
detail: str | None = None
meta: dict[str, Any] = field(default_factory=dict)
class Worker(ABC):
@property
@abstractmethod
def name(self) -> str:
"""Stable worker name for diagnostics."""
@property
@abstractmethod
def critical(self) -> bool:
"""Whether this worker is required for healthy app operation."""
@abstractmethod
def start(self) -> None:
"""Start worker execution."""
@abstractmethod
def stop(self, force: bool = False) -> None:
"""Request graceful or immediate stop."""
@abstractmethod
def health(self) -> WorkerHealth:
"""Return current health state."""
@abstractmethod
def status(self) -> WorkerStatus:
"""Return current runtime status."""
+5
View File
@@ -0,0 +1,5 @@
from app_runtime.control.base import ControlActionSet, ControlChannel
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.control.service import ControlPlaneService
__all__ = ["ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"]
+28
View File
@@ -0,0 +1,28 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from app_runtime.core.types import HealthPayload
ActionHandler = Callable[[], Awaitable[str]]
HealthHandler = Callable[[], Awaitable[HealthPayload]]
@dataclass(slots=True)
class ControlActionSet:
health: HealthHandler
start: ActionHandler
stop: ActionHandler
status: ActionHandler
class ControlChannel(ABC):
@abstractmethod
async def start(self, actions: ControlActionSet) -> None:
"""Start the control channel and bind handlers."""
@abstractmethod
async def stop(self) -> None:
"""Stop the control channel and release resources."""
+38
View File
@@ -0,0 +1,38 @@
from __future__ import annotations
import time
from collections.abc import Awaitable, Callable
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app_runtime.core.types import HealthPayload
class HttpControlAppFactory:
def create(
self,
health_provider: Callable[[], Awaitable[HealthPayload]],
action_provider: Callable[[str], Awaitable[JSONResponse]],
) -> FastAPI:
app = FastAPI(title="PLBA Control API")
@app.middleware("http")
async def log_api_call(request: Request, call_next): # type: ignore[no-untyped-def]
started = time.monotonic()
response = await call_next(request)
response.headers["X-Response-Time-Ms"] = str(int((time.monotonic() - started) * 1000))
return response
@app.get("/health")
async def health() -> JSONResponse:
payload = await health_provider()
status_code = 200 if payload.get("status") == "ok" else 503
return JSONResponse(content=payload, status_code=status_code)
@app.get("/actions/{action}")
@app.post("/actions/{action}")
async def action(action: str) -> JSONResponse:
return await action_provider(action)
return app
+53
View File
@@ -0,0 +1,53 @@
from __future__ import annotations
import asyncio
from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionSet, ControlChannel
from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.control.http_runner import UvicornThreadRunner
class HttpControlChannel(ControlChannel):
def __init__(self, host: str, port: int, timeout: int) -> None:
self._timeout = timeout
self._runner = UvicornThreadRunner(host, port, timeout)
self._factory = HttpControlAppFactory()
self._actions: ControlActionSet | None = None
async def start(self, actions: ControlActionSet) -> None:
self._actions = actions
app = self._factory.create(self._health_response, self._action_response)
await self._runner.start(app)
async def stop(self) -> None:
await self._runner.stop()
@property
def port(self) -> int:
return self._runner.port
async def _health_response(self) -> dict[str, object]:
if self._actions is None:
return {"status": "unhealthy", "detail": "control actions are not configured"}
return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout))
async def _action_response(self, action: str) -> JSONResponse:
if self._actions is None:
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
callbacks = {
"start": self._actions.start,
"stop": self._actions.stop,
"status": self._actions.status,
}
callback = callbacks.get(action)
if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
try:
detail = await asyncio.wait_for(callback(), timeout=float(self._timeout))
except asyncio.TimeoutError:
return JSONResponse(content={"status": "error", "detail": f"{action} handler timeout"}, status_code=504)
except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
+61
View File
@@ -0,0 +1,61 @@
from __future__ import annotations
import asyncio
from threading import Thread
from fastapi import FastAPI
from uvicorn import Config, Server
class UvicornThreadRunner:
def __init__(self, host: str, port: int, timeout: int) -> None:
self._host = host
self._port = port
self._timeout = timeout
self._server: Server | None = None
self._thread: Thread | None = None
self._error: BaseException | None = None
async def start(self, app: FastAPI) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._error = None
config = Config(app=app, host=self._host, port=self._port, log_level="warning")
self._server = Server(config)
self._thread = Thread(target=self._serve, name="plba-http-control", daemon=True)
self._thread.start()
await self._wait_until_started()
async def stop(self) -> None:
if self._server is None or self._thread is None:
return
self._server.should_exit = True
await asyncio.to_thread(self._thread.join, self._timeout)
self._server = None
self._thread = None
@property
def port(self) -> int:
if self._server is None or not getattr(self._server, "servers", None):
return self._port
socket = self._server.servers[0].sockets[0]
return int(socket.getsockname()[1])
async def _wait_until_started(self) -> None:
if self._server is None:
raise RuntimeError("Server is not initialized")
deadline = asyncio.get_running_loop().time() + max(float(self._timeout), 1.0)
while not self._server.started:
if self._error is not None:
raise RuntimeError("HTTP control server failed to start") from self._error
if asyncio.get_running_loop().time() >= deadline:
raise TimeoutError("HTTP control server startup timed out")
await asyncio.sleep(0.05)
def _serve(self) -> None:
if self._server is None:
return
try:
asyncio.run(self._server.serve())
except BaseException as exc: # noqa: BLE001
self._error = exc
+52
View File
@@ -0,0 +1,52 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.control.base import ControlActionSet, ControlChannel
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class ControlPlaneService:
def __init__(self) -> None:
self._channels: list[ControlChannel] = []
def register_channel(self, channel: ControlChannel) -> None:
self._channels.append(channel)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
def snapshot(self, runtime: RuntimeManager) -> dict[str, object]:
health = runtime.current_health()
return {
"runtime": {"state": runtime._state.value},
"modules": list(runtime.registry.modules),
"services": runtime.services.snapshot(),
"workers": runtime.workers.snapshot(),
"health": runtime.health.snapshot(runtime.workers.healths()) | {"status": health["status"]},
"config": runtime.configuration.get(),
}
async def _start_async(self, runtime: RuntimeManager) -> None:
actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
)
for channel in self._channels:
await channel.start(actions)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
+1
View File
@@ -0,0 +1 @@
__all__: list[str] = []
+48
View File
@@ -0,0 +1,48 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from app_runtime.contracts.config import ConfigProvider
class ConfigurationManager:
def __init__(self) -> None:
self._providers: list[ConfigProvider] = []
self._subscribers: list[Callable[[dict[str, Any]], None]] = []
self._config: dict[str, Any] = {}
def add_provider(self, provider: ConfigProvider) -> None:
self._providers.append(provider)
def subscribe(self, callback: Callable[[dict[str, Any]], None]) -> None:
self._subscribers.append(callback)
def load(self) -> dict[str, Any]:
merged: dict[str, Any] = {}
for provider in self._providers:
merged = self._deep_merge(merged, provider.load())
self._config = merged
return dict(self._config)
def reload(self) -> dict[str, Any]:
config = self.load()
for callback in self._subscribers:
callback(dict(config))
return config
def get(self) -> dict[str, Any]:
return dict(self._config)
def section(self, name: str, default: Any = None) -> Any:
return self._config.get(name, default)
def _deep_merge(self, left: dict[str, Any], right: dict[str, Any]) -> dict[str, Any]:
merged = dict(left)
for key, value in right.items():
current = merged.get(key)
if isinstance(current, dict) and isinstance(value, dict):
merged[key] = self._deep_merge(current, value)
continue
merged[key] = value
return merged
+32
View File
@@ -0,0 +1,32 @@
from __future__ import annotations
from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.queue import TaskQueue
from app_runtime.contracts.tasks import TaskHandler
from app_runtime.contracts.worker import Worker
from app_runtime.core.service_container import ServiceContainer
class ModuleRegistry:
def __init__(self, services: ServiceContainer) -> None:
self.services = services
self.queues: dict[str, TaskQueue] = {}
self.handlers: dict[str, TaskHandler] = {}
self.workers: list[Worker] = []
self.health_contributors: list[HealthContributor] = []
self.modules: list[str] = []
def register_module(self, name: str) -> None:
self.modules.append(name)
def add_queue(self, name: str, queue: TaskQueue) -> None:
self.queues[name] = queue
def add_handler(self, name: str, handler: TaskHandler) -> None:
self.handlers[name] = handler
def add_worker(self, worker: Worker) -> None:
self.workers.append(worker)
def add_health_contributor(self, contributor: HealthContributor) -> None:
self.health_contributors.append(contributor)
+125
View File
@@ -0,0 +1,125 @@
from __future__ import annotations
from app_runtime.config.providers import FileConfigProvider
from app_runtime.contracts.application import ApplicationModule
from app_runtime.control.service import ControlPlaneService
from app_runtime.core.configuration import ConfigurationManager
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.service_container import ServiceContainer
from app_runtime.core.types import HealthPayload, LifecycleState
from app_runtime.health.registry import HealthRegistry
from app_runtime.logging.manager import LogManager
from app_runtime.tracing.service import TraceService
from app_runtime.workers.supervisor import WorkerSupervisor
class RuntimeManager:
def __init__(
self,
configuration: ConfigurationManager | None = None,
services: ServiceContainer | None = None,
traces: TraceService | None = None,
health: HealthRegistry | None = None,
logs: LogManager | None = None,
workers: WorkerSupervisor | None = None,
control_plane: ControlPlaneService | None = None,
) -> None:
self.configuration = configuration or ConfigurationManager()
self.services = services or ServiceContainer()
self.traces = traces or TraceService()
self.health = health or HealthRegistry()
self.logs = logs or LogManager()
self.workers = workers or WorkerSupervisor()
self.control_plane = control_plane or ControlPlaneService()
self.registry = ModuleRegistry(self.services)
self._started = False
self._state = LifecycleState.IDLE
self._core_registered = False
self._workers_registered = False
self._register_core_services()
def register_module(self, module: ApplicationModule) -> None:
self.registry.register_module(module.name)
module.register(self.registry)
def add_config_file(self, path: str) -> FileConfigProvider:
provider = FileConfigProvider(path)
self.configuration.add_provider(provider)
return provider
def start(self) -> None:
if self._started:
return
self._state = LifecycleState.STARTING
config = self.configuration.load()
self.logs.apply_config(config)
self._register_health_contributors()
self._register_workers()
self.workers.start()
self.control_plane.start(self)
self._started = True
self._refresh_state()
def stop(self, timeout: float = 30.0, force: bool = False, stop_control_plane: bool = True) -> None:
if not self._started:
return
self._state = LifecycleState.STOPPING
self.workers.stop(timeout=timeout, force=force)
if stop_control_plane:
self.control_plane.stop()
self._started = False
self._state = LifecycleState.STOPPED
def status(self) -> dict[str, object]:
self._refresh_state()
return self.control_plane.snapshot(self)
def current_health(self) -> HealthPayload:
self._refresh_state()
return self.health.payload(self._state, self.workers.healths())
async def health_status(self) -> HealthPayload:
return self.current_health()
async def start_runtime(self) -> str:
if self._started:
return "runtime already running"
self.start()
return "runtime started"
async def stop_runtime(self) -> str:
if not self._started:
return "runtime already stopped"
self.stop(stop_control_plane=False)
return "runtime stopped"
async def runtime_status(self) -> str:
self._refresh_state()
return self._state.value
def _register_core_services(self) -> None:
if self._core_registered:
return
self.services.register("configuration", self.configuration)
self.services.register("traces", self.traces)
self.services.register("health", self.health)
self.services.register("logs", self.logs)
self.services.register("workers", self.workers)
self.services.register("control_plane", self.control_plane)
self._core_registered = True
def _register_health_contributors(self) -> None:
for contributor in self.registry.health_contributors:
self.health.register(contributor)
def _register_workers(self) -> None:
if self._workers_registered:
return
for worker in self.registry.workers:
self.workers.register(worker)
self._workers_registered = True
def _refresh_state(self) -> None:
if not self._started or self._state == LifecycleState.STOPPING:
return
self._state = self.workers.lifecycle_state()
+28
View File
@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Any
class ServiceContainer:
def __init__(self) -> None:
self._services: dict[str, Any] = {}
def register(self, name: str, service: Any) -> None:
if name in self._services:
raise ValueError(f"Service '{name}' is already registered")
self._services[name] = service
def get(self, name: str) -> Any:
try:
return self._services[name]
except KeyError as exc:
raise KeyError(f"Service '{name}' is not registered") from exc
def require(self, name: str, expected_type: type[Any]) -> Any:
service = self.get(name)
if not isinstance(service, expected_type):
raise TypeError(f"Service '{name}' is not of type {expected_type.__name__}")
return service
def snapshot(self) -> dict[str, str]:
return {name: type(service).__name__ for name, service in self._services.items()}
+19
View File
@@ -0,0 +1,19 @@
from __future__ import annotations
from enum import Enum
from typing import TypedDict
class HealthPayload(TypedDict, total=False):
status: str
detail: str
state: str
components: list[dict[str, object]]
class LifecycleState(str, Enum):
STARTING = "starting"
IDLE = "idle"
BUSY = "busy"
STOPPING = "stopping"
STOPPED = "stopped"
+3
View File
@@ -0,0 +1,3 @@
from app_runtime.health.registry import HealthRegistry
__all__ = ["HealthRegistry"]
+56
View File
@@ -0,0 +1,56 @@
from __future__ import annotations
from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.worker import WorkerHealth
from app_runtime.core.types import HealthPayload, LifecycleState
class HealthRegistry:
def __init__(self) -> None:
self._contributors: list[HealthContributor] = []
def register(self, contributor: HealthContributor) -> None:
self._contributors.append(contributor)
def contributor_healths(self) -> list[WorkerHealth]:
return [contributor.health() for contributor in self._contributors]
def snapshot(self, worker_healths: list[WorkerHealth]) -> dict[str, object]:
component_healths = worker_healths + self.contributor_healths()
return {
"status": self._aggregate_status(component_healths),
"components": [self._health_dict(item) for item in component_healths],
}
def payload(self, state: LifecycleState, worker_healths: list[WorkerHealth]) -> HealthPayload:
component_healths = worker_healths + self.contributor_healths()
if state == LifecycleState.STOPPED:
return {"status": "unhealthy", "detail": "state=stopped", "state": state.value}
if state in {LifecycleState.STARTING, LifecycleState.STOPPING}:
return {
"status": "degraded",
"detail": f"state={state.value}",
"state": state.value,
"components": [self._health_dict(item) for item in component_healths],
}
return {
"status": self._aggregate_status(component_healths),
"state": state.value,
"components": [self._health_dict(item) for item in component_healths],
}
def _aggregate_status(self, component_healths: list[WorkerHealth]) -> str:
if any(item.status == "unhealthy" and item.critical for item in component_healths):
return "unhealthy"
if any(item.status in {"degraded", "unhealthy"} for item in component_healths):
return "degraded"
return "ok"
def _health_dict(self, health: WorkerHealth) -> dict[str, object]:
return {
"name": health.name,
"status": health.status,
"critical": health.critical,
"detail": health.detail,
"meta": health.meta,
}
+3
View File
@@ -0,0 +1,3 @@
from app_runtime.logging.manager import LogManager
__all__ = ["LogManager"]
+31
View File
@@ -0,0 +1,31 @@
from __future__ import annotations
import logging
import logging.config
class LogManager:
def __init__(self) -> None:
self._logger = logging.getLogger(__name__)
self._last_valid_config: dict | None = None
def apply_config(self, config: dict) -> None:
logging_config = config.get("log")
if not logging_config:
self._logger.warning("Config has no 'log' section; default logging remains active.")
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = dict(logging_config)
self._logger.info("Logging configuration applied")
except Exception:
self._logger.exception("Failed to apply logging configuration")
self._restore_last_valid()
def _restore_last_valid(self) -> None:
if self._last_valid_config is None:
return
try:
logging.config.dictConfig(self._last_valid_config)
except Exception:
self._logger.exception("Failed to restore previous logging configuration")
+3
View File
@@ -0,0 +1,3 @@
from app_runtime.queue.in_memory import InMemoryTaskQueue
__all__ = ["InMemoryTaskQueue"]
+43
View File
@@ -0,0 +1,43 @@
from __future__ import annotations
from queue import Empty, Queue
from app_runtime.contracts.queue import TaskQueue
from app_runtime.contracts.tasks import Task
class InMemoryTaskQueue(TaskQueue):
def __init__(self) -> None:
self._queue: Queue[Task] = Queue()
self._published = 0
self._acked = 0
self._nacked = 0
def publish(self, task: Task) -> None:
self._published += 1
self._queue.put(task)
def consume(self, timeout: float = 0.1) -> Task | None:
try:
return self._queue.get(timeout=timeout)
except Empty:
return None
def ack(self, task: Task) -> None:
del task
self._acked += 1
self._queue.task_done()
def nack(self, task: Task, retry_delay: float | None = None) -> None:
del retry_delay
self._nacked += 1
self._queue.put(task)
self._queue.task_done()
def stats(self) -> dict[str, int]:
return {
"published": self._published,
"acked": self._acked,
"nacked": self._nacked,
"queued": self._queue.qsize(),
}
+16
View File
@@ -0,0 +1,16 @@
from app_runtime.contracts.trace import TraceContext, TraceContextRecord, TraceLogMessage, TraceTransport
from app_runtime.tracing.service import TraceManager, TraceService
from app_runtime.tracing.store import ActiveTraceContext, TraceContextStore
from app_runtime.tracing.transport import NoOpTraceTransport
__all__ = [
"ActiveTraceContext",
"NoOpTraceTransport",
"TraceContext",
"TraceContextRecord",
"TraceContextStore",
"TraceLogMessage",
"TraceManager",
"TraceService",
"TraceTransport",
]
+166
View File
@@ -0,0 +1,166 @@
from __future__ import annotations
import logging
from contextlib import contextmanager
from typing import Any, Iterator
from uuid import uuid4
from app_runtime.contracts.trace import (
TraceContext,
TraceContextFactory,
TraceContextRecord,
TraceLogMessage,
TraceTransport,
utc_now,
)
from app_runtime.tracing.store import TraceContextStore
from app_runtime.tracing.transport import NoOpTraceTransport
class TraceRecordWriter:
def __init__(self, transport: TraceTransport) -> None:
self._logger = logging.getLogger(__name__)
self._transport = transport
def write_context(self, record: TraceContextRecord) -> None:
try:
self._transport.write_context(record)
except Exception:
self._logger.exception("Trace transport failed to write context %s", record.trace_id)
def write_message(self, record: TraceLogMessage) -> None:
try:
self._transport.write_message(record)
except Exception:
self._logger.exception("Trace transport failed to write message for %s", record.trace_id)
class TraceService(TraceContextFactory):
def __init__(self, transport: TraceTransport | None = None, store: TraceContextStore | None = None) -> None:
self.transport = transport or NoOpTraceTransport()
self.store = store or TraceContextStore()
self._writer = TraceRecordWriter(self.transport)
def create_context(
self,
*,
alias: str,
parent_id: str | None = None,
kind: str | None = None,
attrs: dict[str, Any] | None = None,
) -> str:
record = TraceContextRecord(
trace_id=uuid4().hex,
alias=str(alias or ""),
parent_id=parent_id,
type=kind,
event_time=utc_now(),
attrs=dict(attrs or {}),
)
self.store.push(record)
self._writer.write_context(record)
return record.trace_id
@contextmanager
def open_context(
self,
*,
alias: str,
parent_id: str | None = None,
kind: str | None = None,
attrs: dict[str, Any] | None = None,
) -> Iterator[str]:
trace_id = self.create_context(alias=alias, parent_id=parent_id, kind=kind, attrs=attrs)
try:
yield trace_id
finally:
self.store.pop()
def current_trace_id(self) -> str | None:
return self.store.current_trace_id()
def close_context(self) -> str | None:
previous = self.store.pop()
return previous.record.trace_id if previous else None
def step(self, name: str) -> None:
self.store.set_step(str(name or ""))
def info(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None:
self._write_message("INFO", message, status, attrs)
def warning(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None:
self._write_message("WARNING", message, status, attrs)
def error(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None:
self._write_message("ERROR", message, status, attrs)
def exception(self, message: str, *, status: str = "failed", attrs: dict[str, Any] | None = None) -> None:
self._write_message("ERROR", message, status, attrs)
def new_root(self, operation: str) -> TraceContext:
trace_id = self.create_context(alias=operation, kind="source", attrs={"operation": operation})
return TraceContext(trace_id=trace_id, span_id=trace_id, attributes={"operation": operation})
def child_of(self, parent: TraceContext, operation: str) -> TraceContext:
trace_id = self.create_context(
alias=operation,
parent_id=parent.trace_id,
kind="worker",
attrs={"operation": operation},
)
return TraceContext(
trace_id=trace_id,
span_id=trace_id,
parent_span_id=parent.span_id,
attributes={"operation": operation},
)
def attach(self, task_metadata: dict[str, object], context: TraceContext) -> dict[str, object]:
updated = dict(task_metadata)
updated["trace_id"] = context.trace_id
updated["span_id"] = context.span_id
updated["parent_span_id"] = context.parent_span_id
return updated
def resume(self, task_metadata: dict[str, object], operation: str) -> TraceContext:
trace_id = str(task_metadata.get("trace_id") or uuid4().hex)
span_id = str(task_metadata.get("span_id") or trace_id)
parent_id = task_metadata.get("parent_span_id")
self.create_context(
alias=operation,
parent_id=str(parent_id) if parent_id else None,
kind="handler",
attrs=dict(task_metadata),
)
return TraceContext(
trace_id=trace_id,
span_id=span_id,
parent_span_id=str(parent_id) if parent_id else None,
attributes={"operation": operation},
)
def _write_message(
self,
level: str,
message: str,
status: str,
attrs: dict[str, Any] | None,
) -> None:
active = self.store.current()
if active is None:
raise RuntimeError("Trace context is not bound. Call create_context() first.")
record = TraceLogMessage(
trace_id=active.record.trace_id,
step=active.step,
status=str(status or ""),
message=str(message or ""),
level=level,
event_time=utc_now(),
attrs=dict(attrs or {}),
)
self._writer.write_message(record)
class TraceManager(TraceService):
"""Compatibility alias during the transition from ConfigManager-shaped naming."""
+52
View File
@@ -0,0 +1,52 @@
from __future__ import annotations
from contextvars import ContextVar
from dataclasses import dataclass, replace
from app_runtime.contracts.trace import TraceContextRecord
@dataclass(frozen=True)
class ActiveTraceContext:
record: TraceContextRecord
step: str = ""
class TraceContextStore:
def __init__(self) -> None:
self._current: ContextVar[ActiveTraceContext | None] = ContextVar("trace_current", default=None)
self._stack: ContextVar[tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=())
def current(self) -> ActiveTraceContext | None:
return self._current.get()
def current_trace_id(self) -> str | None:
active = self.current()
return active.record.trace_id if active else None
def push(self, record: TraceContextRecord) -> ActiveTraceContext:
active = self.current()
stack = self._stack.get()
if active is not None:
self._stack.set(stack + (active,))
updated = ActiveTraceContext(record=record)
self._current.set(updated)
return updated
def pop(self) -> ActiveTraceContext | None:
stack = self._stack.get()
if not stack:
self._current.set(None)
return None
previous = stack[-1]
self._stack.set(stack[:-1])
self._current.set(previous)
return previous
def set_step(self, step: str) -> ActiveTraceContext | None:
active = self.current()
if active is None:
return None
updated = replace(active, step=step)
self._current.set(updated)
return updated
+11
View File
@@ -0,0 +1,11 @@
from __future__ import annotations
from app_runtime.contracts.trace import TraceContextRecord, TraceLogMessage, TraceTransport
class NoOpTraceTransport(TraceTransport):
def write_context(self, record: TraceContextRecord) -> None:
del record
def write_message(self, record: TraceLogMessage) -> None:
del record
+4
View File
@@ -0,0 +1,4 @@
from app_runtime.workers.queue_worker import QueueWorker
from app_runtime.workers.supervisor import WorkerSupervisor
__all__ = ["QueueWorker", "WorkerSupervisor"]
+125
View File
@@ -0,0 +1,125 @@
from __future__ import annotations
from threading import Event, Lock, Thread
from app_runtime.contracts.queue import TaskQueue
from app_runtime.contracts.tasks import TaskHandler
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
from app_runtime.tracing.service import TraceService
class QueueWorker(Worker):
def __init__(
self,
name: str,
queue: TaskQueue,
handler: TaskHandler,
traces: TraceService,
*,
concurrency: int = 1,
critical: bool = True,
) -> None:
self._name = name
self._queue = queue
self._handler = handler
self._traces = traces
self._concurrency = concurrency
self._critical = critical
self._threads: list[Thread] = []
self._stop_requested = Event()
self._force_stop = Event()
self._lock = Lock()
self._started = False
self._in_flight = 0
self._processed = 0
self._failures = 0
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return self._critical
def start(self) -> None:
if any(thread.is_alive() for thread in self._threads):
return
self._threads.clear()
self._stop_requested.clear()
self._force_stop.clear()
self._started = True
for index in range(self._concurrency):
thread = Thread(target=self._run_loop, name=f"{self._name}-{index + 1}", daemon=True)
self._threads.append(thread)
thread.start()
def stop(self, force: bool = False) -> None:
self._stop_requested.set()
if force:
self._force_stop.set()
def health(self) -> WorkerHealth:
status = self.status()
if self._started and not self._stop_requested.is_set() and self._alive_threads() == 0:
return WorkerHealth(self.name, "unhealthy", self.critical, "worker threads are not running", status.meta)
if self._failures > 0:
return WorkerHealth(self.name, "degraded", self.critical, "worker has processing failures", status.meta)
return WorkerHealth(self.name, "ok", self.critical, meta=status.meta)
def status(self) -> WorkerStatus:
alive_threads = self._alive_threads()
with self._lock:
in_flight = self._in_flight
processed = self._processed
failures = self._failures
if self._started and alive_threads == 0:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping" if alive_threads > 0 else "stopped"
elif not self._started:
state = "stopped"
elif in_flight > 0:
state = "busy"
else:
state = "idle"
return WorkerStatus(
name=self.name,
state=state,
in_flight=in_flight,
meta={
"alive_threads": alive_threads,
"concurrency": self._concurrency,
"processed": processed,
"failures": failures,
},
)
def _run_loop(self) -> None:
while True:
if self._force_stop.is_set() or self._stop_requested.is_set():
return
task = self._queue.consume(timeout=0.1)
if task is None:
continue
with self._lock:
self._in_flight += 1
self._traces.resume(task.metadata, f"worker:{self.name}")
try:
self._handler.handle(task)
except Exception:
with self._lock:
self._failures += 1
self._queue.nack(task)
else:
with self._lock:
self._processed += 1
self._queue.ack(task)
finally:
with self._lock:
self._in_flight -= 1
if self._stop_requested.is_set():
return
def _alive_threads(self) -> int:
return sum(1 for thread in self._threads if thread.is_alive())
+59
View File
@@ -0,0 +1,59 @@
from __future__ import annotations
from dataclasses import asdict
from time import monotonic, sleep
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
from app_runtime.core.types import LifecycleState
class WorkerSupervisor:
def __init__(self) -> None:
self._workers: list[Worker] = []
def register(self, worker: Worker) -> None:
self._workers.append(worker)
def start(self) -> None:
for worker in self._workers:
worker.start()
def stop(self, timeout: float = 30.0, force: bool = False) -> None:
for worker in self._workers:
worker.stop(force=force)
if force:
return
deadline = monotonic() + timeout
while True:
if all(status.state == "stopped" for status in self.statuses()):
return
if monotonic() >= deadline:
raise TimeoutError("Workers did not stop within the requested timeout")
sleep(0.05)
def snapshot(self) -> dict[str, object]:
return {
"registered": len(self._workers),
"workers": [asdict(status) for status in self.statuses()],
}
def healths(self) -> list[WorkerHealth]:
return [worker.health() for worker in self._workers]
def statuses(self) -> list[WorkerStatus]:
return [worker.status() for worker in self._workers]
def lifecycle_state(self) -> LifecycleState:
statuses = self.statuses()
if not statuses:
return LifecycleState.IDLE
states = {status.state for status in statuses}
if "stopping" in states:
return LifecycleState.STOPPING
if "starting" in states:
return LifecycleState.STARTING
if "busy" in states:
return LifecycleState.BUSY
if states == {"stopped"}:
return LifecycleState.STOPPED
return LifecycleState.IDLE