2026-03-03 10:41:39 +03:00
2026-03-03 10:41:39 +03:00
2025-11-01 08:31:57 +03:00
2025-10-11 22:16:58 +03:00
2026-03-03 10:41:39 +03:00
2026-03-03 10:41:39 +03:00
2025-10-11 22:16:58 +03:00

Config Manager

Описание

Пакет предназначен как базовое приложение для проектов, в которых нужно периодически запускать одну и ту же функцию в одном потоке, с возможностью перезагрузки конфига и сервисным контуром вокруг прикладной логики.

Под сервисным контуром здесь понимаются:

  • логирование;
  • трассировка бизнес-процессов и связанных сущностей;
  • управление приложением через каналы управления (например, HTTP API).

Контракт: приложение наследует ConfigManagerV2, переопределяет execute() (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в control_channels (в т.ч. HttpControlChannel для API).

ConfigManager: устройство и взаимосвязи

ConfigManager (класс ConfigManagerV2) — точка входа приложения. Он наследует внутреннюю логику от _RuntimeController (циклы воркера и обновления конфига, запуск/остановка каналов управления).

Ядро (core):

  • ConfigLoader — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
  • WorkerLoop — в отдельном потоке циклически вызывает ваш метод execute() с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
  • LogManager — применяет секцию log из конфига к логированию (dictConfig).
  • TraceManager — управляет структурированной трассировкой процессов, контекстов и сообщений.
  • HealthAggregator — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного execute() и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
  • ControlChannelBridge — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).

Каналы управления (control):

  • ControlChannel — абстрактный контракт: start(on_start, on_stop, on_status), stop().
  • HttpControlChannel — HTTP API (/health, /actions/start, /actions/stop, /actions/status); использует UvicornServerRunner; для /health вызывает HealthAggregator.collect(), для действий — переданные обработчики из ControlChannelBridge.
  • TelegramControlChannel — реализация через long polling Telegram; команды /start, /stop, /status вызывают переданные обработчики.

Поток работы: при start() менеджер поднимает каналы из control_channels (заданные снаружи), затем запускает два цикла: WorkerLoop и периодическое обновление конфига через ConfigLoader. Управление по API: /health, /actions/start, /actions/stop — если в control_channels передан HttpControlChannel. Остановка по halt завершает оба цикла; в конце останавливаются все каналы.

Запуск приложения с ConfigManagerV2 и HttpControlChannel

  1. Наследуйте ConfigManagerV2 и реализуйте метод execute() (в нём — ваша периодическая работа). При необходимости переопределите get_health_status() для кастомного ответа /health.

  2. Создайте каналы снаружи и передайте в конструктор. Для HTTP API создайте HttpControlChannel; для health нужен колбэк менеджера — передайте control_channels как фабрику (lambda, получающую менеджер):

    from config_manager.v2.control import HttpControlChannel
    
    app = MyApp(
        str(path_to_config),
        control_channels=lambda m: [
            HttpControlChannel(
                host="0.0.0.0",
                port=8000,
                timeout=3,
                health_provider=m.get_health_provider(),
            )
        ],
    )
    

    Либо передайте готовый список каналов: control_channels=[channel1, channel2].

  3. Запустите из async-контекста: await app.start() или asyncio.create_task(app.start()) для фона. Остановка: await app.stop() или запрос /actions/stop по HTTP.

Минимальный пример с HTTP API:

import asyncio
import logging
from pathlib import Path

from config_manager import ConfigManager
from config_manager.v2.control import HttpControlChannel

class MyApp(ConfigManager):
    def execute(self) -> None:
        pass  # ваша периодическая работа

async def main() -> None:
    app = MyApp(
        str(Path(__file__).parent / "config.yaml"),
        control_channels=lambda m: [
            HttpControlChannel(
                host="0.0.0.0", port=8000, timeout=3,
                health_provider=m.get_health_provider(),
            )
        ],
    )
    asyncio.create_task(app.start())
    await asyncio.sleep(3600)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

Готовый пример: tests/test_app.py.

Диаграмма классов

classDiagram
    direction TB

    class ConfigManagerV2 {
        +str path
        +Any config
        +float update_interval
        +float work_interval
        -ConfigLoader _loader
        -LifecycleState _state
        +start() async
        +stop() async
        +execute()*
        +get_health_status() HealthPayload
        -_run() async
        -_worker_loop() async
        -_periodic_update_loop() async
    }

    class _RuntimeController {
        <<внутренний>>
        -_on_execute_success()
        -_on_execute_error(exc)
        -_worker_loop() async
        -_periodic_update_loop() async
        -_start_control_channels() async
        -_stop_control_channels() async
        -_run() async
    }

    class ConfigLoader {
        +str path
        +Any config
        +Any last_valid_config
        +load_if_changed() async
        +parse_config(data) Any
        -_read_file_sync() str
        -read_file_async() async
    }

    class WorkerLoop {
        -Callable execute
        -Callable get_interval
        -Event halt_event
        +run() async
    }

    class LogManager {
        +apply_config(config) None
    }

    class TraceManager {
        +bind_context(alias, parent_id, type, attrs) str
        +open_context(alias, parent_id, type, attrs) contextmanager
        +current_trace_id() str
        +step(name) None
        +info(message, status, attrs) None
        +warning(message, status, attrs) None
        +error(message, status, attrs) None
    }

    class TraceContextStore {
        +current() ActiveTraceContext
        +current_trace_id() str
        +push(record) ActiveTraceContext
        +pop() ActiveTraceContext
        +set_step(step) ActiveTraceContext
    }

    class TraceContextRecord {
        +str trace_id
        +str parent_id
        +str alias
        +str type
        +datetime event_time
        +dict attrs
    }

    class TraceLogMessage {
        +str trace_id
        +str step
        +str status
        +str level
        +str message
        +datetime event_time
        +dict attrs
    }

    class TraceTransport {
        <<protocol>>
        +write_context(record) None
        +write_message(record) None
    }

    class MySqlTraceTransport {
        +write_context(record) None
        +write_message(record) None
    }

    class ControlChannel {
        <<абстрактный>>
        +start(on_start, on_stop, on_status) async*
        +stop() async*
    }

    class TelegramControlChannel {
        -str _token
        -int _chat_id
        +start(on_start, on_stop, on_status) async
        +stop() async
        -_poll_loop() async
    }

    class HttpControlChannel {
        -UvicornServerRunner _runner
        -Callable _health_provider
        +start(on_start, on_stop, on_status) async
        +stop() async
        +int port
    }

    class HealthAggregator {
        -Callable get_state
        -Callable get_app_health
        +collect() async HealthPayload
    }

    class ControlChannelBridge {
        -Event _halt
        -Callable _get_state
        -Callable _get_status
        +on_start() async str
        +on_stop() async str
        +on_status() async str
    }

    class UvicornServerRunner {
        -Server _server
        -Task _serve_task
        +start(app) async
        +stop() async
        +int port
    }

    ConfigManagerV2 --|> _RuntimeController : наследует
    ConfigManagerV2 --> ConfigLoader : использует
    ConfigManagerV2 --> LogManager : использует
    ConfigManagerV2 --> TraceManager : использует
    ConfigManagerV2 --> HealthAggregator : использует
    ConfigManagerV2 --> ControlChannelBridge : использует
    ConfigManagerV2 ..> ControlChannel : список каналов
    _RuntimeController ..> WorkerLoop : создаёт в _worker_loop
    TraceManager --> TraceContextStore : использует
    TraceManager --> TraceTransport : использует
    TraceManager ..> TraceContextRecord : создаёт
    TraceManager ..> TraceLogMessage : создаёт
    MySqlTraceTransport --|> TraceTransport : реализует
    TelegramControlChannel --|> ControlChannel : реализует
    HttpControlChannel --|> ControlChannel : реализует
    HttpControlChannel --> UvicornServerRunner : использует
    HttpControlChannel ..> HealthAggregator : health_provider
    ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status

Диаграмма последовательности (запуск и работа)

sequenceDiagram
    autonumber
    participant User
    participant ConfigManagerV2
    participant ControlChannel as ControlChannel(s)
    participant WorkerLoop
    participant ConfigLoader
    participant Client as HTTP Client

    User->>ConfigManagerV2: start()
    ConfigManagerV2->>ConfigManagerV2: _start_control_channels()
    ConfigManagerV2->>ControlChannel: start(on_start, on_stop, on_status)
    ControlChannel-->>ConfigManagerV2: started

    par Циклы работы
        ConfigManagerV2->>WorkerLoop: run()
        loop Периодически
            WorkerLoop->>ConfigManagerV2: execute()
            ConfigManagerV2-->>WorkerLoop: success/error
        end
    and
        loop Периодически
            ConfigManagerV2->>ConfigLoader: load_if_changed()
            ConfigLoader-->>ConfigManagerV2: config / unchanged
        end
    end

    Note over Client,ControlChannel: Запросы по HTTP API (если HttpControlChannel)

    Client->>ControlChannel: GET /health
    ControlChannel->>ConfigManagerV2: health_provider.collect()
    ConfigManagerV2-->>ControlChannel: HealthPayload
    ControlChannel-->>Client: 200 OK

    Client->>ControlChannel: POST /actions/stop
    ControlChannel->>ConfigManagerV2: on_stop() (bridge)
    ConfigManagerV2->>ConfigManagerV2: set halt
    ConfigManagerV2->>WorkerLoop: halt_event.set()
    ConfigManagerV2->>ControlChannel: stop()
    ControlChannel-->>Client: 200 OK

Логирование

Логирование настраивается из конфигурационного файла только если в нём есть секция log в формате dictConfig. Если секции log нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться.

Как проверить, что конфигурация логирования применилась:

  • Убедитесь, что путь к файлу конфига верный и файл загружается при старте (в логах нет ошибки чтения конфига).
  • Убедитесь, что в конфиге есть ключ log с version: 1, handlers и loggers (пример — tests/config.yaml).
  • После старта в логе должно появиться сообщение уровня INFO: "Logging configuration applied" (из config_manager.v2.core.log_manager). Если его нет, либо секция log отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.

Trace

Модуль trace предназначен для структурированной трассировки прикладных процессов и иерархически связанных сущностей.

Базовая идея:

  • есть TraceContextRecord — логический контекст, который группирует сообщения;
  • есть TraceLogMessage — отдельное событие внутри текущего контекста;
  • контексты могут быть вложенными: один родительский контекст и много дочерних;
  • активный контекст хранится в TraceContextStore, а при выходе из дочернего with автоматически восстанавливается родитель.

Архитектура

Основные части модуля:

  • TraceManager — публичный API для приложений;
  • TraceContextStore — хранение активного контекста и стека вложенности;
  • TraceContextRecord — описание контекста;
  • TraceLogMessage — описание сообщения;
  • TraceTransport — интерфейс транспорта;
  • MySqlTraceTransport — запись контекстов и сообщений в MySQL.

Сущности:

TraceContextRecord

  • trace_id
  • parent_id
  • alias
  • type
  • event_time
  • attrs

TraceLogMessage

  • trace_id
  • event_time
  • step
  • status
  • level
  • message
  • attrs

Принцип использования

  1. На старте процесса создаётся контекст через bind_context() или open_context().
  2. Для серии сообщений выставляется текущий step().
  3. Сообщения пишутся через info()/warning()/error()/exception().
  4. При использовании open_context() дочерний контекст автоматически закрывается по выходу из with, а родительский становится текущим снова.

Пример: корневой контекст

from config_manager.v2.trace import TraceManager

trace = TraceManager()

trace_id = trace.bind_context(
    alias="job-123",
    type="task",
    attrs={"source": "scheduler"},
)

trace.step("prepare")
trace.info("Подготовка завершена", status="completed", attrs={"items_count": 2})

Пример: дочерний контекст

with trace.open_context(
    alias="subtask-1",
    type="subtask",
    parent_id=trace_id,
    attrs={"segment": "phase-a"},
) as child_trace_id:
    trace.step("execute")
    trace.info("Подзадача запущена", status="started")
    trace.info("Подзадача завершена", status="completed", attrs={"duration_ms": 120})

# Здесь снова активен родительский контекст
trace.step("finish")
trace.info("Обработка завершена", status="completed")

Хранение в MySQL

Для MySQL предусмотрен MySqlTraceTransport. Он пишет две сущности в отдельные таблицы:

  • trace_contexts
  • trace_messages

Это позволяет:

  • отдельно хранить структуру процесса;
  • отдельно хранить историю шагов и сообщений;
  • строить отчёты и трассировку без завязки на logging.

Установка

pip install git+https://git.lesha.spb.ru/alex/config_manager.git

Контакты

Description
No description provided
Readme 306 KiB
Languages
Python 100%