from __future__ import annotations import asyncio from dataclasses import dataclass, field from threading import Event, Lock, Thread from time import sleep from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.health import HealthContributor from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus from app_runtime.core.registration import ModuleRegistry from app_runtime.core.runtime import RuntimeManager from app_runtime.queue.in_memory import InMemoryTaskQueue from app_runtime.tracing.transport import NoOpTraceTransport @dataclass class CollectingRoutine: processed: list[dict[str, object]] = field(default_factory=list) _done: bool = False def run(self) -> None: if self._done: return self.processed.append({"id": 1}) self._done = True class BlockingRoutine: def __init__(self, started: Event, release: Event) -> None: self._started = started self._release = release def run(self) -> None: self._started.set() self._release.wait(timeout=2.0) class StaticHealthContributor(HealthContributor): def health(self) -> WorkerHealth: return WorkerHealth(name="example-module", status="ok", critical=False, meta={"kind": "test"}) class RoutineWorker(Worker): def __init__( self, name: str, routine: object, *, interval: float = 0.01, concurrency: int = 1, critical: bool = True, ) -> None: self._name = name self._routine = routine self._interval = interval self._concurrency = concurrency self._critical = critical self._threads: list[Thread] = [] self._stop_requested = Event() self._force_stop = Event() self._lock = Lock() self._started = False self._in_flight = 0 self._runs = 0 self._failures = 0 self._last_error: str | None = None @property def name(self) -> str: return self._name @property def critical(self) -> bool: return self._critical def start(self) -> None: if any(thread.is_alive() for thread in self._threads): return self._threads.clear() self._stop_requested.clear() self._force_stop.clear() self._started = True for index in range(self._concurrency): thread = Thread(target=self._run_loop, name=f"{self._name}-{index + 1}", daemon=True) self._threads.append(thread) thread.start() def stop(self, force: bool = False) -> None: self._stop_requested.set() if force: self._force_stop.set() def health(self) -> WorkerHealth: status = self.status() if self._started and not self._stop_requested.is_set() and self._alive_threads() == 0: return WorkerHealth(self.name, "unhealthy", self.critical, "worker threads are not running", status.meta) if self._failures > 0: return WorkerHealth(self.name, "degraded", self.critical, self._last_error, status.meta) return WorkerHealth(self.name, "ok", self.critical, meta=status.meta) def status(self) -> WorkerStatus: alive_threads = self._alive_threads() with self._lock: in_flight = self._in_flight runs = self._runs failures = self._failures detail = self._last_error if self._started and alive_threads == 0: state = "stopped" elif self._stop_requested.is_set(): state = "stopping" if alive_threads > 0 else "stopped" elif not self._started: state = "stopped" elif in_flight > 0: state = "busy" else: state = "idle" return WorkerStatus( name=self.name, state=state, in_flight=in_flight, detail=detail, meta={"alive_threads": alive_threads, "concurrency": self._concurrency, "runs": runs, "failures": failures}, ) def _run_loop(self) -> None: while True: if self._force_stop.is_set() or self._stop_requested.is_set(): return with self._lock: self._in_flight += 1 try: self._routine.run() except Exception as exc: with self._lock: self._failures += 1 self._last_error = str(exc) else: with self._lock: self._runs += 1 self._last_error = None finally: with self._lock: self._in_flight -= 1 if self._stop_requested.is_set(): return sleep(self._interval) def _alive_threads(self) -> int: return sum(1 for thread in self._threads if thread.is_alive()) class ExampleModule(ApplicationModule): def __init__(self) -> None: self.routine = CollectingRoutine() @property def name(self) -> str: return "example" def register(self, registry: ModuleRegistry) -> None: registry.add_worker(RoutineWorker("collector", self.routine)) registry.add_health_contributor(StaticHealthContributor()) class BlockingModule(ApplicationModule): def __init__(self, started: Event, release: Event) -> None: self.routine = BlockingRoutine(started, release) @property def name(self) -> str: return "blocking" def register(self, registry: ModuleRegistry) -> None: registry.add_worker(RoutineWorker("blocking-worker", self.routine)) class RecordingTransport(NoOpTraceTransport): def __init__(self) -> None: self.contexts: list[object] = [] self.messages: list[object] = [] def write_context(self, record) -> None: # type: ignore[override] self.contexts.append(record) def write_message(self, record) -> None: # type: ignore[override] self.messages.append(record) def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None: config_path = tmp_path / "config.yml" config_path.write_text( """ platform: workers: 1 log: version: 1 disable_existing_loggers: false handlers: console: class: logging.StreamHandler root: level: INFO handlers: [console] """.strip(), encoding="utf-8", ) runtime = RuntimeManager() runtime.add_config_file(config_path) module = ExampleModule() runtime.register_module(module) runtime.start() sleep(0.2) status = runtime.status() runtime.stop() assert module.routine.processed == [{"id": 1}] assert status["modules"] == ["example"] assert status["runtime"]["state"] == "idle" assert status["health"]["status"] == "ok" assert status["config"] == {"platform": {"workers": 1}, "log": status["config"]["log"]} def test_runtime_graceful_stop_waits_until_worker_finishes() -> None: started = Event() release = Event() runtime = RuntimeManager() runtime.register_module(BlockingModule(started, release)) runtime.start() assert started.wait(timeout=1.0) is True assert runtime.status()["runtime"]["state"] == "busy" stop_thread = Thread(target=runtime.stop, kwargs={"timeout": 1.0}, daemon=True) stop_thread.start() sleep(0.1) assert stop_thread.is_alive() is True release.set() stop_thread.join(timeout=1.0) assert stop_thread.is_alive() is False assert runtime.status()["runtime"]["state"] == "stopped" def test_trace_service_writes_contexts_and_messages() -> None: from app_runtime.tracing.service import TraceService transport = RecordingTransport() manager = TraceService(transport=transport) with manager.open_context(alias="worker", kind="worker", attrs={"routine": "incoming"}): manager.step("parse") manager.info("started", status="ok", attrs={"attempt": 1}) assert len(transport.contexts) == 1 assert len(transport.messages) == 1 assert transport.contexts[0].alias == "worker" assert transport.messages[0].step == "parse" def test_trace_service_supports_warning_and_error_levels() -> None: from app_runtime.tracing.service import TraceService transport = RecordingTransport() manager = TraceService(transport=transport) with manager.open_context(alias="worker", kind="worker", attrs={"routine": "incoming"}): manager.step("validate") manager.warning("validation warning", status="degraded", attrs={"attempt": 1}) manager.error("integration failed", status="failed", attrs={"integration": "crm"}) manager.exception("caught exception", attrs={"exception_type": "RuntimeError"}) levels = [message.level for message in transport.messages] assert levels == ["WARNING", "ERROR", "ERROR"] def test_trace_service_allows_messages_without_status() -> None: from app_runtime.tracing.service import TraceService transport = RecordingTransport() manager = TraceService(transport=transport) with manager.open_context(alias="worker", kind="worker"): manager.step("optional-status") manager.info("info without status") manager.warning("warning without status") manager.error("error without status") assert [message.status for message in transport.messages] == ["", "", ""] assert all(message.trace_id == transport.contexts[0].trace_id for message in transport.messages) def test_http_control_channel_exposes_health_and_actions() -> None: from app_runtime.control.base import ControlActionSet from app_runtime.control.http_channel import HttpControlChannel state = {"started": False} async def health(): return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"} async def start_handler() -> str: state["started"] = True return "started" async def stop_handler() -> str: state["started"] = False return "stopped" async def status_handler() -> str: return "idle" if state["started"] else "stopped" async def scenario() -> None: channel = HttpControlChannel("127.0.0.1", 0, 2) await channel.start( ControlActionSet( health=health, start=start_handler, stop=stop_handler, status=status_handler, ) ) start_response = await channel._action_response("start") assert start_response.status_code == 200 assert start_response.body == b'{"status":"ok","detail":"started"}' health_payload = await channel._health_response() assert health_payload["status"] == "ok" status_response = await channel._action_response("status") assert status_response.status_code == 200 assert status_response.body == b'{"status":"ok","detail":"idle"}' await channel.stop() asyncio.run(scenario()) def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None: import plba from plba import ApplicationModule as PublicApplicationModule from plba import InMemoryTaskQueue from plba import Worker as PublicWorker from plba import WorkerHealth as PublicWorkerHealth from plba import WorkerStatus as PublicWorkerStatus from plba import create_runtime config_path = tmp_path / "config.yml" config_path.write_text("platform: {}\n", encoding="utf-8") queue = InMemoryTaskQueue[int]() queue.put(2) assert queue.get(timeout=0.01) == 2 queue.task_done() class PublicRoutine: def __init__(self) -> None: self.runs = 0 def run(self) -> None: if self.runs == 0: self.runs += 1 class PublicWorkerImpl(PublicWorker): def __init__(self, routine: PublicRoutine) -> None: self._inner = RoutineWorker("public-worker", routine) @property def name(self) -> str: return self._inner.name @property def critical(self) -> bool: return self._inner.critical def start(self) -> None: self._inner.start() def stop(self, force: bool = False) -> None: self._inner.stop(force=force) def health(self) -> PublicWorkerHealth: return self._inner.health() def status(self) -> PublicWorkerStatus: return self._inner.status() class PublicExampleModule(PublicApplicationModule): @property def name(self) -> str: return "public-example" def register(self, registry: ModuleRegistry) -> None: registry.add_worker(PublicWorkerImpl(PublicRoutine())) runtime = create_runtime(PublicExampleModule(), config_path=str(config_path)) runtime.start() sleep(0.2) assert runtime.configuration.get() == {"platform": {}} assert runtime.status()["workers"]["registered"] == 1 assert hasattr(plba, "QueueWorker") is False runtime.stop()