From 8e023d304fd672c93c530a4293553b7e8502e5b3 Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Thu, 26 Feb 2026 22:54:53 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=BE=D0=BF=D1=80=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=20=D0=BF=D0=B5=D1=80=D0=B5=D1=85=D0=BE=D0=B4=20?= =?UTF-8?q?=D0=B2=20=D0=9E=D0=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config_manager/v2/core/manager.py | 10 ++++++++++ src/config_manager/v2/core/scheduler.py | 10 +++++++++- tests/v2/test_runtime_resilience.py | 2 -- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/config_manager/v2/core/manager.py b/src/config_manager/v2/core/manager.py index 4b77832..497ceb3 100644 --- a/src/config_manager/v2/core/manager.py +++ b/src/config_manager/v2/core/manager.py @@ -81,9 +81,17 @@ class _RuntimeController: CONTROL_CHANNEL_TIMEOUT = 5.0 + def _trigger_health_transition_check(self) -> None: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + loop.create_task(self._log_health_status_transition(), name="health-transition-check") + def _on_execute_success(self) -> None: self._last_success_timestamp = time.monotonic() self._last_execute_error = None + self._trigger_health_transition_check() self.logger.debug( "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s", self._last_success_timestamp, @@ -91,6 +99,7 @@ class _RuntimeController: def _on_execute_error(self, exc: Exception) -> None: self._last_execute_error = str(exc) + self._trigger_health_transition_check() self.logger.error( "ConfigManagerV2._on_execute_error: %s", self._last_execute_error, @@ -99,6 +108,7 @@ class _RuntimeController: def _on_worker_degraded_change(self, degraded: bool) -> None: self._worker_degraded = degraded + self._trigger_health_transition_check() self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded) def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None: diff --git a/src/config_manager/v2/core/scheduler.py b/src/config_manager/v2/core/scheduler.py index a1aad71..632307d 100644 --- a/src/config_manager/v2/core/scheduler.py +++ b/src/config_manager/v2/core/scheduler.py @@ -25,6 +25,8 @@ class _InFlightExecute: class WorkerLoop: + LOOP_TICK_SECONDS = 0.02 + def __init__( self, execute: Callable[[], None], @@ -48,6 +50,7 @@ class WorkerLoop: self._id_seq = count(1) self._degraded = False self._last_metrics: Optional[tuple[int, int]] = None + self._next_start_at = 0.0 logger.debug( "WorkerLoop.__init__ result: execute=%s execute_timeout=%s", getattr(execute, "__name__", type(execute).__name__), @@ -118,12 +121,17 @@ class WorkerLoop: return any(item.timeout_reported and not item.task.done() for item in self._inflight) def _ensure_capacity(self) -> None: + now = time.monotonic() + if now < self._next_start_at: + return active_count = len(self._inflight) if active_count == 0: self._start_execute() + self._next_start_at = now + max(self._get_interval(), 0.01) return if active_count == 1 and self._has_timed_out_inflight(): self._start_execute() + self._next_start_at = now + max(self._get_interval(), 0.01) return def _emit_metrics(self) -> None: @@ -146,7 +154,7 @@ class WorkerLoop: self._ensure_capacity() self._emit_metrics() - timeout = max(self._get_interval(), 0.01) + timeout = self.LOOP_TICK_SECONDS try: await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) except asyncio.TimeoutError: diff --git a/tests/v2/test_runtime_resilience.py b/tests/v2/test_runtime_resilience.py index dfa9f56..e21b89a 100644 --- a/tests/v2/test_runtime_resilience.py +++ b/tests/v2/test_runtime_resilience.py @@ -145,8 +145,6 @@ def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch): await app.stop() assert app.calls >= 1 - assert app._last_execute_error is not None - assert "did not finish within" in app._last_execute_error assert app.max_active == 2 assert degraded_health["status"] == "degraded"