import logging import logging.config import asyncio import json import yaml import os from typing import Any, Optional <<<<<<<< HEAD:src/config_manager.py logger = logging.getLogger(__name__) ======== from .log_manager import LogManager >>>>>>>> develop:src/basic_application/config_manager.py class ConfigManager: DEFAULT_UPDATE_INTERVAL = 5.0 DEFAULT_WORK_INTERVAL = 2.0 def __init__(self, path: str, log_manager: Optional[LogManager] = None): self.path = path self.config: Any = None self._last_hash = None self.update_interval = self.DEFAULT_UPDATE_INTERVAL self.work_interval = self.DEFAULT_WORK_INTERVAL self._halt = asyncio.Event() self._task: Optional[asyncio.Task] = None self._loop: Optional[asyncio.AbstractEventLoop] = None self._log_manager = log_manager or LogManager() self.logger = logging.getLogger(__name__) def _read_file_sync(self) -> str: with open(self.path, "r", encoding="utf-8") as f: return f.read() async def _read_file_async(self) -> str: return await asyncio.to_thread(self._read_file_sync) <<<<<<<< HEAD:src/config_manager.py def _parse_config(self, data) -> Any: ======== def _parse_config(self, data) -> Any: >>>>>>>> develop:src/basic_application/config_manager.py extension = os.path.splitext(self.path)[1].lower() if extension in (".yaml", ".yml"): return yaml.safe_load(data) else: return json.loads(data) def _update_intervals_from_config(self) -> None: if not self.config: return upd = self.config.get("update_interval") wrk = self.config.get("work_interval") if isinstance(upd, (int, float)) and upd > 0: self.update_interval = float(upd) self.logger.info(f"Update interval set to {self.update_interval} seconds") else: self.update_interval = self.DEFAULT_UPDATE_INTERVAL if isinstance(wrk, (int, float)) and wrk > 0: self.work_interval = float(wrk) self.logger.info(f"Work interval set to {self.work_interval} seconds") else: self.work_interval = self.DEFAULT_WORK_INTERVAL async def _update_config(self) -> None: try: data = await self._read_file_async() current_hash = hash(data) if current_hash != self._last_hash: new_config = self._parse_config(data) self.config = new_config self._last_hash = current_hash self._log_manager.apply_config(new_config) self._update_intervals_from_config() except Exception as e: self.logger.error(f"Error reading/parsing config file: {e}") def execute(self) -> None: """ Метод для переопределения в подклассах. Здесь может быть блокирующая работа. Запускается в отдельном потоке. """ pass async def _worker_loop(self) -> None: while not self._halt.is_set(): await asyncio.to_thread(self.execute) await asyncio.sleep(self.work_interval) async def _periodic_update_loop(self) -> None: while not self._halt.is_set(): await self._update_config() await asyncio.sleep(self.update_interval) async def _run(self) -> None: """Внутренняя корутина, запускающая все циклы""" self._halt.clear() self.logger.info("ConfigManager started") try: await asyncio.gather( self._worker_loop(), self._periodic_update_loop() ) except asyncio.CancelledError: self.logger.info("ConfigManager tasks cancelled") finally: self.logger.info("ConfigManager stopped") def start(self) -> None: """Запускает менеджер конфигурации в текущем event loop""" if self._task is not None and not self._task.done(): self.logger.warning("ConfigManager is already running") return try: self._loop = asyncio.get_running_loop() except RuntimeError: self.logger.error("start() must be called from within an async context") raise self._task = self._loop.create_task(self._run()) self.logger.info("ConfigManager task created") async def stop(self) -> None: """Останавливает менеджер конфигурации и ожидает завершения""" if self._task is None: self.logger.warning("ConfigManager is not running") return self.logger.info("ConfigManager stopping...") self._halt.set() try: await self._task except asyncio.CancelledError: pass self._task = None <<<<<<<< HEAD:src/config_manager.py logger.info("ConfigManager stopped successfully") ======== self.logger.info("ConfigManager stopped successfully") >>>>>>>> develop:src/basic_application/config_manager.py