3 Commits

11 changed files with 294 additions and 159 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
__pycache__ __pycache__
venv/ venv/
.vscode/ .vscode/
log*.log

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "config_manager" name = "config_manager"
version = "1.0.4" version = "1.1.0"
description = "Config manager for building applications" description = "Config manager for building applications"
authors = [ authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }

View File

@@ -1 +1 @@
from config_manager.config_manager import ConfigManager from config_manager import ConfigManager

View File

@@ -0,0 +1,2 @@
from .config_manager import ConfigManager
from .log_manager import LogManager

View File

@@ -1,20 +1,23 @@
import logging
import logging.config
import asyncio import asyncio
import json import json
import yaml import yaml
import logging
import logging.config
from typing import Any, Optional
import os import os
from typing import Any, Optional
<<<<<<<< HEAD:src/config_manager.py
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
========
from .log_manager import LogManager
>>>>>>>> develop:src/basic_application/config_manager.py
class ConfigManager: class ConfigManager:
DEFAULT_UPDATE_INTERVAL = 5.0 DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0 DEFAULT_WORK_INTERVAL = 2.0
def __init__(self, path: str): def __init__(self, path: str, log_manager: Optional[LogManager] = None):
self.path = path self.path = path
self.config: Any = None self.config: Any = None
self._last_hash = None self._last_hash = None
@@ -23,6 +26,9 @@ class ConfigManager:
self._halt = asyncio.Event() self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
self._log_manager = log_manager or LogManager()
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str: def _read_file_sync(self) -> str:
with open(self.path, "r", encoding="utf-8") as f: with open(self.path, "r", encoding="utf-8") as f:
@@ -31,9 +37,13 @@ class ConfigManager:
async def _read_file_async(self) -> str: async def _read_file_async(self) -> str:
return await asyncio.to_thread(self._read_file_sync) return await asyncio.to_thread(self._read_file_sync)
def _parse_config(self, str) -> Any: <<<<<<<< HEAD:src/config_manager.py
ext = os.path.splitext(self.path)[1].lower() def _parse_config(self, data) -> Any:
if ext in (".yaml", ".yml"): ========
def _parse_config(self, data) -> Any:
>>>>>>>> develop:src/basic_application/config_manager.py
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data) return yaml.safe_load(data)
else: else:
return json.loads(data) return json.loads(data)
@@ -46,25 +56,16 @@ class ConfigManager:
if isinstance(upd, (int, float)) and upd > 0: if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd) self.update_interval = float(upd)
logger.info(f"Update interval set to {self.update_interval} seconds") self.logger.info(f"Update interval set to {self.update_interval} seconds")
else: else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0: if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk) self.work_interval = float(wrk)
logger.info(f"Work interval set to {self.work_interval} seconds") self.logger.info(f"Work interval set to {self.work_interval} seconds")
else: else:
self.work_interval = self.DEFAULT_WORK_INTERVAL self.work_interval = self.DEFAULT_WORK_INTERVAL
def _apply_logging_config(self, config: dict) -> None:
try:
logging_config = config.get("logging")
if logging_config:
logging.config.dictConfig(logging_config)
logger.info("Logging configuration applied")
except Exception as e:
logger.error(f"Error applying logging config: {e}")
async def _update_config(self) -> None: async def _update_config(self) -> None:
try: try:
data = await self._read_file_async() data = await self._read_file_async()
@@ -74,12 +75,11 @@ class ConfigManager:
self.config = new_config self.config = new_config
self._last_hash = current_hash self._last_hash = current_hash
self._apply_logging_config(new_config) self._log_manager.apply_config(new_config)
self._update_intervals_from_config() self._update_intervals_from_config()
logger.info("Config updated: %s", self.config)
except Exception as e: except Exception as e:
logger.error(f"Error reading/parsing config file: {e}") self.logger.error(f"Error reading/parsing config file: {e}")
def execute(self) -> None: def execute(self) -> None:
""" """
@@ -102,68 +102,49 @@ class ConfigManager:
async def _run(self) -> None: async def _run(self) -> None:
"""Внутренняя корутина, запускающая все циклы""" """Внутренняя корутина, запускающая все циклы"""
self._halt.clear() self._halt.clear()
logger.info("ConfigManager started") self.logger.info("ConfigManager started")
try: try:
await asyncio.gather( await asyncio.gather(
self._worker_loop(), self._worker_loop(),
self._periodic_update_loop() self._periodic_update_loop()
) )
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("ConfigManager tasks cancelled") self.logger.info("ConfigManager tasks cancelled")
finally: finally:
logger.info("ConfigManager stopped") self.logger.info("ConfigManager stopped")
def start(self) -> None: def start(self) -> None:
"""Запускает менеджер конфигурации в текущем event loop""" """Запускает менеджер конфигурации в текущем event loop"""
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
logger.warning("ConfigManager is already running") self.logger.warning("ConfigManager is already running")
return return
try: try:
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
except RuntimeError: except RuntimeError:
logger.error("start() must be called from within an async context") self.logger.error("start() must be called from within an async context")
raise raise
self._task = self._loop.create_task(self._run()) self._task = self._loop.create_task(self._run())
logger.info("ConfigManager task created") self.logger.info("ConfigManager task created")
async def stop(self) -> None: async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения""" """Останавливает менеджер конфигурации и ожидает завершения"""
if self._task is None: if self._task is None:
logger.warning("ConfigManager is not running") self.logger.warning("ConfigManager is not running")
return return
logger.info("ConfigManager stopping...") self.logger.info("ConfigManager stopping...")
self._halt.set() self._halt.set()
# Ждём корректного завершения задачи
try: try:
await self._task await self._task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
self._task = None self._task = None
<<<<<<<< HEAD:src/config_manager.py
logger.info("ConfigManager stopped successfully") logger.info("ConfigManager stopped successfully")
========
self.logger.info("ConfigManager stopped successfully")
# Пример наследования и переопределения execute >>>>>>>> develop:src/basic_application/config_manager.py
class MyApp(ConfigManager):
def execute(self) -> None:
logger.info("Executing blocking work with config: %s", self.config)
async def main():
app = MyApp("config.yaml")
app.start()
await asyncio.sleep(20)
await app.stop()
logger.info("Work finished.")
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
asyncio.run(main())

View File

@@ -0,0 +1,40 @@
import logging
from typing import Optional
class LogManager:
"""
Управляет конфигурацией логирования приложения.
Применяет конфигурацию из словаря с обработкой ошибок.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""
Применяет конфигурацию логирования из словаря.
При ошибке восстанавливает последний валидный конфиг.
Args:
config: Словарь с настройками логирования (из файла конфига)
"""
logging_config = config.get("log")
if not logging_config:
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error(f"Error applying logging config: {e}")
# Если был предыдущий валидный конфиг, восстанавливаем его
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error(f"Error restoring previous config: {restore_error}")

View File

@@ -1,5 +1,5 @@
# === Раздел с общими конфигурационными параметрами === # === Раздел с общими конфигурационными параметрами ===
runtime: 5 param: 5
# === Логирование === # === Логирование ===
log: log:
@@ -8,7 +8,7 @@ log:
formatters: formatters:
standard: standard:
format: '%(asctime)s %(name)30s [%(levelname)8s]: %(message)s' format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
telegram: telegram:
format: '%(message)s' format: '%(message)s'
@@ -40,12 +40,12 @@ log:
loggers: loggers:
'': '':
handlers: [console, file] handlers: [console, file]
level: ERROR level: INFO
propagate: False propagate: False
__main__: __main__:
handlers: [console, file] handlers: [console, file]
level: WARNING level: DEBUG
propagate: False propagate: False
config_manager: config_manager:

150
src/config_manager.py Normal file
View File

@@ -0,0 +1,150 @@
import logging
import logging.config
import asyncio
import json
import yaml
import os
from typing import Any, Optional
<<<<<<<< HEAD:src/config_manager.py
logger = logging.getLogger(__name__)
========
from .log_manager import LogManager
>>>>>>>> develop:src/basic_application/config_manager.py
class ConfigManager:
DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0
def __init__(self, path: str, log_manager: Optional[LogManager] = None):
self.path = path
self.config: Any = None
self._last_hash = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._log_manager = log_manager or LogManager()
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str:
with open(self.path, "r", encoding="utf-8") as f:
return f.read()
async def _read_file_async(self) -> str:
return await asyncio.to_thread(self._read_file_sync)
<<<<<<<< HEAD:src/config_manager.py
def _parse_config(self, data) -> Any:
========
def _parse_config(self, data) -> Any:
>>>>>>>> develop:src/basic_application/config_manager.py
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data)
else:
return json.loads(data)
def _update_intervals_from_config(self) -> None:
if not self.config:
return
upd = self.config.get("update_interval")
wrk = self.config.get("work_interval")
if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd)
self.logger.info(f"Update interval set to {self.update_interval} seconds")
else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk)
self.logger.info(f"Work interval set to {self.work_interval} seconds")
else:
self.work_interval = self.DEFAULT_WORK_INTERVAL
async def _update_config(self) -> None:
try:
data = await self._read_file_async()
current_hash = hash(data)
if current_hash != self._last_hash:
new_config = self._parse_config(data)
self.config = new_config
self._last_hash = current_hash
self._log_manager.apply_config(new_config)
self._update_intervals_from_config()
except Exception as e:
self.logger.error(f"Error reading/parsing config file: {e}")
def execute(self) -> None:
"""
Метод для переопределения в подклассах.
Здесь может быть блокирующая работа.
Запускается в отдельном потоке.
"""
pass
async def _worker_loop(self) -> None:
while not self._halt.is_set():
await asyncio.to_thread(self.execute)
await asyncio.sleep(self.work_interval)
async def _periodic_update_loop(self) -> None:
while not self._halt.is_set():
await self._update_config()
await asyncio.sleep(self.update_interval)
async def _run(self) -> None:
"""Внутренняя корутина, запускающая все циклы"""
self._halt.clear()
self.logger.info("ConfigManager started")
try:
await asyncio.gather(
self._worker_loop(),
self._periodic_update_loop()
)
except asyncio.CancelledError:
self.logger.info("ConfigManager tasks cancelled")
finally:
self.logger.info("ConfigManager stopped")
def start(self) -> None:
"""Запускает менеджер конфигурации в текущем event loop"""
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManager is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self._task = self._loop.create_task(self._run())
self.logger.info("ConfigManager task created")
async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения"""
if self._task is None:
self.logger.warning("ConfigManager is not running")
return
self.logger.info("ConfigManager stopping...")
self._halt.set()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
<<<<<<<< HEAD:src/config_manager.py
logger.info("ConfigManager stopped successfully")
========
self.logger.info("ConfigManager stopped successfully")
>>>>>>>> develop:src/basic_application/config_manager.py

View File

@@ -1,100 +0,0 @@
# === Раздел с общими конфигурационными параметрами ===
runtime:
symbols: ["BTC_USDT", "ETH_USDT", "USDD_USDT", "TRX_USDT", "BTT_USDT", "NFT_USDT", "XRP_USDT",
"ETH_BTC", "XRP_BTC", "TRX_BTC", "LTC_BTC", "EOS_BTC", "XMR_BTC", "DOGE_BTC",
"NFT_TRX", "ETH_TRX", "JST_TRX", "XRP_TRX",
"ETHBULL_USDT", "BULL_USDT", "BEAR_USDT", "ADABULL_USDT"]
updateTimeout: 45
errorTimeout: 10
orderbook:
levels: [ 0.0, 0.2, 0.4, 0.6, 0.8,
1.0, 1.2, 1.4, 1.6, 1.8,
2.0, 2.2, 2.4, 2.6, 2.8,
3.0, 3.3, 3.6, 3.9,
4.2, 4.5, 4.8,
5.1, 5.4, 5.7, 100 ]
trades:
depth: 300
# === Database params ===
db:
#host: 185.117.118.107
host: 92.53.127.143
port: 59000
database: rt5_dev
# === Логирование ===
log:
version: 1
disable_existing_loggers: False
formatters:
standard:
format: '%(asctime)s %(name)30s [%(levelname)8s]: %(message)s'
telegram:
format: '%(message)s'
handlers:
console:
level: DEBUG
formatter: standard
class: logging.StreamHandler
stream: ext://sys.stdout # Default is stderr
file:
level: DEBUG
formatter: standard
class: logging.handlers.RotatingFileHandler
filename: logs/log.log
mode: a
maxBytes: 500000
backupCount: 15
telegram:
level: CRITICAL
formatter: telegram
class: logging_telegram_handler.TelegramHandler
chat_id: 211945135
alias: "PDC"
# -- Логгеры --
loggers:
'':
handlers: [console, file]
level: ERROR
propagate: False
__main__:
handlers: [console, file, telegram]
level: WARNING
propagate: False
basic_application:
handlers: [console, file, telegram]
level: INFO
config_manager:
level: INFO
log_manager:
level: INFO
poloniex.public:
level: ERROR
controllers.abstract:
level: ERROR
controllers.trades:
level: ERROR
controllers.orderbook:
level: ERROR
clickhouse_connector.clickhouse_connector:
level: ERROR

26
src/main_example.py Normal file
View File

@@ -0,0 +1,26 @@
import os
os.chdir(os.path.dirname(__file__))
import logging
import asyncio
from config_manager.config_manager import ConfigManager
logger = logging.getLogger()
# Пример наследования и переопределения execute
class MyApp(ConfigManager):
def execute(self) -> None:
logger.info("Executing blocking work with config: %s", self.config)
async def main():
app = MyApp("config.yaml")
app.start()
await asyncio.sleep(20)
await app.stop()
logger.info("Work finished.")
if __name__ == "__main__":
asyncio.run(main())

35
src/test.py Normal file
View File

@@ -0,0 +1,35 @@
from basic_application import ConfigManager
import logging
import asyncio
from typing import Optional
import os
os.chdir(os.path.dirname(__file__))
logger = logging.getLogger()
class MyApp(ConfigManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iter = 0
def execute(self) -> None:
logger.info(f"current iteration {self.iter}")
self.iter += 1
async def main():
app = MyApp("config.yaml")
app.start()
logger.info("App started")
await asyncio.sleep(20)
await app.stop()
if __name__ == "__main__":
asyncio.run(main())