Files
config_manager/tests/v2/test_control_channel.py

61 lines
1.8 KiB
Python

import asyncio
from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
class DummyControlChannel(ControlChannel):
    """Test double for a control channel that only records what it is given.

    ``start`` stores the three handlers on the instance so a test can invoke
    them directly, and the ``started`` / ``stopped`` flags record whether the
    lifecycle methods have been called.
    """

    def __init__(self):
        """Begin with both lifecycle flags cleared and no handlers attached."""
        # Lifecycle flags, flipped by start() and stop() respectively.
        self.started = False
        self.stopped = False
        # Handlers remain None until start() supplies them.
        self.on_start: StartHandler | None = None
        self.on_stop: StopHandler | None = None
        self.on_status: StatusHandler | None = None

    async def start(
        self,
        on_start: StartHandler,
        on_stop: StopHandler,
        on_status: StatusHandler,
    ) -> None:
        """Remember the supplied handlers and mark the channel as started."""
        self.on_start, self.on_stop, self.on_status = on_start, on_stop, on_status
        self.started = True

    async def stop(self) -> None:
        """Mark the channel as stopped."""
        self.stopped = True
class ControlledApp(ConfigManagerV2):
    """ConfigManagerV2 subclass with short default intervals and a no-op job."""

    # Both defaults are 0.05 (presumably seconds - confirm against
    # ConfigManagerV2) so the test does not wait long between cycles.
    DEFAULT_UPDATE_INTERVAL = DEFAULT_WORK_INTERVAL = 0.05

    def execute(self) -> None:
        """Do nothing; the test only exercises the control channel."""
        return None
def test_control_channel_can_stop_manager(tmp_path):
    """Check that a control channel can query status and stop the manager.

    The scenario starts ``ControlledApp`` with a ``DummyControlChannel``,
    verifies that the channel was started and handed its handlers, calls the
    status handler and checks the reported fields, then calls the stop handler
    and expects ``app.start()`` to return and the channel to be stopped.

    Args:
        tmp_path: pytest fixture providing a temporary directory in which the
            minimal YAML config file is written.
    """
    # Upper bound on how long app.start() may take to return after the stop
    # handler was invoked. Without it, a manager that ignores the stop signal
    # would make the test hang forever instead of failing.
    shutdown_timeout = 5.0

    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
        channel = DummyControlChannel()
        app = ControlledApp(str(cfg), control_channels=[channel])

        # Run the manager in the background so the test can drive the channel.
        runner = asyncio.create_task(app.start())
        # Presumably long enough for the manager to start the channel and
        # reach its running state (a bit over two 0.05 intervals).
        await asyncio.sleep(0.12)

        assert channel.started is True
        assert channel.on_status is not None
        assert channel.on_stop is not None

        status_text = await channel.on_status()
        assert "state=running" in status_text
        assert "worker_inflight=" in status_text
        assert "worker_timed_out_inflight=" in status_text

        stop_text = await channel.on_stop()
        assert "stop signal accepted" in stop_text

        # Raises a timeout error (and cancels the task) if the manager does
        # not shut down in time; otherwise behaves like a plain ``await``.
        await asyncio.wait_for(runner, timeout=shutdown_timeout)
        await app.stop()
        assert channel.stopped is True

    asyncio.run(scenario())