"""Test that a control channel's stop handler shuts down a running manager."""

import asyncio

from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
from config_manager.v2.control.base import (
    ControlChannel,
    StartHandler,
    StatusHandler,
    StopHandler,
)


class DummyControlChannel(ControlChannel):
    """In-memory control channel that records the handlers it is given.

    ``start`` stores the three handlers and sets ``started``; ``stop`` only
    sets ``stopped``. The test drives the stored handlers directly.
    """

    def __init__(self):
        # Handlers stay None until start() is called with real ones.
        self.on_start: StartHandler | None = None
        self.on_stop: StopHandler | None = None
        self.on_status: StatusHandler | None = None
        self.started = False
        self.stopped = False

    async def start(
        self,
        on_start: StartHandler,
        on_stop: StopHandler,
        on_status: StatusHandler,
    ) -> None:
        """Remember the supplied handlers and mark the channel as started."""
        self.on_start = on_start
        self.on_stop = on_stop
        self.on_status = on_status
        self.started = True

    async def stop(self) -> None:
        """Mark the channel as stopped."""
        self.stopped = True


class ControlledApp(ConfigManagerV2):
    """Manager subclass with short intervals and a no-op ``execute``."""

    # Presumably seconds; kept small so the test finishes quickly.
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.05

    def execute(self) -> None:
        """Do nothing; the test only exercises the control path."""
        return


def test_control_channel_can_stop_manager(tmp_path):
    """Start the app, query status, send stop, and wait for start() to return."""

    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\n", encoding="utf-8")

        channel = DummyControlChannel()
        app = ControlledApp(
            str(cfg),
            control_channel=channel,
            management_settings=ManagementServerSettings(enabled=False),
        )

        runner = asyncio.create_task(app.start())
        # Give the manager time to start the channel and enter its loop.
        await asyncio.sleep(0.12)

        assert channel.started is True
        assert channel.on_status is not None
        assert channel.on_stop is not None

        status_text = await channel.on_status()
        assert "state=running" in status_text

        stop_text = await channel.on_stop()
        assert "stop signal accepted" in stop_text

        # Bound the wait: if the stop signal is ignored, the test must fail
        # with a timeout instead of hanging the whole test run.
        shutdown_timeout = 5.0
        await asyncio.wait_for(runner, timeout=shutdown_timeout)
        # On shutdown the manager does not call control_channel.stop()
        # (the channel remains available).

    asyncio.run(scenario())