"""Tests for ConfigManagerV2 control-channel stop/start and execute scheduling."""
import asyncio
|
|
import threading
|
|
import time
|
|
|
|
from config_manager.v2 import ConfigManagerV2
|
|
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
|
|
|
|
|
class DummyControlChannel(ControlChannel):
    """Control channel test double that only records what it is given.

    ``start`` stores the three handlers on the instance so a test can call
    them directly; ``started`` and ``stopped`` record which lifecycle
    coroutines have been awaited.
    """

    def __init__(self):
        """Create a channel with no handlers and both lifecycle flags cleared."""
        # Lifecycle flags, set by start() and stop() respectively.
        self.started = False
        self.stopped = False
        # Handlers remain None until start() supplies them.
        self.on_start: StartHandler | None = None
        self.on_stop: StopHandler | None = None
        self.on_status: StatusHandler | None = None

    async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
        """Store the supplied handlers and mark the channel as started."""
        self.on_start, self.on_stop, self.on_status = on_start, on_stop, on_status
        self.started = True

    async def stop(self) -> None:
        """Mark the channel as stopped."""
        self.stopped = True
|
|
|
|
|
|
class RestartableApp(ConfigManagerV2):
    """ConfigManagerV2 subclass whose ``execute`` only counts its invocations."""

    # Both intervals are overridden with the same short value (0.05).
    DEFAULT_UPDATE_INTERVAL = DEFAULT_WORK_INTERVAL = 0.05

    def __init__(self, *args, **kwargs):
        """Forward every argument to the base class, then zero the counter."""
        super().__init__(*args, **kwargs)
        self.calls = 0

    def execute(self) -> None:
        """Record one more invocation in ``self.calls``."""
        self.calls = self.calls + 1
|
|
|
|
|
|
class TimeoutAwareApp(ConfigManagerV2):
    """ConfigManagerV2 subclass with a slow ``execute`` that tracks overlap.

    Every ``execute`` call sleeps for 0.2 s. While it runs, the instance
    counts total calls, the number of calls currently in progress, and the
    highest in-progress count seen so far.
    """

    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.02

    def __init__(self, *args, **kwargs):
        """Forward arguments to the base class and reset all counters."""
        super().__init__(*args, **kwargs)
        # Guards the three counters below.
        self._lock = threading.Lock()
        self.calls = self.active = self.max_active = 0

    def execute(self) -> None:
        """Sleep for 0.2 s while recording call count and concurrency."""
        with self._lock:
            self.calls = self.calls + 1
            self.active = self.active + 1
            # Keep the high-water mark of simultaneously running calls.
            if self.active > self.max_active:
                self.max_active = self.active
        try:
            time.sleep(0.2)
        finally:
            # Always give the in-progress slot back, even if the sleep is
            # interrupted by an exception.
            with self._lock:
                self.active = self.active - 1
|
|
|
|
|
|
class NormalSingleThreadApp(ConfigManagerV2):
    """ConfigManagerV2 subclass with a short ``execute`` that tracks overlap.

    Every ``execute`` call sleeps for 0.03 s and updates three counters under
    a lock: total calls, calls currently in progress, and the peak number of
    calls in progress at once.
    """

    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.02

    def __init__(self, *args, **kwargs):
        """Forward arguments to the base class and reset all counters."""
        super().__init__(*args, **kwargs)
        # Guards the three counters below.
        self._lock = threading.Lock()
        self.calls = self.active = self.max_active = 0

    def execute(self) -> None:
        """Sleep for 0.03 s while recording call count and concurrency."""
        with self._lock:
            self.calls = self.calls + 1
            self.active = self.active + 1
            # Track the largest number of calls seen running at once.
            if self.active > self.max_active:
                self.max_active = self.active
        try:
            time.sleep(0.03)
        finally:
            # Release the in-progress slot on every exit path.
            with self._lock:
                self.active = self.active - 1
|
|
|
|
|
|
def test_control_channel_stop_and_start_resumes_execute(tmp_path):
    """Stopping via the control channel pauses ``execute``; starting resumes it.

    The scenario starts a ``RestartableApp`` with a ``DummyControlChannel``,
    then drives the handlers the app registered on the channel:

    1. ``execute`` must have run at least once after start-up.
    2. After ``on_stop`` the call counter must not advance.
    3. After ``on_start`` the call counter must advance again.
    4. ``app.stop()`` must stop the channel.
    """

    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

        channel = DummyControlChannel()
        app = RestartableApp(str(cfg), control_channels=[channel])
        await app.start()
        try:
            await asyncio.sleep(0.2)
            before_stop = app.calls
            assert before_stop > 0

            # app.start() is expected to have handed its handlers to the channel.
            assert channel.on_stop is not None
            assert channel.on_start is not None
            stop_text = await channel.on_stop()
            assert "stop signal accepted" in stop_text

            # While stopped, the counter must stay frozen.
            await asyncio.sleep(0.2)
            after_stop = app.calls
            assert after_stop == before_stop

            start_text = await channel.on_start()
            assert "start signal accepted" in start_text
            await asyncio.sleep(0.2)
            assert app.calls > after_stop
        finally:
            # Stop the app even when an assertion above fails, so a failing
            # run does not leave it running past the end of the test.
            await app.stop()

        assert channel.stopped is True

    asyncio.run(scenario())
|
|
|
|
|
|
def test_normal_mode_uses_single_inflight_execute(tmp_path):
    """Without a timeout, ``execute`` runs repeatedly but never concurrently.

    Runs ``NormalSingleThreadApp`` for 0.25 s and checks that ``execute`` was
    called at least twice, that at most one call was in progress at any time,
    and that the health provider reports ``"ok"``.
    """

    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

        app = NormalSingleThreadApp(str(cfg))
        await app.start()
        try:
            await asyncio.sleep(0.25)
            health = await app.get_health_provider()()
        finally:
            # Stop the app even if the health query raises.
            await app.stop()

        assert app.calls >= 2
        assert app.max_active == 1
        assert health["status"] == "ok"

    asyncio.run(scenario())
|
|
|
|
|
|
def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
    """A timed-out ``execute`` must not be joined by a second, parallel run.

    ``EXECUTE_TIMEOUT`` is set to 0.05 s while ``TimeoutAwareApp.execute``
    sleeps for 0.2 s, so every call overruns the timeout. The test checks
    that the overrun is recorded in ``_last_execute_error``, that health is
    reported as ``"degraded"``, and that no more than one ``execute`` call
    was ever in progress at once.
    """

    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

        monkeypatch.setenv("EXECUTE_TIMEOUT", "0.05")
        app = TimeoutAwareApp(str(cfg))
        await app.start()
        try:
            await asyncio.sleep(0.35)
            degraded_health = await app.get_health_provider()()
        finally:
            # Stop the app even if the health query raises.
            await app.stop()

        assert app.calls >= 1
        assert app._last_execute_error is not None
        assert "did not finish within" in app._last_execute_error
        # max_active is the peak number of overlapping execute() calls; a
        # value of 2 would mean a parallel run was started, which is exactly
        # what this test is meant to rule out.
        assert app.max_active == 1
        assert degraded_health["status"] == "degraded"

    asyncio.run(scenario())
|