# PLBA ## Установка Установка пакета напрямую из Git-репозитория через `pip`: ```bash export GIT_PLBA_TOKEN="<ваш_токен>" pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git" ``` Если нужен конкретный тег, ветка или commit, добавьте ref после `.git`: ```bash pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main" ``` ### Доступ через `GIT_PLBA_TOKEN` Для установки по HTTPS используется переменная окружения `GIT_PLBA_TOKEN`. В нее нужно положить персональный Git-токен с правом чтения репозитория `alex/plba`. Токен передается в URL установки и позволяет `pip` скачать исходники без SSH-доступа. Рекомендуется: - экспортировать токен только в текущую shell-сессию; - не хардкодить токен в `requirements.txt`, `pyproject.toml` или исходниках; - использовать отдельный read-only токен, если Git-сервер это поддерживает. ## 1. Назначение платформы PLBA (`Platform Runtime for Business Applications`) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг `ApplicationModule`, `Worker` и прикладной `Routine`, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании. ## 2. Концепция использования `ApplicationModule` - точка сборки приложения: здесь создаются зависимости, собираются рутины, создаются воркеры и регистрируются дополнительные health-контрибьюторы. `Worker` - основной runtime-контракт платформы: он управляет исполнением (потоки, цикл, стратегия остановки), возвращает `health()` и `status()`, а также определяет критичность компонента для общего health. `Routine` - прикладной паттерн (не обязательный контракт платформы): класс или набор классов, где сосредоточена бизнес-логика, которую воркер регулярно или однократно запускает. Вспомогательные сервисы платформы дополняют core-модель: - `tracing` (`TraceService`, транспорты) - операционная трассировка контекстов и сообщений. - `logging` (`LogManager`) - применение конфигурации логирования из runtime-конфига. - `health` (`HealthRegistry`) - агрегирование здоровья воркеров и дополнительных компонентов. - `workflow` (`WorkflowEngine` и persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния. - `control plane` (`ControlPlaneService`, `HttpControlChannel`) - внешние health/action endpoints. - `application HTTP` (`ApplicationHttpService`, `HttpApplicationChannel`) - пользовательские HTTP routes бизнес-приложения. - `queue` (`InMemoryTaskQueue`) - локальный in-memory буфер как утилита прикладного уровня. ## 3. Архитектура ```mermaid classDiagram class ApplicationModule { <> +name: str +register(registry) } class ModuleRegistry { +add_worker(worker) +add_health_contributor(contributor) +register_module(name) } class RuntimeManager { +register_module(module) +add_config_file(path) +start() +stop(timeout, force) +status() +current_health() } class WorkerSupervisor { +register(worker) +start() +stop(timeout, force) +statuses() +healths() } class Worker { <> +name: str +critical: bool +start() +stop(force) +health() +status() } class Routine { <> +run() } class ConfigurationManager class LogManager class HealthRegistry class TraceService class ControlPlaneService class ApplicationHttpService class WorkflowRuntimeFactory class WorkflowEngine class WorkflowPersistence class WorkflowRepository class CheckpointRepository class InMemoryTaskQueue class MySqlTraceTransport ApplicationModule --> ModuleRegistry : register(...) RuntimeManager --> ApplicationModule : register_module(...) RuntimeManager --> ModuleRegistry RuntimeManager --> ConfigurationManager RuntimeManager --> LogManager RuntimeManager --> HealthRegistry RuntimeManager --> TraceService RuntimeManager --> WorkerSupervisor RuntimeManager --> ControlPlaneService RuntimeManager --> ApplicationHttpService WorkerSupervisor --> Worker Worker --> Routine : invokes WorkflowRuntimeFactory --> WorkflowEngine : create_engine(...) WorkflowEngine --> WorkflowPersistence WorkflowPersistence --> WorkflowRepository WorkflowPersistence --> CheckpointRepository TraceService --> MySqlTraceTransport ``` ## 4. Описание компонентов ### 4.1 Core модули #### ApplicationModule - Назначение: композиция прикладного приложения и регистрация runtime-компонентов. - Реализация: контракт `app_runtime.contracts.application.ApplicationModule`, регистрация через `app_runtime.core.registration.ModuleRegistry`. - Как работает / API / вызовы / таблицы: - API: `name`, `register(registry)`. - Типичные вызовы: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`. - Для HTTP-модулей: `registry.add_http_routes(registrar)`. - `RuntimeManager.register_module()` вызывает `module.register(...)` и добавляет имя модуля в снимок runtime. - В БД напрямую не пишет. - Типовая схема использования: - Создать инфраструктурные и доменные сервисы. - Создать `Routine`. - Создать `Worker` с зависимостью на рутину. - Зарегистрировать воркер и опциональные health contributors. #### Worker - Назначение: runtime-исполнение бизнес-активности, управление lifecycle, интерпретация health/status. - Реализация: контракт `app_runtime.contracts.worker.Worker`, оркестрация через `app_runtime.workers.supervisor.WorkerSupervisor`. - Как работает / API / вызовы / таблицы: - API: `start()`, `stop(force=False)`, `health() -> WorkerHealth`, `status() -> WorkerStatus`, свойства `name`, `critical`. - `WorkerSupervisor.start()` запускает все воркеры, `stop()` останавливает и ждет state=`stopped`. - `RuntimeManager` получает агрегированные `statuses()`/`healths()` у супервизора. - В БД напрямую не пишет (если прикладной worker сам не реализует persistence). - Типовая схема использования: - Внутри `start()` поднимать поток/пул или запускать одноразовую задачу. - В цикле вызывать `routine.run()`. - Ошибки бизнес-логики транслировать в `health()`/`status()`. #### Routine - Назначение: изоляция прикладной бизнес-логики от runtime-обвязки. - Реализация: прикладной класс (паттерн), обычно с методом `run()`; платформой не навязывается отдельный интерфейс. - Как работает / API / вызовы / таблицы: - Типичный API: `run()` + дополнительные domain-методы. - Вызывается воркером в выбранной стратегии выполнения (single-run/loop, 1..N потоков). - Может вызывать сервисы интеграций, workflow, очереди, доменные репозитории. - Запись в таблицы определяется прикладными сервисами, а не самой платформой. - Типовая схема использования: - Оставлять рутину тонким оркестратором бизнес-действий. - Сложную логику выносить в отдельные доменные сервисы. ### 4.2 Service модули #### Configuration - Назначение: централизованная загрузка и слияние runtime-конфигурации. - Реализация: `ConfigurationManager`, `FileConfigProvider`, `ConfigFileLoader`. - Как работает / API / вызовы / таблицы: - API: `add_provider()`, `load()`, `reload()`, `get()`, `section()`. - `RuntimeManager.start()` вызывает `configuration.load()`. - Поддерживается YAML/JSON через `ConfigFileLoader.parse()`. - В БД не пишет. - Типовая схема использования: - В bootstrap добавить файл: `runtime.add_config_file("config.yml")`. - Читать секции из `runtime.configuration.section("...")` в прикладных фабриках. #### Logging - Назначение: применение и восстановление валидной конфигурации логирования. - Реализация: `LogManager`. - Как работает / API / вызовы / таблицы: - API: `apply_config(config)`. - `RuntimeManager.start()` вызывает `logs.apply_config(config)`. - Если секция `log` некорректна, сервис пытается восстановить предыдущую валидную конфигурацию. - В БД не пишет. - Типовая схема использования: - Передать `log` секцию в конфиг и запускать runtime стандартно через `start()`. #### Health - Назначение: сводный health приложения по воркерам и дополнительным контрибьюторам. - Реализация: `HealthRegistry` + контрибьюторы по контракту `HealthContributor`. - Как работает / API / вызовы / таблицы: - API: `register()`, `snapshot(worker_healths)`, `payload(state, worker_healths)`. - Агрегация: `unhealthy` при критичном `unhealthy`, `degraded` при любой деградации, иначе `ok`. - `RuntimeManager.current_health()` использует `HealthRegistry.payload(...)`. - В БД не пишет. - Типовая схема использования: - Зарегистрировать contributor в `ApplicationModule.register(...)`. - Отдавать health наружу через control plane `/health`. #### Tracing - Назначение: фиксация контекстов выполнения и событий по шагам операций. - Реализация: `TraceService`, `TraceContextStore`, `NoOpTraceTransport`, `MySqlTraceTransport`. - Как работает / API / вызовы / таблицы: - API: `create_context()`, `open_context()`, `step()`, `info()/warning()/error()/exception()`, `new_root()`, `child_of()`, `attach()`, `resume()`. - `TraceService` пишет записи через transport; ошибки транспорта изолируются в логах. - При `MySqlTraceTransport` записи идут в таблицы: - `trace_contexts` - `trace_messages` - Эталонная схема и migration для MySQL: `requirements/sql/trace_mysql_schema.sql`. - Trace Context: - Что это: запись о начале логического контекста выполнения (операция, воркер, подоперация), к которой потом привязываются trace-сообщения. - Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции. - Атрибуты контекста (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`. - Иерархия: `parent_id` указывает на родительский контекст; так строится цепочка root -> child. - Стандартная модель для бизнес-приложений: - root-context создается на внешний триггер верхнего уровня, например входящее письмо, webhook, команду оператора или batch-запуск; - child-context создается на каждую производную единицу работы, например вложение письма, отдельную задачу, заказ или документ; - child-context должен создаваться через `parent_id=` и передаваться дальше через `trace_context` / runtime metadata как активный trace конкретной бизнес-операции; - если одно входящее сообщение порождает несколько задач, у них должен быть общий parent root-context и разные child trace contexts. - Таблица: `trace_contexts`. - Как объявляется в коде: ```python with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine": "orders"}) as trace_id: ... ``` ```python root = traces.new_root("orders.sync") child = traces.child_of(root, "orders.process_batch") ``` ```python with traces.open_context(alias="email:123", kind="email") as message_trace_id: ... with traces.open_context( alias="attachment:invoice.xlsx", parent_id=message_trace_id, kind="task", attrs={"attachment_index": 0}, ) as task_trace_id: runtime["trace_id"] = task_trace_id runtime["message_trace_id"] = message_trace_id ... ``` - Стандарт для workflow persistence: - если workflow представляет отдельную бизнес-задачу, в `state.runtime.trace_id` должен лежать активный trace этой задачи; - дополнительные идентификаторы родительских контекстов (`message_trace_id`, `email_trace_id` и т.д.) могут храниться рядом в runtime для логов и связности, но `workflow_runs.trace_id` должен ссылаться именно на активный trace текущей задачи. - Trace Message: - Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация). - Роль `step`: текущая стадия операции (`parse`, `validate`, `persist` и т.д.), которую выставляют через `traces.step("...")`. - Уровни сообщений: `INFO`, `WARNING`, `ERROR`; `exception(...)` пишет сообщение уровня `ERROR`. - Атрибуты сообщения (`TraceLogMessage`): `trace_id`, `step`, `status`, `message`, `level`, `event_time`, `attrs`. - Связь с context: каждое сообщение обязательно связано с текущим активным `trace_id`; без активного контекста `TraceService` выбрасывает ошибку. - Таблица: `trace_messages`. - Как правильно применять в проекте: - Открывать один root-context на единицу бизнес-работы (например, обработка одного сообщения/заказа). - Перед важными этапами явно менять `step`. - `info` использовать для нормального прогресса, `warning` для recoverable ситуаций (retry/fallback), `error`/`exception` для сбоев. - В `attrs` класть диагностические ключи (`entity_id`, `attempt`, `integration`, `duration_ms`), чтобы ускорить расследование инцидентов. - Типовая схема использования: - В воркере открыть контекст (`open_context`) и на каждом шаге рутины писать `step()/info()`. #### Workflow - Назначение: исполнение step-based workflow с переходами и сохранением прогресса. - Реализация: `WorkflowRuntimeFactory`, `WorkflowEngine`, `WorkflowPersistence`, `WorkflowRepository`, `CheckpointRepository`. - Как работает / API / вызовы / таблицы: - API верхнего уровня: `WorkflowRuntimeFactory.create_engine(workflow)`, `WorkflowEngine.run(context)`. - `WorkflowEngine` по шагам вызывает `WorkflowStep.run(context)`, применяет `TransitionResolver`, вызывает hooks. - `WorkflowPersistence` пишет состояние run/step/checkpoint через репозитории. - При настроенном подключении к БД записи идут в таблицы: - `workflow_runs` - `workflow_steps` - `workflow_checkpoints` - Без подключения работает in-memory fallback репозиториев. - Типовая схема использования: - Описать `WorkflowDefinition` (узлы и transitions). - Создать engine через фабрику. - Запускать из рутины или воркера: `engine.run(context)`. #### Control Plane - Назначение: внешний канал управления runtime и чтения health/status. - Реализация: `ControlPlaneService`, `HttpControlChannel`, `HttpControlAppFactory`. - Как работает / API / вызовы / таблицы: - API: `register_channel()`, `start(runtime)`, `stop()`, `snapshot(runtime)`. - HTTP-канал поднимает endpoint'ы: - `GET /health` - `GET|POST /actions/{action}` (`start`, `stop`, `status`) - Колбэки action'ов вызывают async-методы `RuntimeManager`. - В БД не пишет. - Типовая схема использования: - При создании runtime включить `enable_http_control=True`. - Использовать `/health` для readiness/liveness и `/actions/*` для операционного контроля. #### Простой Web UI через nginx (порт 15000) - Статические файлы UI: `web/control-ui/` (`index.html`, `app.js`, `styles.css`). - UI использует polling (`/api/health` раз в 2 секунды) и кнопки `Start/Stop` (`POST /api/actions/start|stop`). - Для вызовов API UI передает заголовок `X-Client-Source: web-ui`. - Пример server-конфига nginx: `deploy/nginx/plba-control-ui.conf`. - Слушает `15000`. - Отдает UI из `/usr/share/nginx/html`. - Проксирует `/api/*` на control API `http://app:8080/*` (sidecar в docker network). - Пример запуска в составе бизнес-приложения: `deploy/docker-compose.control-ui.yml`. - Публикуется только один внешний порт: `15000`. - Внутренний control API остается в сети compose и доступен nginx по имени сервиса `app`. - В `app` нужно поднять control channel на `0.0.0.0:8080`. #### Queue - Назначение: простой in-memory буфер задач/сообщений внутри приложения. - Реализация: `InMemoryTaskQueue[T]`. - Как работает / API / вызовы / таблицы: - API: `put(item)`, `get(timeout)`, `task_done()`, `qsize()`, `stats()`. - Может использоваться между воркерами/сервисами как локальная очередь. - В БД не пишет. - Типовая схема использования: - Producer в рутине кладет элементы через `put`. - Consumer-воркер извлекает через `get(timeout)` и обрабатывает. ## 5. Application HTTP `PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel. `Control Plane` и `Application HTTP` обслуживают разные контуры: - `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`. - `Application HTTP` используется для бизнес-маршрутов приложения, например `/estimate`, `/estimate/api/tasks`, `/api/orders`. ### Основные компоненты - `ApplicationHttpService` управляет lifecycle прикладного HTTP на уровне runtime. - `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`. - `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`. - `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes. - `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes. ### Минимальный пример ```python from fastapi import FastAPI, File, UploadFile from fastapi.responses import HTMLResponse from plba import ApplicationModule, HttpApplicationChannel, create_runtime class DemoRoutes: def register(self, app: FastAPI, services) -> None: @app.get("/demo") async def demo_page(): return HTMLResponse("

Demo

") @app.post("/demo/api/tasks") async def create_task(file: UploadFile = File(...)): payload = await file.read() return {"filename": file.filename, "size": len(payload)} class DemoModule(ApplicationModule): @property def name(self) -> str: return "demo" def register(self, registry) -> None: registry.add_http_routes(DemoRoutes()) runtime = create_runtime(DemoModule(), config_path="config.yml") runtime.application_http.register_channel( HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5) ) runtime.start() ``` После старта runtime приложение будет обслуживать: - `GET /demo` - `POST /demo/api/tasks` ### Единый HTTP API Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`: ```python from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService runtime = RuntimeManager(application_http=UnifiedHttpService()) runtime.application_http.register_channel( HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5) ) ``` Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше. ### Типовые сценарии - `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result. - `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`. - `Файловый шлюз`: загрузка входного файла, асинхронная обработка через worker, скачивание результата. - `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline. ### Ограничения и рекомендации - По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием. - Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер. - Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`. - В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization. - Для публичного доступа выносите auth, TLS и reverse proxy на внешний ingress. ## 6. MVP бизнес-приложения Минимальная конфигурация запуска: 1. Создать один `ApplicationModule`. 2. В модуле собрать одну `Routine` и один `Worker` (1 worker -> 1 routine). 3. Зарегистрировать воркер через `registry.add_worker(...)`. 4. Создать runtime: `create_runtime(module, config_path="config.yml")`. 5. Вызвать `runtime.start()`. Минимальный пример: ```python from plba import ApplicationModule, Worker, WorkerHealth, WorkerStatus, create_runtime from app_runtime.core.registration import ModuleRegistry class DemoRoutine: def run(self) -> None: print("business action") class DemoWorker(Worker): def __init__(self, routine: DemoRoutine) -> None: self._routine = routine self._started = False @property def name(self) -> str: return "demo-worker" @property def critical(self) -> bool: return True def start(self) -> None: self._started = True self._routine.run() def stop(self, force: bool = False) -> None: del force self._started = False def health(self) -> WorkerHealth: return WorkerHealth(name=self.name, status="ok", critical=self.critical) def status(self) -> WorkerStatus: return WorkerStatus(name=self.name, state="idle" if self._started else "stopped") class DemoModule(ApplicationModule): @property def name(self) -> str: return "demo" def register(self, registry: ModuleRegistry) -> None: registry.add_worker(DemoWorker(DemoRoutine())) runtime = create_runtime(DemoModule(), config_path="config.yml") runtime.start() ``` Для production-сценария после MVP обычно добавляют `tracing`, `health contributors`, `workflow`, HTTP control plane и при необходимости `application HTTP`, но базовый запуск не требует этих расширений.