372 lines
12 KiB
Python
372 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass, field
|
|
from threading import Event, Lock, Thread
|
|
from time import sleep
|
|
|
|
from app_runtime.contracts.application import ApplicationModule
|
|
from app_runtime.contracts.health import HealthContributor
|
|
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
|
|
from app_runtime.core.registration import ModuleRegistry
|
|
from app_runtime.core.runtime import RuntimeManager
|
|
from app_runtime.queue.in_memory import InMemoryTaskQueue
|
|
from app_runtime.tracing.transport import NoOpTraceTransport
|
|
|
|
|
|
@dataclass
class CollectingRoutine:
    """Routine that records a single ``{"id": 1}`` item on its first run only."""

    processed: list[dict[str, object]] = field(default_factory=list)  # items recorded by run()
    _done: bool = False  # flipped after the first run so later calls do nothing

    def run(self) -> None:
        """Append ``{"id": 1}`` to ``processed`` unless a previous run already did."""
        if not self._done:
            self.processed.append({"id": 1})
            self._done = True
|
|
|
|
|
|
class BlockingRoutine:
    """Routine that signals it has started, then blocks until released.

    Args:
        started: Event set as soon as ``run()`` is entered.
        release: Event ``run()`` waits on; the wait gives up after 2 seconds.
    """

    def __init__(self, started: Event, release: Event) -> None:
        self._started, self._release = started, release

    def run(self) -> None:
        # Announce entry before blocking so a caller can tell the routine is in flight.
        self._started.set()
        # Bounded wait: returns after 2 seconds even if `release` is never set.
        self._release.wait(timeout=2.0)
|
|
|
|
|
|
class StaticHealthContributor(HealthContributor):
    """Health contributor that always reports a fixed, non-critical "ok" entry."""

    def health(self) -> WorkerHealth:
        meta = {"kind": "test"}
        return WorkerHealth(
            name="example-module",
            status="ok",
            critical=False,
            meta=meta,
        )
|
|
|
|
|
|
class RoutineWorker(Worker):
    """Worker that repeatedly invokes ``routine.run()`` on background threads.

    ``start()`` spawns ``concurrency`` daemon threads. Each thread calls the
    routine, records the outcome (run/failure counters and the last error
    message), then waits ``interval`` seconds before the next call. ``stop()``
    only signals the threads and returns immediately; shutdown progress is
    visible through ``status()`` ("stopping" while any thread is still alive).

    Args:
        name: Worker name; also used as the thread-name prefix.
        routine: Object exposing a no-argument ``run()`` method.
        interval: Pause in seconds between consecutive routine calls per thread.
        concurrency: Number of threads started by ``start()``.
        critical: Value reported as ``critical`` in health results.
    """

    def __init__(
        self,
        name: str,
        routine: object,
        *,
        interval: float = 0.01,
        concurrency: int = 1,
        critical: bool = True,
    ) -> None:
        self._name = name
        self._routine = routine
        self._interval = interval
        self._concurrency = concurrency
        self._critical = critical
        self._threads: list[Thread] = []
        self._stop_requested = Event()
        self._force_stop = Event()
        # Guards _in_flight, _runs, _failures and _last_error.
        self._lock = Lock()
        self._started = False
        self._in_flight = 0
        self._runs = 0
        self._failures = 0
        self._last_error: str | None = None

    @property
    def name(self) -> str:
        return self._name

    @property
    def critical(self) -> bool:
        return self._critical

    def start(self) -> None:
        """Spawn the worker threads; a no-op while any previously started thread is alive."""
        if any(thread.is_alive() for thread in self._threads):
            return
        self._threads.clear()
        self._stop_requested.clear()
        self._force_stop.clear()
        self._started = True
        for index in range(self._concurrency):
            thread = Thread(target=self._run_loop, name=f"{self._name}-{index + 1}", daemon=True)
            self._threads.append(thread)
            thread.start()

    def stop(self, force: bool = False) -> None:
        """Ask the threads to exit after their current routine call, without waiting.

        ``force`` additionally sets the force-stop flag. Neither flag interrupts
        a routine call that is already running.
        """
        self._stop_requested.set()
        if force:
            self._force_stop.set()

    def health(self) -> WorkerHealth:
        """Return "unhealthy" if threads died while no stop was requested,
        "degraded" once any routine call has failed, otherwise "ok"."""
        status = self.status()
        if self._started and not self._stop_requested.is_set() and self._alive_threads() == 0:
            return WorkerHealth(self.name, "unhealthy", self.critical, "worker threads are not running", status.meta)
        # Read both fields under the lock that guards them so the failure count
        # and the error message come from the same moment.
        with self._lock:
            failures = self._failures
            last_error = self._last_error
        # Failures are cumulative: health stays "degraded" even after later
        # successful runs, which reset last_error to None.
        if failures > 0:
            return WorkerHealth(self.name, "degraded", self.critical, last_error, status.meta)
        return WorkerHealth(self.name, "ok", self.critical, meta=status.meta)

    def status(self) -> WorkerStatus:
        """Snapshot the counters and derive the worker state.

        State precedence:

        1. "stopped" once started threads have all exited.
        2. "stopping" while a stop is requested and threads remain.
        3. "stopped" before the first start.
        4. "busy" if any routine call is in flight.
        5. "idle" otherwise.
        """
        alive_threads = self._alive_threads()
        with self._lock:
            in_flight = self._in_flight
            runs = self._runs
            failures = self._failures
            detail = self._last_error
        if self._started and alive_threads == 0:
            state = "stopped"
        elif self._stop_requested.is_set():
            state = "stopping" if alive_threads > 0 else "stopped"
        elif not self._started:
            state = "stopped"
        elif in_flight > 0:
            state = "busy"
        else:
            state = "idle"
        return WorkerStatus(
            name=self.name,
            state=state,
            in_flight=in_flight,
            detail=detail,
            meta={"alive_threads": alive_threads, "concurrency": self._concurrency, "runs": runs, "failures": failures},
        )

    def _run_loop(self) -> None:
        """Thread body: call the routine repeatedly until a stop is requested."""
        while not (self._force_stop.is_set() or self._stop_requested.is_set()):
            self._run_once()
            # Event.wait instead of time.sleep: a stop request wakes the thread
            # immediately rather than after a full `interval`. Returns True when
            # the stop flag is set.
            if self._stop_requested.wait(self._interval):
                return

    def _run_once(self) -> None:
        """Invoke the routine once, tracking the in-flight count and the outcome."""
        with self._lock:
            self._in_flight += 1
        try:
            self._routine.run()
        except Exception as exc:
            # Record the failure and keep looping; a failing routine does not kill the thread.
            with self._lock:
                self._failures += 1
                self._last_error = str(exc)
        else:
            with self._lock:
                self._runs += 1
                self._last_error = None
        finally:
            with self._lock:
                self._in_flight -= 1

    def _alive_threads(self) -> int:
        """Count the worker threads that are still running."""
        return sum(1 for thread in self._threads if thread.is_alive())
|
|
|
|
|
|
class ExampleModule(ApplicationModule):
    """Test module that registers one collecting worker and a static health contributor."""

    def __init__(self) -> None:
        # Kept on the module so tests can inspect what the worker processed.
        self.routine = CollectingRoutine()

    @property
    def name(self) -> str:
        return "example"

    def register(self, registry: ModuleRegistry) -> None:
        collector = RoutineWorker("collector", self.routine)
        contributor = StaticHealthContributor()
        registry.add_worker(collector)
        registry.add_health_contributor(contributor)
|
|
|
|
|
|
class BlockingModule(ApplicationModule):
    """Test module whose single worker runs a routine that blocks until released."""

    def __init__(self, started: Event, release: Event) -> None:
        # `started` is set when the routine begins; `release` lets it finish.
        self.routine = BlockingRoutine(started, release)

    @property
    def name(self) -> str:
        return "blocking"

    def register(self, registry: ModuleRegistry) -> None:
        worker = RoutineWorker("blocking-worker", self.routine)
        registry.add_worker(worker)
|
|
|
|
|
|
class RecordingTransport(NoOpTraceTransport):
    """Trace transport that keeps every written record in memory for assertions."""

    def __init__(self) -> None:
        # Records are kept in the order they were written.
        self.contexts: list[object] = []
        self.messages: list[object] = []

    def write_context(self, record) -> None:  # type: ignore[override]
        """Capture a context record."""
        self.contexts += [record]

    def write_message(self, record) -> None:  # type: ignore[override]
        """Capture a message record."""
        self.messages += [record]
|
|
|
|
|
|
def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None:
    """A registered module's worker runs its routine, and the runtime reports
    modules, state, health and loaded config through ``status()``."""
    import textwrap
    from time import monotonic

    config_path = tmp_path / "config.yml"
    config_path.write_text(
        textwrap.dedent(
            """
            platform:
              workers: 1
            log:
              version: 1
              disable_existing_loggers: false
              handlers:
                console:
                  class: logging.StreamHandler
              root:
                level: INFO
                handlers: [console]
            """
        ).strip(),
        encoding="utf-8",
    )

    runtime = RuntimeManager()
    runtime.add_config_file(config_path)
    module = ExampleModule()
    runtime.register_module(module)

    runtime.start()
    try:
        # Poll instead of a single fixed sleep. The worker re-invokes the
        # routine every 10 ms (RoutineWorker's default interval), so one
        # snapshot can land while a call is briefly in flight.
        deadline = monotonic() + 2.0
        status = runtime.status()
        while monotonic() < deadline:
            if module.routine.processed and status["runtime"]["state"] == "idle":
                break
            sleep(0.01)
            status = runtime.status()
    finally:
        # Stop the runtime even if polling raised, so worker threads are signalled to exit.
        runtime.stop()

    assert module.routine.processed == [{"id": 1}]
    assert status["modules"] == ["example"]
    assert status["runtime"]["state"] == "idle"
    assert status["health"]["status"] == "ok"
    assert status["config"] == {"platform": {"workers": 1}, "log": status["config"]["log"]}
|
|
|
|
|
|
def test_runtime_graceful_stop_waits_until_worker_finishes() -> None:
    """A graceful ``runtime.stop(timeout=...)`` blocks until the in-flight routine returns."""
    started = Event()
    release = Event()
    runtime = RuntimeManager()
    runtime.register_module(BlockingModule(started, release))
    runtime.start()

    stop_thread = Thread(target=runtime.stop, kwargs={"timeout": 1.0}, daemon=True)
    try:
        assert started.wait(timeout=1.0) is True
        assert runtime.status()["runtime"]["state"] == "busy"

        stop_thread.start()
        sleep(0.1)
        # The routine is still blocked on `release`, so stop() must not have returned yet.
        assert stop_thread.is_alive() is True

        release.set()
        stop_thread.join(timeout=1.0)
        assert stop_thread.is_alive() is False
        assert runtime.status()["runtime"]["state"] == "stopped"
    finally:
        # On failure, unblock the routine and shut the runtime down so the
        # worker thread does not keep running after the test.
        release.set()
        if stop_thread.ident is None:  # stop() was never launched
            runtime.stop()
|
|
|
|
|
|
def test_trace_service_writes_contexts_and_messages() -> None:
    """TraceService writes one context record and one message record to its
    transport, and the message carries the current step name."""
    from app_runtime.tracing.service import TraceService

    recorder = RecordingTransport()
    service = TraceService(transport=recorder)

    with service.open_context(alias="worker", kind="worker", attrs={"routine": "incoming"}):
        service.step("parse")
        service.info("started", status="ok", attrs={"attempt": 1})

    contexts = recorder.contexts
    messages = recorder.messages
    assert len(contexts) == 1
    assert len(messages) == 1
    assert contexts[0].alias == "worker"
    assert messages[0].step == "parse"
|
|
|
|
|
|
def test_http_control_channel_exposes_health_and_actions() -> None:
    """HttpControlChannel dispatches actions to the given handlers, wraps their
    results in a JSON ``status``/``detail`` body, and exposes the health handler."""
    from app_runtime.control.base import ControlActionSet
    from app_runtime.control.http_channel import HttpControlChannel

    # Shared flag mutated by the start/stop handlers and read by health/status.
    state = {"started": False}

    async def health() -> dict[str, str]:
        return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"}

    async def start_handler() -> str:
        state["started"] = True
        return "started"

    async def stop_handler() -> str:
        state["started"] = False
        return "stopped"

    async def status_handler() -> str:
        return "idle" if state["started"] else "stopped"

    async def scenario() -> None:
        channel = HttpControlChannel("127.0.0.1", 0, 2)
        await channel.start(
            ControlActionSet(
                health=health,
                start=start_handler,
                stop=stop_handler,
                status=status_handler,
            )
        )
        try:
            start_response = await channel._action_response("start")
            assert start_response.status_code == 200
            assert start_response.body == b'{"status":"ok","detail":"started"}'

            health_payload = await channel._health_response()
            assert health_payload["status"] == "ok"

            status_response = await channel._action_response("status")
            assert status_response.status_code == 200
            assert status_response.body == b'{"status":"ok","detail":"idle"}'
        finally:
            # Stop the channel even when an assertion fails.
            await channel.stop()

    asyncio.run(scenario())
|
|
|
|
|
|
def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None:
    """The public ``plba`` package exposes the runtime builder and the worker contracts.

    Builds a runtime via ``plba.create_runtime`` with a module whose worker
    implements the public ``Worker`` contract, then checks the loaded config,
    the registered-worker count, and that ``QueueWorker`` is not exported.
    """
    import plba
    from plba import ApplicationModule as PublicApplicationModule
    from plba import InMemoryTaskQueue
    from plba import Worker as PublicWorker
    from plba import WorkerHealth as PublicWorkerHealth
    from plba import WorkerStatus as PublicWorkerStatus
    from plba import create_runtime

    config_path = tmp_path / "config.yml"
    config_path.write_text("platform: {}\n", encoding="utf-8")

    # Smoke-check the re-exported queue with a put/get round-trip.
    queue = InMemoryTaskQueue[int]()
    queue.put(2)
    assert queue.get(timeout=0.01) == 2
    queue.task_done()

    class PublicRoutine:
        """Routine that counts only its first run."""

        def __init__(self) -> None:
            self.runs = 0

        def run(self) -> None:
            if self.runs == 0:
                self.runs += 1

    class PublicWorkerImpl(PublicWorker):
        """Public ``Worker`` implementation that delegates every call to a ``RoutineWorker``."""

        def __init__(self, routine: PublicRoutine) -> None:
            self._inner = RoutineWorker("public-worker", routine)

        @property
        def name(self) -> str:
            return self._inner.name

        @property
        def critical(self) -> bool:
            return self._inner.critical

        def start(self) -> None:
            self._inner.start()

        def stop(self, force: bool = False) -> None:
            self._inner.stop(force=force)

        def health(self) -> PublicWorkerHealth:
            return self._inner.health()

        def status(self) -> PublicWorkerStatus:
            return self._inner.status()

    class PublicExampleModule(PublicApplicationModule):
        """Module registering a single ``PublicWorkerImpl``."""

        @property
        def name(self) -> str:
            return "public-example"

        def register(self, registry: ModuleRegistry) -> None:
            registry.add_worker(PublicWorkerImpl(PublicRoutine()))

    runtime = create_runtime(PublicExampleModule(), config_path=str(config_path))
    runtime.start()
    try:
        sleep(0.2)
        assert runtime.configuration.get() == {"platform": {}}
        assert runtime.status()["workers"]["registered"] == 1
        assert hasattr(plba, "QueueWorker") is False
    finally:
        # Stop the runtime even when an assertion fails.
        runtime.stop()
|