"""Scenario tests for worker scheduling and the HTTP control actions.

The module defines small in-process ``Worker`` implementations, registers
them with a ``RuntimeManager`` and drives ``/health`` and ``/actions/*``
through a FastAPI ``TestClient``.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from threading import Event, Lock, Thread
from time import monotonic, sleep

from fastapi.testclient import TestClient

from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager


@dataclass
class IntervalRoutine:
    """Routine that records a monotonic timestamp on every ``run()`` call."""

    # Timestamps (``time.monotonic()``) of each invocation, oldest first.
    calls: list[float] = field(default_factory=list)
    # Guards ``calls``: ``run()`` is invoked from the worker thread while the
    # test thread reads the list.
    _lock: Lock = field(default_factory=Lock)

    def run(self) -> None:
        """Record the current monotonic time."""
        with self._lock:
            self.calls.append(monotonic())

    def wait_runs(self, count: int, timeout: float) -> bool:
        """Poll until at least ``count`` runs were recorded.

        Returns ``True`` as soon as the threshold is reached and ``False`` if
        it is still not reached once ``timeout`` seconds have elapsed.
        """
        deadline = monotonic() + timeout
        while monotonic() < deadline:
            with self._lock:
                if len(self.calls) >= count:
                    return True
            sleep(0.01)
        # Final check so a run recorded during the last sleep (or a zero
        # timeout) is not reported as a miss.
        with self._lock:
            return len(self.calls) >= count

    def deltas(self) -> list[float]:
        """Return the time differences between consecutive recorded runs."""
        with self._lock:
            return [later - earlier for earlier, later in zip(self.calls, self.calls[1:])]


class BlockingRoutine:
    """Routine that signals it has started and then blocks until released."""

    def __init__(self, started: Event, release: Event) -> None:
        self._started = started
        self._release = release

    def run(self) -> None:
        """Set ``started`` and wait for ``release`` (at most 5 seconds)."""
        self._started.set()
        # Bounded wait so a failing test cannot block the worker thread forever.
        self._release.wait(timeout=5.0)


class ScenarioWorker(Worker):
    """Worker that calls ``routine.run()`` repeatedly on a background thread.

    ``routine`` is any object exposing a no-argument ``run()`` method;
    ``interval`` is the pause in seconds between two consecutive runs.
    """

    def __init__(self, name: str, routine: object, *, interval: float = 0.05) -> None:
        self._name = name
        self._routine = routine
        self._interval = interval
        self._thread: Thread | None = None
        self._stop_requested = Event()
        self._in_flight = 0
        self._runs = 0
        # Guards ``_in_flight`` and ``_runs``, which are shared between the
        # worker thread and callers of ``health()`` / ``status()``.
        self._lock = Lock()

    @property
    def name(self) -> str:
        return self._name

    @property
    def critical(self) -> bool:
        return True

    def start(self) -> None:
        """Start the worker thread unless one is already alive."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_requested.clear()
        self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
        self._thread.start()

    def stop(self, force: bool = False) -> None:
        """Request the loop to exit; ``force`` is accepted but not used here."""
        self._stop_requested.set()

    def health(self) -> WorkerHealth:
        """Report an always-``"ok"`` health record carrying the run counter."""
        # Read the counter under the lock, consistent with status() and _loop().
        with self._lock:
            runs = self._runs
        return WorkerHealth(name=self.name, status="ok", critical=True, meta={"runs": runs})

    def status(self) -> WorkerStatus:
        """Derive the worker state from thread liveness and in-flight work.

        The state is ``"stopped"`` when no thread is alive, ``"stopping"``
        when a stop was requested but the thread is still alive, ``"busy"``
        while a run is in flight and ``"idle"`` otherwise.
        """
        thread_alive = self._thread is not None and self._thread.is_alive()
        with self._lock:
            in_flight = self._in_flight
            runs = self._runs
        if not thread_alive:
            state = "stopped"
        elif self._stop_requested.is_set():
            state = "stopping"
        elif in_flight > 0:
            state = "busy"
        else:
            state = "idle"
        return WorkerStatus(name=self.name, state=state, in_flight=in_flight, meta={"runs": runs})

    def _loop(self) -> None:
        """Run the routine until a stop is requested, pausing between runs."""
        while not self._stop_requested.is_set():
            with self._lock:
                self._in_flight += 1
            try:
                self._routine.run()
                with self._lock:
                    self._runs += 1
            finally:
                with self._lock:
                    self._in_flight -= 1
            # Event.wait() pauses for the interval but returns True right away
            # when a stop is requested, so the thread never sleeps through one.
            if self._stop_requested.wait(self._interval):
                return


class ForceAwareWorker(Worker):
    """Worker whose thread exits only on ``release`` or on a forced stop.

    Every ``stop()`` call appends its ``force`` argument to ``stop_calls`` so
    tests can verify what the control plane passed.
    """

    def __init__(self, name: str, started: Event, release: Event) -> None:
        self._name = name
        self._started = started
        self._release = release
        self._thread: Thread | None = None
        self._stop_requested = Event()
        self._force_stop = Event()
        self.stop_calls: list[bool] = []

    @property
    def name(self) -> str:
        return self._name

    @property
    def critical(self) -> bool:
        return True

    def start(self) -> None:
        """Start the worker thread unless one is already alive."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_requested.clear()
        self._force_stop.clear()
        self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
        self._thread.start()

    def stop(self, force: bool = False) -> None:
        """Record the call and flag the stop; only ``force=True`` ends the loop."""
        self.stop_calls.append(force)
        self._stop_requested.set()
        if force:
            self._force_stop.set()

    def health(self) -> WorkerHealth:
        """Report an always-``"ok"`` health record."""
        return WorkerHealth(name=self.name, status="ok", critical=True)

    def status(self) -> WorkerStatus:
        """Report ``"stopped"``, ``"stopping"`` or ``"busy"``.

        A live thread always counts as one unit of in-flight work.
        """
        thread_alive = self._thread is not None and self._thread.is_alive()
        if not thread_alive:
            state = "stopped"
        elif self._stop_requested.is_set():
            state = "stopping"
        else:
            state = "busy"
        return WorkerStatus(name=self.name, state=state, in_flight=1 if thread_alive else 0)

    def _loop(self) -> None:
        """Signal ``started`` and spin until released or force-stopped."""
        self._started.set()
        # A plain (non-forced) stop request is deliberately ignored here.
        while not self._force_stop.is_set():
            if self._release.wait(timeout=0.05):
                return


class WorkerModule(ApplicationModule):
    """Application module that registers exactly one worker."""

    def __init__(self, worker: Worker) -> None:
        self._worker = worker

    @property
    def name(self) -> str:
        return "business-tests"

    def register(self, registry: ModuleRegistry) -> None:
        """Add the wrapped worker to ``registry``."""
        registry.add_worker(self._worker)


def _http_call_json(
    client: TestClient,
    path: str,
    *,
    method: str = "GET",
    headers: dict[str, str] | None = None,
) -> tuple[int, dict[str, object]]:
    """Send a request through ``client`` and return ``(status_code, json_body)``."""
    response = client.request(method, path, headers=headers or {})
    return response.status_code, response.json()


def _poll_until_stopped(client: TestClient, timeout: float = 2.0) -> bool:
    """Poll ``/actions/status`` until its ``detail`` equals ``"stopped"``.

    Returns ``False`` if that value is not observed within ``timeout`` seconds.
    """
    deadline = monotonic() + timeout
    while monotonic() < deadline:
        _, payload = _http_call_json(client, "/actions/status")
        # NOTE(review): the status action is expected to report a plain string
        # in "detail" (unlike stop/start, which return a dict) - confirm.
        if payload.get("detail") == "stopped":
            return True
        sleep(0.05)
    return False


def _build_runtime(worker: Worker, *, control_timeout: int = 1) -> tuple[RuntimeManager, TestClient]:
    """Start a runtime hosting ``worker`` and return it with a test client.

    The runtime is started with ``start_control_plane=False``; the HTTP app is
    built from an ``HttpControlChannel`` whose actions are bound directly to
    the runtime, and is served in-process through ``TestClient``.
    The caller is responsible for closing the client and stopping the runtime.
    """
    runtime = RuntimeManager()
    runtime.register_module(WorkerModule(worker))
    runtime.start(start_control_plane=False)
    try:
        channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=control_timeout)
        # The channel is never started, so its action set is wired by hand.
        channel._actions = ControlActionSet(
            health=runtime.health_status,
            start=runtime.start_runtime,
            stop=runtime.stop_runtime,
            status=runtime.runtime_status,
        )
        app = channel._factory.create(channel._health_response, channel._action_response)
        client = TestClient(app)
    except Exception:
        # Do not leave a started runtime behind when the wiring fails.
        runtime.stop()
        raise
    return runtime, client


def test_worker_wakes_up_with_configured_interval() -> None:
    """Consecutive runs are spaced by roughly the configured interval."""
    interval = 0.15
    routine = IntervalRoutine()
    worker = ScenarioWorker("interval-worker", routine, interval=interval)
    runtime = RuntimeManager()
    runtime.register_module(WorkerModule(worker))
    runtime.start()
    try:
        assert routine.wait_runs(count=3, timeout=2.0) is True
    finally:
        runtime.stop()

    deltas = routine.deltas()
    assert len(deltas) >= 2
    # 30% tolerance for timer jitter.
    assert all(delta >= interval * 0.7 for delta in deltas[:2])


def test_actions_start_stop_and_health_when_worker_is_idle() -> None:
    """Stop then start an idle worker and check ``/health`` after each step."""
    runtime, client = _build_runtime(ScenarioWorker("idle-worker", IntervalRoutine(), interval=0.2))
    try:
        stop_status, stop_payload = _http_call_json(client, "/actions/stop", method="POST")
        assert stop_status == 200
        assert isinstance(stop_payload.get("detail"), dict)
        assert stop_payload["detail"]["timed_out"] is False
        assert stop_payload["detail"]["state"] == "stopped"

        health_stopped_status, health_stopped_payload = _http_call_json(client, "/health")
        assert health_stopped_status == 503
        assert health_stopped_payload["state"] == "stopped"
        assert health_stopped_payload["status"] == "unhealthy"

        start_status, start_payload = _http_call_json(client, "/actions/start", method="POST")
        assert start_status == 200
        assert isinstance(start_payload.get("detail"), dict)
        assert start_payload["detail"]["timed_out"] is False
        assert start_payload["detail"]["state"] in {"idle", "busy"}

        health_started_status, health_started_payload = _http_call_json(client, "/health")
        assert health_started_status == 200
        assert health_started_payload["status"] == "ok"
        assert health_started_payload["state"] in {"idle", "busy"}
    finally:
        client.close()
        runtime.stop()


def test_actions_stop_busy_worker_before_timeout() -> None:
    """A busy worker released shortly after the stop request stops in time."""
    started = Event()
    release = Event()
    runtime, client = _build_runtime(ScenarioWorker("busy-worker", BlockingRoutine(started, release)))

    def release_later() -> None:
        sleep(0.2)
        release.set()

    try:
        assert started.wait(timeout=1.0) is True
        Thread(target=release_later, daemon=True).start()

        status, payload = _http_call_json(client, "/actions/stop", method="POST")
        assert status == 200
        assert isinstance(payload.get("detail"), dict)
        assert payload["detail"]["timed_out"] is False
        assert payload["detail"]["state"] == "stopped"

        health_status, health_payload = _http_call_json(client, "/health")
        assert health_status == 503
        assert health_payload["status"] == "unhealthy"
        assert health_payload["state"] == "stopped"
    finally:
        # Unblock the routine so cleanup does not wait on a stuck worker.
        release.set()
        client.close()
        runtime.stop()


def test_actions_stop_busy_worker_after_timeout() -> None:
    """A stop that times out reports ``stopping`` until the worker is released."""
    started = Event()
    release = Event()
    runtime, client = _build_runtime(ScenarioWorker("slow-stop-worker", BlockingRoutine(started, release)))
    try:
        # Shorten the action timeout on this instance so the stop request
        # gives up while the routine is still blocked.
        runtime.ACTION_TIMEOUT_SECONDS = 0.3
        assert started.wait(timeout=1.0) is True

        status, payload = _http_call_json(client, "/actions/stop", method="POST")
        assert status == 200
        assert isinstance(payload.get("detail"), dict)
        assert payload["detail"]["timed_out"] is True
        assert payload["detail"]["state"] == "stopping"

        health_status, health_payload = _http_call_json(client, "/health")
        assert health_status == 503
        assert health_payload["status"] == "degraded"
        assert health_payload["state"] == "stopping"

        release.set()
        assert _poll_until_stopped(client, timeout=2.0) is True

        stopped_health_status, stopped_health_payload = _http_call_json(client, "/health")
        assert stopped_health_status == 503
        assert stopped_health_payload["state"] == "stopped"
    finally:
        # Unblock the routine so cleanup does not wait on a stuck worker.
        release.set()
        client.close()
        runtime.stop()


def test_actions_stop_wait_false_returns_before_worker_finishes() -> None:
    """``wait=false`` returns while the worker is still stopping."""
    started = Event()
    release = Event()
    runtime, client = _build_runtime(ScenarioWorker("async-stop-worker", BlockingRoutine(started, release)))
    try:
        assert started.wait(timeout=1.0) is True

        status, payload = _http_call_json(client, "/actions/stop?wait=false", method="POST")
        assert status == 200
        assert isinstance(payload.get("detail"), dict)
        assert payload["detail"]["timed_out"] is False
        assert payload["detail"]["state"] == "stopping"

        health_status, health_payload = _http_call_json(client, "/health")
        assert health_status == 503
        assert health_payload["status"] == "degraded"
        assert health_payload["state"] == "stopping"

        release.set()
        assert _poll_until_stopped(client, timeout=2.0) is True
    finally:
        # Unblock the routine so cleanup does not wait on a stuck worker.
        release.set()
        client.close()
        runtime.stop()


def test_actions_stop_honors_timeout_query_parameter() -> None:
    """A short ``timeout`` query value makes the stop request time out."""
    started = Event()
    release = Event()
    runtime, client = _build_runtime(ScenarioWorker("timeout-stop-worker", BlockingRoutine(started, release)))
    try:
        assert started.wait(timeout=1.0) is True

        status, payload = _http_call_json(client, "/actions/stop?timeout=0.1", method="POST")
        assert status == 200
        assert isinstance(payload.get("detail"), dict)
        assert payload["detail"]["timed_out"] is True
        assert payload["detail"]["state"] == "stopping"

        release.set()
        assert _poll_until_stopped(client, timeout=2.0) is True
    finally:
        # Unblock the routine so cleanup does not wait on a stuck worker.
        release.set()
        client.close()
        runtime.stop()


def test_actions_stop_honors_force_query_parameter() -> None:
    """``force=true`` reaches the worker's ``stop()`` and ends its loop."""
    started = Event()
    release = Event()
    worker = ForceAwareWorker("force-stop-worker", started, release)
    runtime, client = _build_runtime(worker)
    try:
        assert started.wait(timeout=1.0) is True

        status, payload = _http_call_json(client, "/actions/stop?force=true&timeout=0.5", method="POST")
        assert status == 200
        assert isinstance(payload.get("detail"), dict)
        assert payload["detail"]["timed_out"] is False
        assert payload["detail"]["state"] == "stopped"
        assert worker.stop_calls == [True]
    finally:
        release.set()
        client.close()
        runtime.stop()


def test_actions_log_client_source_for_start_and_stop(caplog) -> None:
    """Start/stop requests log the ``X-Client-Source`` header value."""
    runtime, client = _build_runtime(ScenarioWorker("log-worker", IntervalRoutine(), interval=0.2))
    caplog.set_level("WARNING", logger="app_runtime.control.http_app")
    try:
        _http_call_json(client, "/actions/stop", method="POST", headers={"X-Client-Source": "web-ui"})
        _http_call_json(client, "/actions/start", method="POST", headers={"X-Client-Source": "web-ui"})
    finally:
        client.close()
        runtime.stop()

    messages = [record.getMessage() for record in caplog.records]
    assert any("/actions/stop client=web-ui" in message for message in messages)
    assert any("/actions/start client=web-ui" in message for message in messages)