PLBA
Установка
Установка пакета напрямую из Git-репозитория через pip:
export GIT_PLBA_TOKEN="<ваш_токен>"
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git"
Если нужен конкретный тег, ветка или commit, добавьте ref после .git:
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main"
Доступ через GIT_PLBA_TOKEN
Для установки по HTTPS используется переменная окружения GIT_PLBA_TOKEN. В нее нужно положить персональный Git-токен с правом чтения репозитория alex/plba. Токен передается в URL установки и позволяет pip скачать исходники без SSH-доступа.
Рекомендуется:
- экспортировать токен только в текущую shell-сессию;
- не хардкодить токен в
requirements.txt,pyproject.tomlили исходниках; - использовать отдельный read-only токен, если Git-сервер это поддерживает.
1. Назначение платформы
PLBA (Platform Runtime for Business Applications) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг ApplicationModule, Worker и прикладной Routine, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании.
2. Концепция использования
ApplicationModule - точка сборки приложения: здесь создаются зависимости, собираются рутины, создаются воркеры и регистрируются дополнительные health-контрибьюторы.
Worker - основной runtime-контракт платформы: он управляет исполнением (потоки, цикл, стратегия остановки), возвращает health() и status(), а также определяет критичность компонента для общего health.
Routine - прикладной паттерн (не обязательный контракт платформы): класс или набор классов, где сосредоточена бизнес-логика, которую воркер регулярно или однократно запускает.
Вспомогательные сервисы платформы дополняют core-модель:
tracing(TraceService, транспорты) - операционная трассировка контекстов и сообщений.logging(LogManager) - применение конфигурации логирования из runtime-конфига.health(HealthRegistry) - агрегирование здоровья воркеров и дополнительных компонентов.workflow(WorkflowEngineи persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния.control plane(ControlPlaneService,HttpControlChannel) - внешние health/action endpoints.application HTTP(ApplicationHttpService,HttpApplicationChannel) - пользовательские HTTP routes бизнес-приложения.queue(InMemoryTaskQueue) - локальный in-memory буфер как утилита прикладного уровня.
3. Архитектура
classDiagram
class ApplicationModule {
<<abstract>>
+name: str
+register(registry)
}
class ModuleRegistry {
+add_worker(worker)
+add_health_contributor(contributor)
+register_module(name)
}
class RuntimeManager {
+register_module(module)
+add_config_file(path)
+start()
+stop(timeout, force)
+status()
+current_health()
}
class WorkerSupervisor {
+register(worker)
+start()
+stop(timeout, force)
+statuses()
+healths()
}
class Worker {
<<abstract>>
+name: str
+critical: bool
+start()
+stop(force)
+health()
+status()
}
class Routine {
<<pattern>>
+run()
}
class ConfigurationManager
class LogManager
class HealthRegistry
class TraceService
class ControlPlaneService
class ApplicationHttpService
class WorkflowRuntimeFactory
class WorkflowEngine
class WorkflowPersistence
class WorkflowRepository
class CheckpointRepository
class InMemoryTaskQueue
class MySqlTraceTransport
ApplicationModule --> ModuleRegistry : register(...)
RuntimeManager --> ApplicationModule : register_module(...)
RuntimeManager --> ModuleRegistry
RuntimeManager --> ConfigurationManager
RuntimeManager --> LogManager
RuntimeManager --> HealthRegistry
RuntimeManager --> TraceService
RuntimeManager --> WorkerSupervisor
RuntimeManager --> ControlPlaneService
RuntimeManager --> ApplicationHttpService
WorkerSupervisor --> Worker
Worker --> Routine : invokes
WorkflowRuntimeFactory --> WorkflowEngine : create_engine(...)
WorkflowEngine --> WorkflowPersistence
WorkflowPersistence --> WorkflowRepository
WorkflowPersistence --> CheckpointRepository
TraceService --> MySqlTraceTransport
4. Описание компонентов
4.1 Core модули
ApplicationModule
- Назначение: композиция прикладного приложения и регистрация runtime-компонентов.
- Реализация: контракт
app_runtime.contracts.application.ApplicationModule, регистрация черезapp_runtime.core.registration.ModuleRegistry. - Как работает / API / вызовы / таблицы:
- API:
name,register(registry).
- API:
- Типичные вызовы:
registry.add_worker(worker),registry.add_health_contributor(contributor). - Для HTTP-модулей:
registry.add_http_routes(registrar).RuntimeManager.register_module()вызываетmodule.register(...)и добавляет имя модуля в снимок runtime.- В БД напрямую не пишет.
- Типовая схема использования:
- Создать инфраструктурные и доменные сервисы.
- Создать
Routine. - Создать
Workerс зависимостью на рутину. - Зарегистрировать воркер и опциональные health contributors.
Worker
- Назначение: runtime-исполнение бизнес-активности, управление lifecycle, интерпретация health/status.
- Реализация: контракт
app_runtime.contracts.worker.Worker, оркестрация черезapp_runtime.workers.supervisor.WorkerSupervisor. - Как работает / API / вызовы / таблицы:
- API:
start(),stop(force=False),health() -> WorkerHealth,status() -> WorkerStatus, свойстваname,critical. WorkerSupervisor.start()запускает все воркеры,stop()останавливает и ждет state=stopped.RuntimeManagerполучает агрегированныеstatuses()/healths()у супервизора.- В БД напрямую не пишет (если прикладной worker сам не реализует persistence).
- API:
- Типовая схема использования:
- Внутри
start()поднимать поток/пул или запускать одноразовую задачу. - В цикле вызывать
routine.run(). - Ошибки бизнес-логики транслировать в
health()/status().
- Внутри
Routine
- Назначение: изоляция прикладной бизнес-логики от runtime-обвязки.
- Реализация: прикладной класс (паттерн), обычно с методом
run(); платформой не навязывается отдельный интерфейс. - Как работает / API / вызовы / таблицы:
- Типичный API:
run()+ дополнительные domain-методы. - Вызывается воркером в выбранной стратегии выполнения (single-run/loop, 1..N потоков).
- Может вызывать сервисы интеграций, workflow, очереди, доменные репозитории.
- Запись в таблицы определяется прикладными сервисами, а не самой платформой.
- Типичный API:
- Типовая схема использования:
- Оставлять рутину тонким оркестратором бизнес-действий.
- Сложную логику выносить в отдельные доменные сервисы.
4.2 Service модули
Configuration
- Назначение: централизованная загрузка и слияние runtime-конфигурации.
- Реализация:
ConfigurationManager,FileConfigProvider,ConfigFileLoader. - Как работает / API / вызовы / таблицы:
- API:
add_provider(),load(),reload(),get(),section(). RuntimeManager.start()вызываетconfiguration.load().- Поддерживается YAML/JSON через
ConfigFileLoader.parse(). - В БД не пишет.
- API:
- Типовая схема использования:
- В bootstrap добавить файл:
runtime.add_config_file("config.yml"). - Читать секции из
runtime.configuration.section("...")в прикладных фабриках.
- В bootstrap добавить файл:
Logging
- Назначение: применение и восстановление валидной конфигурации логирования.
- Реализация:
LogManager. - Как работает / API / вызовы / таблицы:
- API:
apply_config(config). RuntimeManager.start()вызываетlogs.apply_config(config).- Если секция
logнекорректна, сервис пытается восстановить предыдущую валидную конфигурацию. - В БД не пишет.
- API:
- Типовая схема использования:
- Передать
logсекцию в конфиг и запускать runtime стандартно черезstart().
- Передать
Health
- Назначение: сводный health приложения по воркерам и дополнительным контрибьюторам.
- Реализация:
HealthRegistry+ контрибьюторы по контрактуHealthContributor. - Как работает / API / вызовы / таблицы:
- API:
register(),snapshot(worker_healths),payload(state, worker_healths). - Агрегация:
unhealthyпри критичномunhealthy,degradedпри любой деградации, иначеok. RuntimeManager.current_health()используетHealthRegistry.payload(...).- В БД не пишет.
- API:
- Типовая схема использования:
- Зарегистрировать contributor в
ApplicationModule.register(...). - Отдавать health наружу через control plane
/health.
- Зарегистрировать contributor в
Tracing
- Назначение: фиксация контекстов выполнения и событий по шагам операций.
- Реализация:
TraceService,TraceContextStore,NoOpTraceTransport,MySqlTraceTransport. - Как работает / API / вызовы / таблицы:
- API:
create_context(),open_context(),step(),info()/warning()/error()/exception(),new_root(),child_of(),attach(),resume(). TraceServiceпишет записи через transport; ошибки транспорта изолируются в логах.- При
MySqlTraceTransportзаписи идут в таблицы:trace_contextstrace_messages
- Эталонная схема и migration для MySQL:
requirements/sql/trace_mysql_schema.sql.
- API:
- Trace Context:
- Что это: запись о начале логического контекста выполнения (операция, воркер, подоперация), к которой потом привязываются trace-сообщения.
- Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции.
- Атрибуты контекста (
TraceContextRecord):trace_id,alias,parent_id,type,event_time,attrs. - Иерархия:
parent_idуказывает на родительский контекст; так строится цепочка root -> child. - Стандартная модель для бизнес-приложений:
- root-context создается на внешний триггер верхнего уровня, например входящее письмо, webhook, команду оператора или batch-запуск;
- child-context создается на каждую производную единицу работы, например вложение письма, отдельную задачу, заказ или документ;
- child-context должен создаваться через
parent_id=<trace_id root-context>и передаваться дальше черезtrace_context/ runtime metadata как активный trace конкретной бизнес-операции; - если одно входящее сообщение порождает несколько задач, у них должен быть общий parent root-context и разные child trace contexts.
- Таблица:
trace_contexts. - Как объявляется в коде:
with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine": "orders"}) as trace_id:
...
root = traces.new_root("orders.sync")
child = traces.child_of(root, "orders.process_batch")
with traces.open_context(alias="email:123", kind="email") as message_trace_id:
...
with traces.open_context(
alias="attachment:invoice.xlsx",
parent_id=message_trace_id,
kind="task",
attrs={"attachment_index": 0},
) as task_trace_id:
runtime["trace_id"] = task_trace_id
runtime["message_trace_id"] = message_trace_id
...
- Стандарт для workflow persistence:
- если workflow представляет отдельную бизнес-задачу, в
state.runtime.trace_idдолжен лежать активный trace этой задачи; - дополнительные идентификаторы родительских контекстов (
message_trace_id,email_trace_idи т.д.) могут храниться рядом в runtime для логов и связности, ноworkflow_runs.trace_idдолжен ссылаться именно на активный trace текущей задачи.
- если workflow представляет отдельную бизнес-задачу, в
- Trace Message:
- Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация).
- Роль
step: текущая стадия операции (parse,validate,persistи т.д.), которую выставляют черезtraces.step("..."). - Уровни сообщений:
INFO,WARNING,ERROR;exception(...)пишет сообщение уровняERROR. - Атрибуты сообщения (
TraceLogMessage):trace_id,step,status,message,level,event_time,attrs. - Связь с context: каждое сообщение обязательно связано с текущим активным
trace_id; без активного контекстаTraceServiceвыбрасывает ошибку. - Таблица:
trace_messages. - Как правильно применять в проекте:
- Открывать один root-context на единицу бизнес-работы (например, обработка одного сообщения/заказа).
- Перед важными этапами явно менять
step. infoиспользовать для нормального прогресса,warningдля recoverable ситуаций (retry/fallback),error/exceptionдля сбоев.- В
attrsкласть диагностические ключи (entity_id,attempt,integration,duration_ms), чтобы ускорить расследование инцидентов.
- Типовая схема использования:
- В воркере открыть контекст (
open_context) и на каждом шаге рутины писатьstep()/info().
- В воркере открыть контекст (
Workflow
- Назначение: исполнение step-based workflow с переходами и сохранением прогресса.
- Реализация:
WorkflowRuntimeFactory,WorkflowEngine,WorkflowPersistence,WorkflowRepository,CheckpointRepository. - Как работает / API / вызовы / таблицы:
- API верхнего уровня:
WorkflowRuntimeFactory.create_engine(workflow),WorkflowEngine.run(context). WorkflowEngineпо шагам вызываетWorkflowStep.run(context), применяетTransitionResolver, вызывает hooks.WorkflowPersistenceпишет состояние run/step/checkpoint через репозитории.- При настроенном подключении к БД записи идут в таблицы:
workflow_runsworkflow_stepsworkflow_checkpoints
- Без подключения работает in-memory fallback репозиториев.
- API верхнего уровня:
- Типовая схема использования:
- Описать
WorkflowDefinition(узлы и transitions). - Создать engine через фабрику.
- Запускать из рутины или воркера:
engine.run(context).
- Описать
Control Plane
- Назначение: внешний канал управления runtime и чтения health/status.
- Реализация:
ControlPlaneService,HttpControlChannel,HttpControlAppFactory. - Как работает / API / вызовы / таблицы:
- API:
register_channel(),start(runtime),stop(),snapshot(runtime). - HTTP-канал поднимает endpoint'ы:
GET /healthGET|POST /actions/{action}(start,stop,status)
- Колбэки action'ов вызывают async-методы
RuntimeManager. - В БД не пишет.
- API:
- Типовая схема использования:
- При создании runtime включить
enable_http_control=True. - Использовать
/healthдля readiness/liveness и/actions/*для операционного контроля.
- При создании runtime включить
Простой Web UI через nginx (порт 15000)
- Статические файлы UI:
web/control-ui/(index.html,app.js,styles.css). - UI использует polling (
/api/healthраз в 2 секунды) и кнопкиStart/Stop(POST /api/actions/start|stop). - Для вызовов API UI передает заголовок
X-Client-Source: web-ui. - Пример server-конфига nginx:
deploy/nginx/plba-control-ui.conf.- Слушает
15000. - Отдает UI из
/usr/share/nginx/html. - Проксирует
/api/*на control APIhttp://app:8080/*(sidecar в docker network).
- Слушает
- Пример запуска в составе бизнес-приложения:
deploy/docker-compose.control-ui.yml.- Публикуется только один внешний порт:
15000. - Внутренний control API остается в сети compose и доступен nginx по имени сервиса
app. - В
appнужно поднять control channel на0.0.0.0:8080.
- Публикуется только один внешний порт:
Queue
- Назначение: простой in-memory буфер задач/сообщений внутри приложения.
- Реализация:
InMemoryTaskQueue[T]. - Как работает / API / вызовы / таблицы:
- API:
put(item),get(timeout),task_done(),qsize(),stats(). - Может использоваться между воркерами/сервисами как локальная очередь.
- В БД не пишет.
- API:
- Типовая схема использования:
- Producer в рутине кладет элементы через
put. - Consumer-воркер извлекает через
get(timeout)и обрабатывает.
- Producer в рутине кладет элементы через
5. Application HTTP
PLBA поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от control plane, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.
Control Plane и Application HTTP обслуживают разные контуры:
Control Planeиспользуется для/health,/actions/*,/traces/*.Application HTTPиспользуется для бизнес-маршрутов приложения, например/estimate,/estimate/api/tasks,/api/orders.
Основные компоненты
ApplicationHttpServiceуправляет lifecycle прикладного HTTP на уровне runtime.HttpApplicationChannelподнимает отдельныйFastAPIapp черезuvicorn.HttpRouteRegistrarрегистрирует пользовательские routes и получаетServiceContainer.HttpApplicationAppFactoryсобираетFastAPI(title="PLBA Application API"), middleware и routes.UnifiedHttpServiceсобирает одинFastAPIapp с control routes (/health,/actions/*,/traces/*) и application routes.
Минимальный пример
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
from plba import ApplicationModule, HttpApplicationChannel, create_runtime
class DemoRoutes:
def register(self, app: FastAPI, services) -> None:
@app.get("/demo")
async def demo_page():
return HTMLResponse("<h1>Demo</h1>")
@app.post("/demo/api/tasks")
async def create_task(file: UploadFile = File(...)):
payload = await file.read()
return {"filename": file.filename, "size": len(payload)}
class DemoModule(ApplicationModule):
@property
def name(self) -> str:
return "demo"
def register(self, registry) -> None:
registry.add_http_routes(DemoRoutes())
runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.application_http.register_channel(
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
runtime.start()
После старта runtime приложение будет обслуживать:
GET /demoPOST /demo/api/tasks
Единый HTTP API
Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте UnifiedHttpService как application_http:
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
runtime = RuntimeManager(application_http=UnifiedHttpService())
runtime.application_http.register_channel(
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
Старый режим с отдельными ControlPlaneService и ApplicationHttpService остаётся доступен и работает как раньше.
Типовые сценарии
Web UI для фоновых задач: список задач, запуск, статус, cancel, download result.Внутренний REST API: напримерPOST /jobs,GET /jobs/{id},POST /jobs/{id}/cancel.Файловый шлюз: загрузка входного файла, асинхронная обработка через worker, скачивание результата.Webhook endpoint: прием callback от внешней системы и передача события в worker pipeline.
Ограничения и рекомендации
- По умолчанию держите business routes и
control planeраздельно; используйтеUnifiedHttpService, когда один опубликованный порт является явным эксплуатационным требованием. - Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
- Runtime предоставляет доступ к зависимостям через
services, поэтому handlers не должны зависеть отRuntimeManager. - В первой версии
Application HTTPне решает auth, static files, шаблонизаторы, websocket и OpenAPI customization. - Для публичного доступа выносите auth, TLS и reverse proxy на внешний ingress.
6. MVP бизнес-приложения
Минимальная конфигурация запуска:
- Создать один
ApplicationModule. - В модуле собрать одну
Routineи одинWorker(1 worker -> 1 routine). - Зарегистрировать воркер через
registry.add_worker(...). - Создать runtime:
create_runtime(module, config_path="config.yml"). - Вызвать
runtime.start().
Минимальный пример:
from plba import ApplicationModule, Worker, WorkerHealth, WorkerStatus, create_runtime
from app_runtime.core.registration import ModuleRegistry
class DemoRoutine:
def run(self) -> None:
print("business action")
class DemoWorker(Worker):
def __init__(self, routine: DemoRoutine) -> None:
self._routine = routine
self._started = False
@property
def name(self) -> str:
return "demo-worker"
@property
def critical(self) -> bool:
return True
def start(self) -> None:
self._started = True
self._routine.run()
def stop(self, force: bool = False) -> None:
del force
self._started = False
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=self.critical)
def status(self) -> WorkerStatus:
return WorkerStatus(name=self.name, state="idle" if self._started else "stopped")
class DemoModule(ApplicationModule):
@property
def name(self) -> str:
return "demo"
def register(self, registry: ModuleRegistry) -> None:
registry.add_worker(DemoWorker(DemoRoutine()))
runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.start()
Для production-сценария после MVP обычно добавляют tracing, health contributors, workflow, HTTP control plane и при необходимости application HTTP, но базовый запуск не требует этих расширений.