Files
config_manager/tests/v2/test_stop_graceful.py

47 lines
1.2 KiB
Python

import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2
class BlockingApp(ConfigManagerV2):
    """Test double whose ``execute`` blocks briefly and reports its progress.

    Each call to ``execute`` bumps ``calls``, raises ``active_event`` for the
    duration of a 0.2 s blocking sleep, and raises ``started_event`` (which is
    never cleared) so a test can wait for the first call to begin.
    """

    # NOTE(review): presumably these override the base class's loop timing so
    # the test runs quickly -- confirm against ConfigManagerV2.
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.05

    def __init__(self, *args, **kwargs) -> None:
        """Forward all arguments to the base class, then set up test probes."""
        super().__init__(*args, **kwargs)
        # Set once the first execute() has begun; never cleared afterwards.
        self.started_event = threading.Event()
        # Set only while an execute() call is inside its blocking section.
        self.active_event = threading.Event()
        # Number of times execute() has been entered.
        self.calls = 0

    def execute(self) -> None:
        """Record the call, then block for 0.2 s while marked as active."""
        self.calls = self.calls + 1
        # Raise "active" before "started" so that anyone woken by
        # started_event already observes the call as in progress.
        self.active_event.set()
        self.started_event.set()
        time.sleep(0.2)
        self.active_event.clear()
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
    """Check that ``stop()`` lets a running ``execute`` finish and runs no more.

    The scenario starts a ``BlockingApp``, waits until its first ``execute``
    call has begun, stops the app, and then asserts that the call has
    finished (``active_event`` cleared) and that ``calls`` stays at 1.
    """

    async def scenario() -> None:
        config_path = tmp_path / "config.yaml"
        config_path.write_text(
            "log: {}\nmanagement: { enabled: false }\n",
            encoding="utf-8",
        )

        app = BlockingApp(str(config_path))
        run_task = asyncio.create_task(app.start())

        # Event.wait() blocks the calling thread, so run it in a worker
        # thread to keep the event loop (and run_task) progressing.
        execute_began = await asyncio.to_thread(app.started_event.wait, 1.0)
        assert execute_began

        await app.stop()
        await run_task

        # Once start() has returned, the in-flight execute() must be done.
        assert not app.active_event.is_set()
        assert app.calls == 1

        # 0.15 s is three times the 0.05 s intervals set on BlockingApp;
        # the call count must still not have moved after that pause.
        await asyncio.sleep(0.15)
        assert app.calls == 1

    asyncio.run(scenario())