ConfigManager V2
This commit is contained in:
54
tests/v2/test_control_channel.py
Normal file
54
tests/v2/test_control_channel.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import asyncio
|
||||
|
||||
from config_manager.v2 import ConfigManagerV2
|
||||
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
||||
|
||||
|
||||
class DummyControlChannel(ControlChannel):
|
||||
def __init__(self):
|
||||
self.on_start: StartHandler | None = None
|
||||
self.on_stop: StopHandler | None = None
|
||||
self.on_status: StatusHandler | None = None
|
||||
self.started = False
|
||||
self.stopped = False
|
||||
|
||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||
self.on_start = on_start
|
||||
self.on_stop = on_stop
|
||||
self.on_status = on_status
|
||||
self.started = True
|
||||
|
||||
async def stop(self) -> None:
|
||||
self.stopped = True
|
||||
|
||||
|
||||
class ControlledApp(ConfigManagerV2):
|
||||
def execute(self) -> None:
|
||||
return
|
||||
|
||||
|
||||
def test_control_channel_can_stop_manager(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
|
||||
channel = DummyControlChannel()
|
||||
app = ControlledApp(str(cfg), control_channel=channel)
|
||||
|
||||
runner = asyncio.create_task(app.start())
|
||||
await asyncio.sleep(0.12)
|
||||
|
||||
assert channel.started is True
|
||||
assert channel.on_status is not None
|
||||
assert channel.on_stop is not None
|
||||
|
||||
status_text = await channel.on_status()
|
||||
assert "state=running" in status_text
|
||||
|
||||
stop_text = await channel.on_stop()
|
||||
assert "stop signal accepted" in stop_text
|
||||
|
||||
await runner
|
||||
assert channel.stopped is True
|
||||
|
||||
asyncio.run(scenario())
|
||||
Reference in New Issue
Block a user