62 lines
1.9 KiB
Python
62 lines
1.9 KiB
Python
import asyncio
|
|
|
|
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
|
|
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
|
|
|
|
|
class DummyControlChannel(ControlChannel):
|
|
def __init__(self):
|
|
self.on_start: StartHandler | None = None
|
|
self.on_stop: StopHandler | None = None
|
|
self.on_status: StatusHandler | None = None
|
|
self.started = False
|
|
self.stopped = False
|
|
|
|
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
|
self.on_start = on_start
|
|
self.on_stop = on_stop
|
|
self.on_status = on_status
|
|
self.started = True
|
|
|
|
async def stop(self) -> None:
|
|
self.stopped = True
|
|
|
|
|
|
class ControlledApp(ConfigManagerV2):
|
|
DEFAULT_UPDATE_INTERVAL = 0.05
|
|
DEFAULT_WORK_INTERVAL = 0.05
|
|
|
|
def execute(self) -> None:
|
|
return
|
|
|
|
|
|
def test_control_channel_can_stop_manager(tmp_path):
|
|
async def scenario() -> None:
|
|
cfg = tmp_path / "config.yaml"
|
|
cfg.write_text("log: {}\n", encoding="utf-8")
|
|
|
|
channel = DummyControlChannel()
|
|
app = ControlledApp(
|
|
str(cfg),
|
|
control_channel=channel,
|
|
management_settings=ManagementServerSettings(enabled=False),
|
|
)
|
|
|
|
runner = asyncio.create_task(app.start())
|
|
await asyncio.sleep(0.12)
|
|
|
|
assert channel.started is True
|
|
assert channel.on_status is not None
|
|
assert channel.on_stop is not None
|
|
|
|
status_text = await channel.on_status()
|
|
assert "state=running" in status_text
|
|
|
|
stop_text = await channel.on_stop()
|
|
assert "stop signal accepted" in stop_text
|
|
|
|
await runner
|
|
# Менеджер при остановке не вызывает control_channel.stop() (канал остаётся доступным)
|
|
|
|
asyncio.run(scenario())
|