231 lines
7.6 KiB
Python
231 lines
7.6 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass, field
|
|
from threading import Event, Thread
|
|
from time import sleep
|
|
|
|
from app_runtime.contracts.application import ApplicationModule
|
|
from app_runtime.contracts.health import HealthContributor
|
|
from app_runtime.contracts.tasks import Task, TaskHandler
|
|
from app_runtime.contracts.worker import WorkerHealth
|
|
from app_runtime.core.registration import ModuleRegistry
|
|
from app_runtime.core.runtime import RuntimeManager
|
|
from app_runtime.queue.in_memory import InMemoryTaskQueue
|
|
from app_runtime.tracing.transport import NoOpTraceTransport
|
|
from app_runtime.workers.queue_worker import QueueWorker
|
|
|
|
|
|
@dataclass
|
|
class CollectingHandler(TaskHandler):
|
|
processed: list[dict[str, object]] = field(default_factory=list)
|
|
|
|
def handle(self, task: Task) -> None:
|
|
self.processed.append(task.payload)
|
|
|
|
|
|
class BlockingHandler(TaskHandler):
|
|
def __init__(self, started: Event, release: Event) -> None:
|
|
self._started = started
|
|
self._release = release
|
|
|
|
def handle(self, task: Task) -> None:
|
|
del task
|
|
self._started.set()
|
|
self._release.wait(timeout=2.0)
|
|
|
|
|
|
class StaticHealthContributor(HealthContributor):
|
|
def health(self) -> WorkerHealth:
|
|
return WorkerHealth(name="example-module", status="ok", critical=False, meta={"kind": "test"})
|
|
|
|
|
|
class ExampleModule(ApplicationModule):
|
|
def __init__(self) -> None:
|
|
self.handler = CollectingHandler()
|
|
self.queue = InMemoryTaskQueue()
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "example"
|
|
|
|
def register(self, registry: ModuleRegistry) -> None:
|
|
traces = registry.services.get("traces")
|
|
registry.add_queue("incoming", self.queue)
|
|
registry.add_handler("collect", self.handler)
|
|
self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={}))
|
|
registry.add_worker(QueueWorker("collector", self.queue, self.handler, traces, concurrency=1))
|
|
registry.add_health_contributor(StaticHealthContributor())
|
|
|
|
|
|
class BlockingModule(ApplicationModule):
|
|
def __init__(self, started: Event, release: Event) -> None:
|
|
self.queue = InMemoryTaskQueue()
|
|
self.handler = BlockingHandler(started, release)
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "blocking"
|
|
|
|
def register(self, registry: ModuleRegistry) -> None:
|
|
traces = registry.services.get("traces")
|
|
self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={}))
|
|
registry.add_worker(QueueWorker("blocking-worker", self.queue, self.handler, traces, concurrency=1))
|
|
|
|
|
|
class RecordingTransport(NoOpTraceTransport):
|
|
def __init__(self) -> None:
|
|
self.contexts: list[object] = []
|
|
self.messages: list[object] = []
|
|
|
|
def write_context(self, record) -> None: # type: ignore[override]
|
|
self.contexts.append(record)
|
|
|
|
def write_message(self, record) -> None: # type: ignore[override]
|
|
self.messages.append(record)
|
|
|
|
|
|
def test_runtime_processes_tasks_and_exposes_status(tmp_path) -> None:
|
|
config_path = tmp_path / "config.yml"
|
|
config_path.write_text(
|
|
"""
|
|
platform:
|
|
workers: 1
|
|
log:
|
|
version: 1
|
|
disable_existing_loggers: false
|
|
handlers:
|
|
console:
|
|
class: logging.StreamHandler
|
|
root:
|
|
level: INFO
|
|
handlers: [console]
|
|
""".strip(),
|
|
encoding="utf-8",
|
|
)
|
|
runtime = RuntimeManager()
|
|
runtime.add_config_file(config_path)
|
|
module = ExampleModule()
|
|
runtime.register_module(module)
|
|
|
|
runtime.start()
|
|
sleep(0.2)
|
|
status = runtime.status()
|
|
runtime.stop()
|
|
|
|
assert module.handler.processed == [{"id": 1}]
|
|
assert status["modules"] == ["example"]
|
|
assert status["runtime"]["state"] == "idle"
|
|
assert status["health"]["status"] == "ok"
|
|
assert status["config"] == {"platform": {"workers": 1}, "log": status["config"]["log"]}
|
|
|
|
|
|
def test_runtime_graceful_stop_waits_until_worker_finishes() -> None:
|
|
started = Event()
|
|
release = Event()
|
|
runtime = RuntimeManager()
|
|
runtime.register_module(BlockingModule(started, release))
|
|
runtime.start()
|
|
assert started.wait(timeout=1.0) is True
|
|
assert runtime.status()["runtime"]["state"] == "busy"
|
|
|
|
stop_thread = Thread(target=runtime.stop, kwargs={"timeout": 1.0}, daemon=True)
|
|
stop_thread.start()
|
|
sleep(0.1)
|
|
assert stop_thread.is_alive() is True
|
|
|
|
release.set()
|
|
stop_thread.join(timeout=1.0)
|
|
assert stop_thread.is_alive() is False
|
|
assert runtime.status()["runtime"]["state"] == "stopped"
|
|
|
|
|
|
def test_trace_service_writes_contexts_and_messages() -> None:
|
|
from app_runtime.tracing.service import TraceService
|
|
|
|
transport = RecordingTransport()
|
|
manager = TraceService(transport=transport)
|
|
|
|
with manager.open_context(alias="worker", kind="task", attrs={"task": "incoming"}):
|
|
manager.step("parse")
|
|
manager.info("started", status="ok", attrs={"attempt": 1})
|
|
|
|
assert len(transport.contexts) == 1
|
|
assert len(transport.messages) == 1
|
|
assert transport.contexts[0].alias == "worker"
|
|
assert transport.messages[0].step == "parse"
|
|
|
|
|
|
def test_http_control_channel_exposes_health_and_actions() -> None:
|
|
from app_runtime.control.base import ControlActionSet
|
|
from app_runtime.control.http_channel import HttpControlChannel
|
|
|
|
state = {"started": False}
|
|
|
|
async def health():
|
|
return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"}
|
|
|
|
async def start_handler() -> str:
|
|
state["started"] = True
|
|
return "started"
|
|
|
|
async def stop_handler() -> str:
|
|
state["started"] = False
|
|
return "stopped"
|
|
|
|
async def status_handler() -> str:
|
|
return "idle" if state["started"] else "stopped"
|
|
|
|
async def scenario() -> None:
|
|
channel = HttpControlChannel("127.0.0.1", 0, 2)
|
|
await channel.start(
|
|
ControlActionSet(
|
|
health=health,
|
|
start=start_handler,
|
|
stop=stop_handler,
|
|
status=status_handler,
|
|
)
|
|
)
|
|
|
|
start_response = await channel._action_response("start")
|
|
assert start_response.status_code == 200
|
|
assert start_response.body == b'{"status":"ok","detail":"started"}'
|
|
|
|
health_payload = await channel._health_response()
|
|
assert health_payload["status"] == "ok"
|
|
|
|
status_response = await channel._action_response("status")
|
|
assert status_response.status_code == 200
|
|
assert status_response.body == b'{"status":"ok","detail":"idle"}'
|
|
await channel.stop()
|
|
|
|
asyncio.run(scenario())
|
|
|
|
|
|
def test_public_plba_package_exports_runtime_builder(tmp_path) -> None:
|
|
from plba import ApplicationModule as PublicApplicationModule
|
|
from plba import QueueWorker as PublicQueueWorker
|
|
from plba import create_runtime
|
|
|
|
config_path = tmp_path / "config.yml"
|
|
config_path.write_text("platform: {}\n", encoding="utf-8")
|
|
|
|
class PublicExampleModule(PublicApplicationModule):
|
|
@property
|
|
def name(self) -> str:
|
|
return "public-example"
|
|
|
|
def register(self, registry: ModuleRegistry) -> None:
|
|
queue = InMemoryTaskQueue()
|
|
traces = registry.services.get("traces")
|
|
handler = CollectingHandler()
|
|
queue.publish(Task(name="incoming", payload={"id": 2}, metadata={}))
|
|
registry.add_worker(PublicQueueWorker("public-worker", queue, handler, traces))
|
|
|
|
runtime = create_runtime(PublicExampleModule(), config_path=str(config_path))
|
|
runtime.start()
|
|
sleep(0.2)
|
|
assert runtime.configuration.get() == {"platform": {}}
|
|
assert runtime.status()["workers"]["registered"] == 1
|
|
runtime.stop()
|