from __future__ import annotations import asyncio from dataclasses import dataclass, field from threading import Event, Thread from time import sleep from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.health import HealthContributor from app_runtime.contracts.tasks import Task, TaskHandler from app_runtime.contracts.worker import WorkerHealth from app_runtime.core.registration import ModuleRegistry from app_runtime.core.runtime import RuntimeManager from app_runtime.queue.in_memory import InMemoryTaskQueue from app_runtime.tracing.transport import NoOpTraceTransport from app_runtime.workers.queue_worker import QueueWorker @dataclass class CollectingHandler(TaskHandler): processed: list[dict[str, object]] = field(default_factory=list) def handle(self, task: Task) -> None: self.processed.append(task.payload) class BlockingHandler(TaskHandler): def __init__(self, started: Event, release: Event) -> None: self._started = started self._release = release def handle(self, task: Task) -> None: del task self._started.set() self._release.wait(timeout=2.0) class StaticHealthContributor(HealthContributor): def health(self) -> WorkerHealth: return WorkerHealth(name="example-module", status="ok", critical=False, meta={"kind": "test"}) class ExampleModule(ApplicationModule): def __init__(self) -> None: self.handler = CollectingHandler() self.queue = InMemoryTaskQueue() @property def name(self) -> str: return "example" def register(self, registry: ModuleRegistry) -> None: traces = registry.services.get("traces") registry.add_queue("incoming", self.queue) registry.add_handler("collect", self.handler) self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={})) registry.add_worker(QueueWorker("collector", self.queue, self.handler, traces, concurrency=1)) registry.add_health_contributor(StaticHealthContributor()) class BlockingModule(ApplicationModule): def __init__(self, started: Event, release: Event) -> None: self.queue = InMemoryTaskQueue() self.handler = BlockingHandler(started, release) @property def name(self) -> str: return "blocking" def register(self, registry: ModuleRegistry) -> None: traces = registry.services.get("traces") self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={})) registry.add_worker(QueueWorker("blocking-worker", self.queue, self.handler, traces, concurrency=1)) class RecordingTransport(NoOpTraceTransport): def __init__(self) -> None: self.contexts: list[object] = [] self.messages: list[object] = [] def write_context(self, record) -> None: # type: ignore[override] self.contexts.append(record) def write_message(self, record) -> None: # type: ignore[override] self.messages.append(record) def test_runtime_processes_tasks_and_exposes_status(tmp_path) -> None: config_path = tmp_path / "config.yml" config_path.write_text( """ platform: workers: 1 log: version: 1 disable_existing_loggers: false handlers: console: class: logging.StreamHandler root: level: INFO handlers: [console] """.strip(), encoding="utf-8", ) runtime = RuntimeManager() runtime.add_config_file(config_path) module = ExampleModule() runtime.register_module(module) runtime.start() sleep(0.2) status = runtime.status() runtime.stop() assert module.handler.processed == [{"id": 1}] assert status["modules"] == ["example"] assert status["runtime"]["state"] == "idle" assert status["health"]["status"] == "ok" assert status["config"] == {"platform": {"workers": 1}, "log": status["config"]["log"]} def test_runtime_graceful_stop_waits_until_worker_finishes() -> None: started = Event() release = Event() runtime = RuntimeManager() runtime.register_module(BlockingModule(started, release)) runtime.start() assert started.wait(timeout=1.0) is True assert runtime.status()["runtime"]["state"] == "busy" stop_thread = Thread(target=runtime.stop, kwargs={"timeout": 1.0}, daemon=True) stop_thread.start() sleep(0.1) assert stop_thread.is_alive() is True release.set() stop_thread.join(timeout=1.0) assert stop_thread.is_alive() is False assert runtime.status()["runtime"]["state"] == "stopped" def test_trace_service_writes_contexts_and_messages() -> None: from app_runtime.tracing.service import TraceService transport = RecordingTransport() manager = TraceService(transport=transport) with manager.open_context(alias="worker", kind="task", attrs={"task": "incoming"}): manager.step("parse") manager.info("started", status="ok", attrs={"attempt": 1}) assert len(transport.contexts) == 1 assert len(transport.messages) == 1 assert transport.contexts[0].alias == "worker" assert transport.messages[0].step == "parse" def test_http_control_channel_exposes_health_and_actions() -> None: from app_runtime.control.base import ControlActionSet from app_runtime.control.http_channel import HttpControlChannel state = {"started": False} async def health(): return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"} async def start_handler() -> str: state["started"] = True return "started" async def stop_handler() -> str: state["started"] = False return "stopped" async def status_handler() -> str: return "idle" if state["started"] else "stopped" async def scenario() -> None: channel = HttpControlChannel("127.0.0.1", 0, 2) await channel.start( ControlActionSet( health=health, start=start_handler, stop=stop_handler, status=status_handler, ) ) start_response = await channel._action_response("start") assert start_response.status_code == 200 assert start_response.body == b'{"status":"ok","detail":"started"}' health_payload = await channel._health_response() assert health_payload["status"] == "ok" status_response = await channel._action_response("status") assert status_response.status_code == 200 assert status_response.body == b'{"status":"ok","detail":"idle"}' await channel.stop() asyncio.run(scenario()) def test_public_plba_package_exports_runtime_builder(tmp_path) -> None: from plba import ApplicationModule as PublicApplicationModule from plba import QueueWorker as PublicQueueWorker from plba import create_runtime config_path = tmp_path / "config.yml" config_path.write_text("platform: {}\n", encoding="utf-8") class PublicExampleModule(PublicApplicationModule): @property def name(self) -> str: return "public-example" def register(self, registry: ModuleRegistry) -> None: queue = InMemoryTaskQueue() traces = registry.services.get("traces") handler = CollectingHandler() queue.publish(Task(name="incoming", payload={"id": 2}, metadata={})) registry.add_worker(PublicQueueWorker("public-worker", queue, handler, traces)) runtime = create_runtime(PublicExampleModule(), config_path=str(config_path)) runtime.start() sleep(0.2) assert runtime.configuration.get() == {"platform": {}} assert runtime.status()["workers"]["registered"] == 1 runtime.stop()