PLBA

Установка

Установка пакета напрямую из Git-репозитория через pip:

export GIT_PLBA_TOKEN="<ваш_токен>"
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git"

Если нужен конкретный тег, ветка или commit, добавьте ref после .git:

pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main"

Доступ через GIT_PLBA_TOKEN

Для установки по HTTPS используется переменная окружения GIT_PLBA_TOKEN. В нее нужно положить персональный Git-токен с правом чтения репозитория alex/plba. Токен передается в URL установки и позволяет pip скачать исходники без SSH-доступа.

Рекомендуется:

  • экспортировать токен только в текущую shell-сессию;
  • не хардкодить токен в requirements.txt, pyproject.toml или исходниках;
  • использовать отдельный read-only токен, если Git-сервер это поддерживает.

1. Назначение платформы

PLBA (Platform Runtime for Business Applications) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг ApplicationModule, Worker и прикладной Routine, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании.

2. Концепция использования

ApplicationModule - точка сборки приложения: здесь создаются зависимости, собираются рутины, создаются воркеры и регистрируются дополнительные health-контрибьюторы.

Worker - основной runtime-контракт платформы: он управляет исполнением (потоки, цикл, стратегия остановки), возвращает health() и status(), а также определяет критичность компонента для общего health.

Routine - прикладной паттерн (не обязательный контракт платформы): класс или набор классов, где сосредоточена бизнес-логика, которую воркер регулярно или однократно запускает.

Вспомогательные сервисы платформы дополняют core-модель:

  • tracing (TraceService, транспорты) - операционная трассировка контекстов и сообщений.
  • logging (LogManager) - применение конфигурации логирования из runtime-конфига.
  • health (HealthRegistry) - агрегирование здоровья воркеров и дополнительных компонентов.
  • workflow (WorkflowEngine и persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния.
  • control plane (ControlPlaneService, HttpControlChannel) - внешние health/action endpoints.
  • application HTTP (ApplicationHttpService, HttpApplicationChannel) - пользовательские HTTP routes бизнес-приложения.
  • queue (InMemoryTaskQueue) - локальный in-memory буфер как утилита прикладного уровня.

3. Архитектура

classDiagram
    class ApplicationModule {
      <<abstract>>
      +name: str
      +register(registry)
    }
    class ModuleRegistry {
      +add_worker(worker)
      +add_health_contributor(contributor)
      +register_module(name)
    }
    class RuntimeManager {
      +register_module(module)
      +add_config_file(path)
      +start()
      +stop(timeout, force)
      +status()
      +current_health()
    }
    class WorkerSupervisor {
      +register(worker)
      +start()
      +stop(timeout, force)
      +statuses()
      +healths()
    }
    class Worker {
      <<abstract>>
      +name: str
      +critical: bool
      +start()
      +stop(force)
      +health()
      +status()
    }
    class Routine {
      <<pattern>>
      +run()
    }

    class ConfigurationManager
    class LogManager
    class HealthRegistry
    class TraceService
    class ControlPlaneService
    class ApplicationHttpService
    class WorkflowRuntimeFactory
    class WorkflowEngine
    class WorkflowPersistence
    class WorkflowRepository
    class CheckpointRepository
    class InMemoryTaskQueue
    class MySqlTraceTransport

    ApplicationModule --> ModuleRegistry : register(...)
    RuntimeManager --> ApplicationModule : register_module(...)
    RuntimeManager --> ModuleRegistry
    RuntimeManager --> ConfigurationManager
    RuntimeManager --> LogManager
    RuntimeManager --> HealthRegistry
    RuntimeManager --> TraceService
    RuntimeManager --> WorkerSupervisor
    RuntimeManager --> ControlPlaneService
    RuntimeManager --> ApplicationHttpService

    WorkerSupervisor --> Worker
    Worker --> Routine : invokes

    WorkflowRuntimeFactory --> WorkflowEngine : create_engine(...)
    WorkflowEngine --> WorkflowPersistence
    WorkflowPersistence --> WorkflowRepository
    WorkflowPersistence --> CheckpointRepository

    TraceService --> MySqlTraceTransport

4. Описание компонентов

4.1 Core модули

ApplicationModule

  • Назначение: композиция прикладного приложения и регистрация runtime-компонентов.
  • Реализация: контракт app_runtime.contracts.application.ApplicationModule, регистрация через app_runtime.core.registration.ModuleRegistry.
  • Как работает / API / вызовы / таблицы:
    • API: name, register(registry).
  • Типичные вызовы: registry.add_worker(worker), registry.add_health_contributor(contributor).
  • Для HTTP-модулей: registry.add_http_routes(registrar).
    • RuntimeManager.register_module() вызывает module.register(...) и добавляет имя модуля в снимок runtime.
    • В БД напрямую не пишет.
  • Типовая схема использования:
    • Создать инфраструктурные и доменные сервисы.
    • Создать Routine.
    • Создать Worker с зависимостью на рутину.
    • Зарегистрировать воркер и опциональные health contributors.

Worker

  • Назначение: runtime-исполнение бизнес-активности, управление lifecycle, интерпретация health/status.
  • Реализация: контракт app_runtime.contracts.worker.Worker, оркестрация через app_runtime.workers.supervisor.WorkerSupervisor.
  • Как работает / API / вызовы / таблицы:
    • API: start(), stop(force=False), health() -> WorkerHealth, status() -> WorkerStatus, свойства name, critical.
    • WorkerSupervisor.start() запускает все воркеры, stop() останавливает и ждет state=stopped.
    • RuntimeManager получает агрегированные statuses()/healths() у супервизора.
    • В БД напрямую не пишет (если прикладной worker сам не реализует persistence).
  • Типовая схема использования:
    • Внутри start() поднимать поток/пул или запускать одноразовую задачу.
    • В цикле вызывать routine.run().
    • Ошибки бизнес-логики транслировать в health()/status().

Routine

  • Назначение: изоляция прикладной бизнес-логики от runtime-обвязки.
  • Реализация: прикладной класс (паттерн), обычно с методом run(); платформой не навязывается отдельный интерфейс.
  • Как работает / API / вызовы / таблицы:
    • Типичный API: run() + дополнительные domain-методы.
    • Вызывается воркером в выбранной стратегии выполнения (single-run/loop, 1..N потоков).
    • Может вызывать сервисы интеграций, workflow, очереди, доменные репозитории.
    • Запись в таблицы определяется прикладными сервисами, а не самой платформой.
  • Типовая схема использования:
    • Оставлять рутину тонким оркестратором бизнес-действий.
    • Сложную логику выносить в отдельные доменные сервисы.

4.2 Service модули

Configuration

  • Назначение: централизованная загрузка и слияние runtime-конфигурации.
  • Реализация: ConfigurationManager, FileConfigProvider, ConfigFileLoader.
  • Как работает / API / вызовы / таблицы:
    • API: add_provider(), load(), reload(), get(), section().
    • RuntimeManager.start() вызывает configuration.load().
    • Поддерживается YAML/JSON через ConfigFileLoader.parse().
    • В БД не пишет.
  • Типовая схема использования:
    • В bootstrap добавить файл: runtime.add_config_file("config.yml").
    • Читать секции из runtime.configuration.section("...") в прикладных фабриках.

Logging

  • Назначение: применение и восстановление валидной конфигурации логирования.
  • Реализация: LogManager.
  • Как работает / API / вызовы / таблицы:
    • API: apply_config(config).
    • RuntimeManager.start() вызывает logs.apply_config(config).
    • Если секция log некорректна, сервис пытается восстановить предыдущую валидную конфигурацию.
    • В БД не пишет.
  • Типовая схема использования:
    • Передать log секцию в конфиг и запускать runtime стандартно через start().

Health

  • Назначение: сводный health приложения по воркерам и дополнительным контрибьюторам.
  • Реализация: HealthRegistry + контрибьюторы по контракту HealthContributor.
  • Как работает / API / вызовы / таблицы:
    • API: register(), snapshot(worker_healths), payload(state, worker_healths).
    • Агрегация: unhealthy при критичном unhealthy, degraded при любой деградации, иначе ok.
    • RuntimeManager.current_health() использует HealthRegistry.payload(...).
    • В БД не пишет.
  • Типовая схема использования:
    • Зарегистрировать contributor в ApplicationModule.register(...).
    • Отдавать health наружу через control plane /health.

Tracing

  • Назначение: фиксация контекстов выполнения и событий по шагам операций.
  • Реализация: TraceService, TraceContextStore, NoOpTraceTransport, MySqlTraceTransport.
  • Как работает / API / вызовы / таблицы:
    • API: create_context(), open_context(), step(), info()/warning()/error()/exception(), new_root(), child_of(), attach(), resume().
    • TraceService пишет записи через transport; ошибки транспорта изолируются в логах.
    • При MySqlTraceTransport записи идут в таблицы:
      • trace_contexts
      • trace_messages
    • Эталонная схема и migration для MySQL: requirements/sql/trace_mysql_schema.sql.
  • Trace Context:
    • Что это: запись о начале логического контекста выполнения (операция, воркер, подоперация), к которой потом привязываются trace-сообщения.
    • Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции.
    • Атрибуты контекста (TraceContextRecord): trace_id, alias, parent_id, type, event_time, attrs.
    • Иерархия: parent_id указывает на родительский контекст; так строится цепочка root -> child.
    • Стандартная модель для бизнес-приложений:
      • root-context создается на внешний триггер верхнего уровня, например входящее письмо, webhook, команду оператора или batch-запуск;
      • child-context создается на каждую производную единицу работы, например вложение письма, отдельную задачу, заказ или документ;
      • child-context должен создаваться через parent_id=<trace_id root-context> и передаваться дальше через trace_context / runtime metadata как активный trace конкретной бизнес-операции;
      • если одно входящее сообщение порождает несколько задач, у них должен быть общий parent root-context и разные child trace contexts.
    • Таблица: trace_contexts.
    • Как объявляется в коде:
with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine": "orders"}) as trace_id:
    ...
root = traces.new_root("orders.sync")
child = traces.child_of(root, "orders.process_batch")
with traces.open_context(alias="email:123", kind="email") as message_trace_id:
    ...
    with traces.open_context(
        alias="attachment:invoice.xlsx",
        parent_id=message_trace_id,
        kind="task",
        attrs={"attachment_index": 0},
    ) as task_trace_id:
        runtime["trace_id"] = task_trace_id
        runtime["message_trace_id"] = message_trace_id
        ...
  • Стандарт для workflow persistence:
    • если workflow представляет отдельную бизнес-задачу, в state.runtime.trace_id должен лежать активный trace этой задачи;
    • дополнительные идентификаторы родительских контекстов (message_trace_id, email_trace_id и т.д.) могут храниться рядом в runtime для логов и связности, но workflow_runs.trace_id должен ссылаться именно на активный trace текущей задачи.
  • Trace Message:
    • Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация).
    • Роль step: текущая стадия операции (parse, validate, persist и т.д.), которую выставляют через traces.step("...").
    • Уровни сообщений: INFO, WARNING, ERROR; exception(...) пишет сообщение уровня ERROR.
    • Атрибуты сообщения (TraceLogMessage): trace_id, step, status, message, level, event_time, attrs.
    • Связь с context: каждое сообщение обязательно связано с текущим активным trace_id; без активного контекста TraceService выбрасывает ошибку.
    • Таблица: trace_messages.
    • Как правильно применять в проекте:
      • Открывать один root-context на единицу бизнес-работы (например, обработка одного сообщения/заказа).
      • Перед важными этапами явно менять step.
      • info использовать для нормального прогресса, warning для recoverable ситуаций (retry/fallback), error/exception для сбоев.
      • В attrs класть диагностические ключи (entity_id, attempt, integration, duration_ms), чтобы ускорить расследование инцидентов.
  • Типовая схема использования:
    • В воркере открыть контекст (open_context) и на каждом шаге рутины писать step()/info().

Workflow

  • Назначение: исполнение step-based workflow с переходами и сохранением прогресса.
  • Реализация: WorkflowRuntimeFactory, WorkflowEngine, WorkflowPersistence, WorkflowRepository, CheckpointRepository.
  • Как работает / API / вызовы / таблицы:
    • API верхнего уровня: WorkflowRuntimeFactory.create_engine(workflow), WorkflowEngine.run(context).
    • WorkflowEngine по шагам вызывает WorkflowStep.run(context), применяет TransitionResolver, вызывает hooks.
    • WorkflowPersistence пишет состояние run/step/checkpoint через репозитории.
    • При настроенном подключении к БД записи идут в таблицы:
      • workflow_runs
      • workflow_steps
      • workflow_checkpoints
    • Без подключения работает in-memory fallback репозиториев.
  • Типовая схема использования:
    • Описать WorkflowDefinition (узлы и transitions).
    • Создать engine через фабрику.
    • Запускать из рутины или воркера: engine.run(context).

Control Plane

  • Назначение: внешний канал управления runtime и чтения health/status.
  • Реализация: ControlPlaneService, HttpControlChannel, HttpControlAppFactory.
  • Как работает / API / вызовы / таблицы:
    • API: register_channel(), start(runtime), stop(), snapshot(runtime).
    • HTTP-канал поднимает endpoint'ы:
      • GET /health
      • GET|POST /actions/{action} (start, stop, status)
    • Колбэки action'ов вызывают async-методы RuntimeManager.
    • В БД не пишет.
  • Типовая схема использования:
    • При создании runtime включить enable_http_control=True.
    • Использовать /health для readiness/liveness и /actions/* для операционного контроля.

Простой Web UI через nginx (порт 15000)

  • Статические файлы UI: web/control-ui/ (index.html, app.js, styles.css).
  • UI использует polling (/api/health раз в 2 секунды) и кнопки Start/Stop (POST /api/actions/start|stop).
  • Для вызовов API UI передает заголовок X-Client-Source: web-ui.
  • Пример server-конфига nginx: deploy/nginx/plba-control-ui.conf.
    • Слушает 15000.
    • Отдает UI из /usr/share/nginx/html.
    • Проксирует /api/* на control API http://app:8080/* (sidecar в docker network).
  • Пример запуска в составе бизнес-приложения: deploy/docker-compose.control-ui.yml.
    • Публикуется только один внешний порт: 15000.
    • Внутренний control API остается в сети compose и доступен nginx по имени сервиса app.
    • В app нужно поднять control channel на 0.0.0.0:8080.

Queue

  • Назначение: простой in-memory буфер задач/сообщений внутри приложения.
  • Реализация: InMemoryTaskQueue[T].
  • Как работает / API / вызовы / таблицы:
    • API: put(item), get(timeout), task_done(), qsize(), stats().
    • Может использоваться между воркерами/сервисами как локальная очередь.
    • В БД не пишет.
  • Типовая схема использования:
    • Producer в рутине кладет элементы через put.
    • Consumer-воркер извлекает через get(timeout) и обрабатывает.

5. Application HTTP

PLBA поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от control plane, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.

Control Plane и Application HTTP обслуживают разные контуры:

  • Control Plane используется для /health, /actions/*, /traces/*.
  • Application HTTP используется для бизнес-маршрутов приложения, например /estimate, /estimate/api/tasks, /api/orders.

Основные компоненты

  • ApplicationHttpService управляет lifecycle прикладного HTTP на уровне runtime.
  • HttpApplicationChannel поднимает отдельный FastAPI app через uvicorn.
  • HttpRouteRegistrar регистрирует пользовательские routes и получает ServiceContainer.
  • HttpApplicationAppFactory собирает FastAPI(title="PLBA Application API"), middleware и routes.
  • UnifiedHttpService собирает один FastAPI app с control routes (/health, /actions/*, /traces/*) и application routes.

Минимальный пример

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
from plba import ApplicationModule, HttpApplicationChannel, create_runtime


class DemoRoutes:
    def register(self, app: FastAPI, services) -> None:
        @app.get("/demo")
        async def demo_page():
            return HTMLResponse("<h1>Demo</h1>")

        @app.post("/demo/api/tasks")
        async def create_task(file: UploadFile = File(...)):
            payload = await file.read()
            return {"filename": file.filename, "size": len(payload)}


class DemoModule(ApplicationModule):
    @property
    def name(self) -> str:
        return "demo"

    def register(self, registry) -> None:
        registry.add_http_routes(DemoRoutes())


runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.application_http.register_channel(
    HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
runtime.start()

После старта runtime приложение будет обслуживать:

  • GET /demo
  • POST /demo/api/tasks

Единый HTTP API

Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте UnifiedHttpService как application_http:

from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService

runtime = RuntimeManager(application_http=UnifiedHttpService())
runtime.application_http.register_channel(
    HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)

Старый режим с отдельными ControlPlaneService и ApplicationHttpService остаётся доступен и работает как раньше.

Типовые сценарии

  • Web UI для фоновых задач: список задач, запуск, статус, cancel, download result.
  • Внутренний REST API: например POST /jobs, GET /jobs/{id}, POST /jobs/{id}/cancel.
  • Файловый шлюз: загрузка входного файла, асинхронная обработка через worker, скачивание результата.
  • Webhook endpoint: прием callback от внешней системы и передача события в worker pipeline.

Ограничения и рекомендации

  • По умолчанию держите business routes и control plane раздельно; используйте UnifiedHttpService, когда один опубликованный порт является явным эксплуатационным требованием.
  • Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
  • Runtime предоставляет доступ к зависимостям через services, поэтому handlers не должны зависеть от RuntimeManager.
  • В первой версии Application HTTP не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
  • Для публичного доступа выносите auth, TLS и reverse proxy на внешний ingress.

6. MVP бизнес-приложения

Минимальная конфигурация запуска:

  1. Создать один ApplicationModule.
  2. В модуле собрать одну Routine и один Worker (1 worker -> 1 routine).
  3. Зарегистрировать воркер через registry.add_worker(...).
  4. Создать runtime: create_runtime(module, config_path="config.yml").
  5. Вызвать runtime.start().

Минимальный пример:

from plba import ApplicationModule, Worker, WorkerHealth, WorkerStatus, create_runtime
from app_runtime.core.registration import ModuleRegistry


class DemoRoutine:
    def run(self) -> None:
        print("business action")


class DemoWorker(Worker):
    def __init__(self, routine: DemoRoutine) -> None:
        self._routine = routine
        self._started = False

    @property
    def name(self) -> str:
        return "demo-worker"

    @property
    def critical(self) -> bool:
        return True

    def start(self) -> None:
        self._started = True
        self._routine.run()

    def stop(self, force: bool = False) -> None:
        del force
        self._started = False

    def health(self) -> WorkerHealth:
        return WorkerHealth(name=self.name, status="ok", critical=self.critical)

    def status(self) -> WorkerStatus:
        return WorkerStatus(name=self.name, state="idle" if self._started else "stopped")


class DemoModule(ApplicationModule):
    @property
    def name(self) -> str:
        return "demo"

    def register(self, registry: ModuleRegistry) -> None:
        registry.add_worker(DemoWorker(DemoRoutine()))


runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.start()

Для production-сценария после MVP обычно добавляют tracing, health contributors, workflow, HTTP control plane и при необходимости application HTTP, но базовый запуск не требует этих расширений.

S
Description
No description provided
Readme 488 KiB
Languages
Python 100%