7 Commits

Author SHA1 Message Date
ffd758d9a4 ready for release 2025-11-01 08:31:57 +03:00
526661e498 no message 2025-10-31 23:18:55 +03:00
b0c87a427c Merge branch 'develop' 2025-10-31 23:16:35 +03:00
7b74e0b0b8 no message 2025-10-31 23:16:25 +03:00
5faea8f69f Merge branch 'develop' 2025-10-31 18:54:18 +03:00
f491c65455 Обновил структуру и отладил, все работает 2025-10-31 18:48:52 +03:00
4eb9327628 Незначительные правки 2025-10-30 13:00:53 +03:00
11 changed files with 104 additions and 293 deletions

4
.gitignore vendored
View File

@@ -1,3 +1,5 @@
__pycache__ __pycache__
venv/ .venv/
.vscode/ .vscode/
log*.log
config_manager.egg-info

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "config_manager" name = "config_manager"
version = "1.0.4" version = "1.2.0"
description = "Config manager for building applications" description = "Config manager for building applications"
authors = [ authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }

View File

@@ -1 +0,0 @@
from config_manager.config_manager import ConfigManager

View File

@@ -0,0 +1,2 @@
from .cfg_manager import ConfigManager
from .log_manager import LogManager

View File

@@ -1,20 +1,18 @@
import logging
import logging.config
import asyncio import asyncio
import json import json
import yaml import yaml
import logging
import logging.config
from typing import Any, Optional
import os import os
from typing import Any, Optional
from .log_manager import LogManager
logger = logging.getLogger(__name__)
class ConfigManager: class ConfigManager:
DEFAULT_UPDATE_INTERVAL = 5.0 DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0 DEFAULT_WORK_INTERVAL = 2.0
def __init__(self, path: str): def __init__(self, path: str, log_manager: Optional[LogManager] = None):
self.path = path self.path = path
self.config: Any = None self.config: Any = None
self._last_hash = None self._last_hash = None
@@ -24,6 +22,9 @@ class ConfigManager:
self._task: Optional[asyncio.Task] = None self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
self._log_manager = log_manager or LogManager()
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str: def _read_file_sync(self) -> str:
with open(self.path, "r", encoding="utf-8") as f: with open(self.path, "r", encoding="utf-8") as f:
return f.read() return f.read()
@@ -31,9 +32,9 @@ class ConfigManager:
async def _read_file_async(self) -> str: async def _read_file_async(self) -> str:
return await asyncio.to_thread(self._read_file_sync) return await asyncio.to_thread(self._read_file_sync)
def _parse_config(self, str) -> Any: def _parse_config(self, data) -> Any:
ext = os.path.splitext(self.path)[1].lower() extension = os.path.splitext(self.path)[1].lower()
if ext in (".yaml", ".yml"): if extension in (".yaml", ".yml"):
return yaml.safe_load(data) return yaml.safe_load(data)
else: else:
return json.loads(data) return json.loads(data)
@@ -46,25 +47,16 @@ class ConfigManager:
if isinstance(upd, (int, float)) and upd > 0: if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd) self.update_interval = float(upd)
logger.info(f"Update interval set to {self.update_interval} seconds") self.logger.info(f"Update interval set to {self.update_interval} seconds")
else: else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0: if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk) self.work_interval = float(wrk)
logger.info(f"Work interval set to {self.work_interval} seconds") self.logger.info(f"Work interval set to {self.work_interval} seconds")
else: else:
self.work_interval = self.DEFAULT_WORK_INTERVAL self.work_interval = self.DEFAULT_WORK_INTERVAL
def _apply_logging_config(self, config: dict) -> None:
try:
logging_config = config.get("logging")
if logging_config:
logging.config.dictConfig(logging_config)
logger.info("Logging configuration applied")
except Exception as e:
logger.error(f"Error applying logging config: {e}")
async def _update_config(self) -> None: async def _update_config(self) -> None:
try: try:
data = await self._read_file_async() data = await self._read_file_async()
@@ -74,12 +66,11 @@ class ConfigManager:
self.config = new_config self.config = new_config
self._last_hash = current_hash self._last_hash = current_hash
self._apply_logging_config(new_config) self._log_manager.apply_config(new_config)
self._update_intervals_from_config() self._update_intervals_from_config()
logger.info("Config updated: %s", self.config)
except Exception as e: except Exception as e:
logger.error(f"Error reading/parsing config file: {e}") self.logger.error(f"Error reading/parsing config file: {e}")
def execute(self) -> None: def execute(self) -> None:
""" """
@@ -102,68 +93,45 @@ class ConfigManager:
async def _run(self) -> None: async def _run(self) -> None:
"""Внутренняя корутина, запускающая все циклы""" """Внутренняя корутина, запускающая все циклы"""
self._halt.clear() self._halt.clear()
logger.info("ConfigManager started") self.logger.info("ConfigManager started")
try: try:
await asyncio.gather( await asyncio.gather(
self._worker_loop(), self._worker_loop(),
self._periodic_update_loop() self._periodic_update_loop()
) )
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("ConfigManager tasks cancelled") self.logger.info("ConfigManager tasks cancelled")
finally: finally:
logger.info("ConfigManager stopped") self.logger.info("ConfigManager stopped")
def start(self) -> None: def start(self) -> None:
"""Запускает менеджер конфигурации в текущем event loop""" """Запускает менеджер конфигурации в текущем event loop"""
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
logger.warning("ConfigManager is already running") self.logger.warning("ConfigManager is already running")
return return
try: try:
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
except RuntimeError: except RuntimeError:
logger.error("start() must be called from within an async context") self.logger.error("start() must be called from within an async context")
raise raise
self._task = self._loop.create_task(self._run()) self._task = self._loop.create_task(self._run())
logger.info("ConfigManager task created") self.logger.info("ConfigManager task created")
async def stop(self) -> None: async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения""" """Останавливает менеджер конфигурации и ожидает завершения"""
if self._task is None: if self._task is None:
logger.warning("ConfigManager is not running") self.logger.warning("ConfigManager is not running")
return return
logger.info("ConfigManager stopping...") self.logger.info("ConfigManager stopping...")
self._halt.set() self._halt.set()
# Ждём корректного завершения задачи
try: try:
await self._task await self._task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
self._task = None self._task = None
logger.info("ConfigManager stopped successfully") self.logger.info("ConfigManager stopped successfully")
# Пример наследования и переопределения execute
class MyApp(ConfigManager):
def execute(self) -> None:
logger.info("Executing blocking work with config: %s", self.config)
async def main():
app = MyApp("config.yaml")
app.start()
await asyncio.sleep(20)
await app.stop()
logger.info("Work finished.")
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
asyncio.run(main())

View File

@@ -0,0 +1,40 @@
import logging
from typing import Optional
class LogManager:
"""
Управляет конфигурацией логирования приложения.
Применяет конфигурацию из словаря с обработкой ошибок.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""
Применяет конфигурацию логирования из словаря.
При ошибке восстанавливает последний валидный конфиг.
Args:
config: Словарь с настройками логирования (из файла конфига)
"""
logging_config = config.get("log")
if not logging_config:
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error(f"Error applying logging config: {e}")
# Если был предыдущий валидный конфиг, восстанавливаем его
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error(f"Error restoring previous config: {restore_error}")

View File

@@ -1,100 +0,0 @@
# === Раздел с общими конфигурационными параметрами ===
runtime:
symbols: ["BTC_USDT", "ETH_USDT", "USDD_USDT", "TRX_USDT", "BTT_USDT", "NFT_USDT", "XRP_USDT",
"ETH_BTC", "XRP_BTC", "TRX_BTC", "LTC_BTC", "EOS_BTC", "XMR_BTC", "DOGE_BTC",
"NFT_TRX", "ETH_TRX", "JST_TRX", "XRP_TRX",
"ETHBULL_USDT", "BULL_USDT", "BEAR_USDT", "ADABULL_USDT"]
updateTimeout: 45
errorTimeout: 10
orderbook:
levels: [ 0.0, 0.2, 0.4, 0.6, 0.8,
1.0, 1.2, 1.4, 1.6, 1.8,
2.0, 2.2, 2.4, 2.6, 2.8,
3.0, 3.3, 3.6, 3.9,
4.2, 4.5, 4.8,
5.1, 5.4, 5.7, 100 ]
trades:
depth: 300
# === Database params ===
db:
#host: 185.117.118.107
host: 92.53.127.143
port: 59000
database: rt5_dev
# === Логирование ===
log:
version: 1
disable_existing_loggers: False
formatters:
standard:
format: '%(asctime)s %(name)30s [%(levelname)8s]: %(message)s'
telegram:
format: '%(message)s'
handlers:
console:
level: DEBUG
formatter: standard
class: logging.StreamHandler
stream: ext://sys.stdout # Default is stderr
file:
level: DEBUG
formatter: standard
class: logging.handlers.RotatingFileHandler
filename: logs/log.log
mode: a
maxBytes: 500000
backupCount: 15
telegram:
level: CRITICAL
formatter: telegram
class: logging_telegram_handler.TelegramHandler
chat_id: 211945135
alias: "PDC"
# -- Логгеры --
loggers:
'':
handlers: [console, file]
level: ERROR
propagate: False
__main__:
handlers: [console, file, telegram]
level: WARNING
propagate: False
basic_application:
handlers: [console, file, telegram]
level: INFO
config_manager:
level: INFO
log_manager:
level: INFO
poloniex.public:
level: ERROR
controllers.abstract:
level: ERROR
controllers.trades:
level: ERROR
controllers.orderbook:
level: ERROR
clickhouse_connector.clickhouse_connector:
level: ERROR

View File

@@ -8,7 +8,7 @@ log:
formatters: formatters:
standard: standard:
format: '%(asctime)s %(name)30s [%(levelname)8s]: %(message)s' format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
telegram: telegram:
format: '%(message)s' format: '%(message)s'
@@ -40,12 +40,12 @@ log:
loggers: loggers:
'': '':
handlers: [console, file] handlers: [console, file]
level: ERROR level: INFO
propagate: False propagate: False
__main__: __main__:
handlers: [console, file] handlers: [console, file]
level: WARNING level: DEBUG
propagate: False propagate: False
config_manager: config_manager:

View File

@@ -1,131 +0,0 @@
import unittest
from unittest.mock import patch, mock_open, AsyncMock
import asyncio
import logging
import io
import json
import yaml
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
from basic_application.basic_application import ConfigManager
class TestConfigManager(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.json_data = json.dumps({
"work_interval": 1,
"update_interval": 1,
"logging": {
"version": 1,
"handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}},
"root": {"handlers": ["console"], "level": "DEBUG"}
},
"some_key": "some_value"
})
self.yaml_data = """
work_interval: 1
update_interval: 1
logging:
version: 1
handlers:
console:
class: logging.StreamHandler
level: DEBUG
root:
handlers: [console]
level: DEBUG
some_key: some_value
"""
@patch("builtins.open", new_callable=mock_open, read_data="")
async def test_read_file_async_json(self, mock_file):
mock_file.return_value.read = lambda: self.json_data
cm = ConfigManager("config.json")
content = await cm._read_file_async()
self.assertEqual(content, self.json_data)
@patch("builtins.open", new_callable=mock_open, read_data="")
async def test_read_file_async_yaml(self, mock_file):
mock_file.return_value.read = lambda: self.yaml_data
cm = ConfigManager("config.yaml")
content = await cm._read_file_async()
self.assertEqual(content, self.yaml_data)
def test_parse_json(self):
cm = ConfigManager("config.json")
parsed = cm._parse_config(self.json_data)
self.assertIsInstance(parsed, dict)
self.assertEqual(parsed["some_key"], "some_value")
def test_parse_yaml(self):
cm = ConfigManager("config.yaml")
parsed = cm._parse_config(self.yaml_data)
self.assertIsInstance(parsed, dict)
self.assertEqual(parsed["some_key"], "some_value")
@patch("basic_application.basic_application.logging.config.dictConfig")
def test_apply_logging_config(self, mock_dict_config):
cm = ConfigManager("config.json")
cm._apply_logging_config({"logging": {"version": 1}})
mock_dict_config.assert_called_once()
async def test_update_config_changes_config_and_intervals(self):
# Мокаем чтение файла
m = mock_open(read_data=self.json_data)
with patch("builtins.open", m):
cm = ConfigManager("config.json")
# Проверяем исходные интервалы
self.assertEqual(cm.update_interval, cm.DEFAULT_UPDATE_INTERVAL)
self.assertEqual(cm.work_interval, cm.DEFAULT_WORK_INTERVAL)
await cm._update_config()
# После обновления данные заполнены
self.assertIsInstance(cm.config, dict)
self.assertEqual(cm.update_interval, 1.0)
self.assertEqual(cm.work_interval, 1.0)
async def test_execute_called_in_worker_loop(self):
called = False
class TestCM(ConfigManager):
def execute(self2):
nonlocal called
called = True
cm = TestCM("config.json")
async def stop_after_delay():
await asyncio.sleep(0.1)
cm.stop()
# Запускаем worker_loop и через 0.1 сек останавливаем
await asyncio.gather(cm._worker_loop(), stop_after_delay())
self.assertTrue(called)
async def test_periodic_update_loop_runs(self):
count = 0
class TestCM(ConfigManager):
async def _update_config(self2):
nonlocal count
count += 1
if count >= 2:
self2.stop()
cm = TestCM("config.json")
await cm._periodic_update_loop()
self.assertGreaterEqual(count, 2)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING) # отключаем логи во время тестов
unittest.main()

31
tests/test_app.py Normal file
View File

@@ -0,0 +1,31 @@
#import os
#os.chdir(os.path.dirname(__file__))
from config_manager import ConfigManager
import logging
import asyncio
from typing import Optional
logger = logging.getLogger()
class MyApp(ConfigManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iter = 0
def execute(self) -> None:
logger.info(f"current iteration {self.iter}")
self.iter += 1
async def main():
app = MyApp("config.yaml")
app.start()
logger.info("App started")
await asyncio.sleep(20)
await app.stop()
if __name__ == "__main__":
asyncio.run(main())

View File