8 Commits

Author SHA1 Message Date
8f22fcf6af method start no now blocks main cycle 2025-11-01 21:35:25 +03:00
311870fd73 Changed the algorithm of the start method 2025-11-01 21:00:14 +03:00
ffd758d9a4 ready for release 2025-11-01 08:31:57 +03:00
526661e498 no message 2025-10-31 23:18:55 +03:00
b0c87a427c Merge branch 'develop' 2025-10-31 23:16:35 +03:00
7b74e0b0b8 no message 2025-10-31 23:16:25 +03:00
5faea8f69f Merge branch 'develop' 2025-10-31 18:54:18 +03:00
4eb9327628 Незначительные правки 2025-10-30 13:00:53 +03:00
10 changed files with 23 additions and 154 deletions

3
.gitignore vendored
View File

@@ -1,4 +1,5 @@
__pycache__ __pycache__
venv/ .venv/
.vscode/ .vscode/
log*.log log*.log
config_manager.egg-info

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "config_manager" name = "config_manager"
version = "1.1.0" version = "1.2.2"
description = "Config manager for building applications" description = "Config manager for building applications"
authors = [ authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }

View File

@@ -1,2 +0,0 @@
from .config_manager import ConfigManager
from .log_manager import LogManager

View File

@@ -0,0 +1,2 @@
from .cfg_manager import ConfigManager
from .log_manager import LogManager

View File

@@ -104,8 +104,7 @@ class ConfigManager:
finally: finally:
self.logger.info("ConfigManager stopped") self.logger.info("ConfigManager stopped")
def start(self) -> None: async def start(self) -> None:
"""Запускает менеджер конфигурации в текущем event loop"""
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
self.logger.warning("ConfigManager is already running") self.logger.warning("ConfigManager is already running")
return return
@@ -116,8 +115,9 @@ class ConfigManager:
self.logger.error("start() must be called from within an async context") self.logger.error("start() must be called from within an async context")
raise raise
self._task = self._loop.create_task(self._run()) self.logger.info("ConfigManager starting and awaiting _run()")
self.logger.info("ConfigManager task created") await self._run()
async def stop(self) -> None: async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения""" """Останавливает менеджер конфигурации и ожидает завершения"""

View File

@@ -1,131 +0,0 @@
import unittest
from unittest.mock import patch, mock_open, AsyncMock
import asyncio
import logging
import io
import json
import yaml
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
from basic_application.basic_application import ConfigManager
class TestConfigManager(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.json_data = json.dumps({
"work_interval": 1,
"update_interval": 1,
"logging": {
"version": 1,
"handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}},
"root": {"handlers": ["console"], "level": "DEBUG"}
},
"some_key": "some_value"
})
self.yaml_data = """
work_interval: 1
update_interval: 1
logging:
version: 1
handlers:
console:
class: logging.StreamHandler
level: DEBUG
root:
handlers: [console]
level: DEBUG
some_key: some_value
"""
@patch("builtins.open", new_callable=mock_open, read_data="")
async def test_read_file_async_json(self, mock_file):
mock_file.return_value.read = lambda: self.json_data
cm = ConfigManager("config.json")
content = await cm._read_file_async()
self.assertEqual(content, self.json_data)
@patch("builtins.open", new_callable=mock_open, read_data="")
async def test_read_file_async_yaml(self, mock_file):
mock_file.return_value.read = lambda: self.yaml_data
cm = ConfigManager("config.yaml")
content = await cm._read_file_async()
self.assertEqual(content, self.yaml_data)
def test_parse_json(self):
cm = ConfigManager("config.json")
parsed = cm._parse_config(self.json_data)
self.assertIsInstance(parsed, dict)
self.assertEqual(parsed["some_key"], "some_value")
def test_parse_yaml(self):
cm = ConfigManager("config.yaml")
parsed = cm._parse_config(self.yaml_data)
self.assertIsInstance(parsed, dict)
self.assertEqual(parsed["some_key"], "some_value")
@patch("basic_application.basic_application.logging.config.dictConfig")
def test_apply_logging_config(self, mock_dict_config):
cm = ConfigManager("config.json")
cm._apply_logging_config({"logging": {"version": 1}})
mock_dict_config.assert_called_once()
async def test_update_config_changes_config_and_intervals(self):
# Мокаем чтение файла
m = mock_open(read_data=self.json_data)
with patch("builtins.open", m):
cm = ConfigManager("config.json")
# Проверяем исходные интервалы
self.assertEqual(cm.update_interval, cm.DEFAULT_UPDATE_INTERVAL)
self.assertEqual(cm.work_interval, cm.DEFAULT_WORK_INTERVAL)
await cm._update_config()
# После обновления данные заполнены
self.assertIsInstance(cm.config, dict)
self.assertEqual(cm.update_interval, 1.0)
self.assertEqual(cm.work_interval, 1.0)
async def test_execute_called_in_worker_loop(self):
called = False
class TestCM(ConfigManager):
def execute(self2):
nonlocal called
called = True
cm = TestCM("config.json")
async def stop_after_delay():
await asyncio.sleep(0.1)
cm.stop()
# Запускаем worker_loop и через 0.1 сек останавливаем
await asyncio.gather(cm._worker_loop(), stop_after_delay())
self.assertTrue(called)
async def test_periodic_update_loop_runs(self):
count = 0
class TestCM(ConfigManager):
async def _update_config(self2):
nonlocal count
count += 1
if count >= 2:
self2.stop()
cm = TestCM("config.json")
await cm._periodic_update_loop()
self.assertGreaterEqual(count, 2)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING) # отключаем логи во время тестов
unittest.main()

View File

@@ -1,35 +1,34 @@
from basic_application import ConfigManager #import os
#os.chdir(os.path.dirname(__file__))
from config_manager import ConfigManager
import logging import logging
import asyncio import asyncio
from typing import Optional from typing import Optional
import os
os.chdir(os.path.dirname(__file__))
logger = logging.getLogger() logger = logging.getLogger()
class MyApp(ConfigManager): class MyApp(ConfigManager):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.iter = 0 self.iter = 0
def execute(self) -> None: def execute(self) -> None:
logger.info(f"current iteration {self.iter}") logger.info(f"current iteration {self.iter}")
self.iter += 1 self.iter += 1
async def main(): async def main():
app = MyApp("config.yaml") app = MyApp("config.yaml")
app.start()
logger.info("App started") logger.info("App started")
await asyncio.sleep(20) await app.start()
await app.stop()
logger.info("App finished")
while True:
await asyncio.sleep(1)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

View File