diff --git a/README.md b/README.md index 99d6c8d..f3b5dbf 100644 --- a/README.md +++ b/README.md @@ -1,481 +1,206 @@ # PLBA -## 1. Общее описание сервиса и его назначение +`PLBA` (`Platform Runtime for Business Applications`) - runtime для бизнес-приложений. -`PLBA` (`Platform Runtime for Business Applications`) - это платформенный runtime для бизнес-приложений. Библиотека выносит из прикладного кода типовые инфраструктурные задачи: жизненный цикл приложения, запуск и остановку фоновых воркеров, загрузку конфигурации, health-check, tracing, логирование и control plane. +Платформа берет на себя инфраструктурные обязанности: +- lifecycle приложения +- запуск и остановку воркеров +- health/status +- tracing +- logging +- control plane +- загрузку конфигурации -Назначение сервиса: -- дать единый каркас для запуска бизнес-модулей; -- отделить платформенные обязанности от предметной логики; -- упростить разработку сервисов с очередями, polling-обработчиками и фоновыми процессами; -- обеспечить наблюдаемость и управляемость runtime через health/status и HTTP control endpoints. +Бизнес-приложение на базе `plba` собирается вокруг трех уровней: +- `ApplicationModule` собирает приложение и регистрирует воркеры +- `Worker` управляет исполнением и lifecycle +- `Routine` реализует бизнес-функцию -Текущая модель работы выглядит так: -1. приложение создаёт `RuntimeManager`; -2. runtime загружает конфигурацию; -3. применяется logging-конфигурация; -4. модуль приложения регистрирует очереди, обработчики, воркеры и health contributors; -5. `WorkerSupervisor` запускает все рабочие компоненты; -6. runtime агрегирует состояние, health и tracing; -7. control plane предоставляет снимок состояния и команды запуска/остановки. +`Routine` не является контрактом `plba`. Это рекомендуемый архитектурный паттерн для прикладного кода. 
-`PLBA` особенно полезен для: -- почтовых ботов и интеграционных сервисов; -- event-driven и queue-driven приложений; -- фоновых бизнес-процессов; -- внутренних платформенных сервисов с единым operational-контуром. +Правила построения приложений на платформе собраны отдельно в [application_guidelines.md](requirements/application_guidelines.md). -## 2. Архитектура - диаграмма классов +## Runtime model -```mermaid -classDiagram - class RuntimeManager { - +register_module(module) - +add_config_file(path) - +start() - +stop(timeout, force, stop_control_plane) - +status() - +current_health() - } +1. приложение объявляет `ApplicationModule` +2. модуль регистрирует один или несколько `Worker` +3. `RuntimeManager` запускает все воркеры +4. каждый `Worker` запускает свою бизнес-активность +5. runtime агрегирует health и status +6. runtime останавливает воркеры graceful или forcefully - class ConfigurationManager { - +add_provider(provider) - +load() - +reload() - +get() - +section(name, default) - } +## Main contracts - class ServiceContainer { - +register(name, service) - +get(name) - +require(name, expected_type) - +snapshot() - } +### `ApplicationModule` - class ModuleRegistry { - +register_module(name) - +add_queue(name, queue) - +add_handler(name, handler) - +add_worker(worker) - +add_health_contributor(contributor) - } +Описывает, из чего состоит приложение. - class ApplicationModule { - <> - +name - +register(registry) - } +Ответственность: +- дать имя модуля +- зарегистрировать воркеры +- зарегистрировать health contributors при необходимости +- собрать прикладные зависимости - class WorkerSupervisor { - +register(worker) - +start() - +stop(timeout, force) - +snapshot() - +healths() - +statuses() - } +### `Worker` - class Worker { - <> - +start() - +stop(force) - +health() - +status() - } +Главный runtime-контракт платформы. 
- class QueueWorker { - +name - +critical - +start() - +stop(force) - +health() - +status() - } +Контракт: +- `name` +- `critical` +- `start()` +- `stop(force=False)` +- `health()` +- `status()` - class TaskQueue { - <> - +publish(task) - +consume(timeout) - +ack(task) - +nack(task, retry_delay) - +stats() - } +`Worker` отвечает только за runtime-поведение: +- как запускается бизнес-активность +- в одном потоке или нескольких +- single-run или loop +- graceful shutdown +- интерпретацию ошибок в `health/status` - class InMemoryTaskQueue { - +publish(task) - +consume(timeout) - +ack(task) - +nack(task, retry_delay) - +stats() - } +### `Routine` - class TaskHandler { - <> - +handle(task) - } +Рекомендуемый application-level паттерн. - class TraceService { - +create_context(...) - +open_context(...) - +resume(task_metadata, operation) - +attach(task_metadata, context) - +info(message, status, attrs) - +warning(message, status, attrs) - +error(message, status, attrs) - } +`Routine` описывает бизнес-функцию: +- что читать +- какие сервисы вызывать +- какие бизнес-решения принимать +- что сохранять или отправлять наружу - class HealthRegistry { - +register(contributor) - +snapshot(worker_healths) - +payload(state, worker_healths) - } +Обычно воркер получает одну routine через конструктор и вызывает ее в `start()` или во внутренних helper-методах. 
- class ControlPlaneService { - +register_channel(channel) - +start(runtime) - +stop() - +snapshot(runtime) - } - - class LogManager { - +apply_config(config) - } - - class FileConfigProvider { - +load() - } - - RuntimeManager --> ConfigurationManager - RuntimeManager --> ServiceContainer - RuntimeManager --> ModuleRegistry - RuntimeManager --> WorkerSupervisor - RuntimeManager --> TraceService - RuntimeManager --> HealthRegistry - RuntimeManager --> ControlPlaneService - RuntimeManager --> LogManager - RuntimeManager --> ApplicationModule - ModuleRegistry --> ServiceContainer - ModuleRegistry --> Worker - ModuleRegistry --> TaskQueue - ModuleRegistry --> TaskHandler - WorkerSupervisor --> Worker - QueueWorker --|> Worker - QueueWorker --> TaskQueue - QueueWorker --> TaskHandler - QueueWorker --> TraceService - InMemoryTaskQueue --|> TaskQueue - ConfigurationManager --> FileConfigProvider -``` - -### Архитектурные слои - -- `app_runtime.core` - оркестрация runtime, контейнер сервисов, регистрация модулей, типы состояния. -- `app_runtime.contracts` - абстракции для интеграции бизнес-приложений. -- `app_runtime.workers`, `queue`, `config`, `logging`, `health`, `tracing`, `control` - инфраструктурные адаптеры и платформенные сервисы. -- `plba` - публичный фасад, который реэкспортирует ключевые классы как API пакета. - -## 3. Описание доступных модулей, их назначение, краткое устройство, примеры применения в бизнес приложениях - -### `plba` - -Публичный API пакета. Реэкспортирует `RuntimeManager`, `ApplicationModule`, `QueueWorker`, `InMemoryTaskQueue`, `TraceService`, `HealthRegistry`, `ControlPlaneService` и другие классы. - -Краткое устройство: -- служит фасадом над `app_runtime`; -- упрощает импорт для прикладного кода; -- позволяет использовать пакет как библиотеку без знания внутренней структуры. - -Пример применения: -- бизнес-сервис импортирует `create_runtime` и `ApplicationModule`, собирает свой модуль и запускает runtime. 
- -### `app_runtime.core` - -Основной orchestration-слой. - -Ключевые классы: -- `RuntimeManager` - центральная точка запуска и остановки; -- `ConfigurationManager` - загрузка и merge конфигурации; -- `ServiceContainer` - DI-like контейнер платформенных сервисов; -- `ModuleRegistry` - регистрация очередей, обработчиков, воркеров и health contributors. - -Краткое устройство: -- `RuntimeManager` создаёт и связывает инфраструктурные сервисы; -- при старте регистрирует health contributors, воркеры и поднимает control plane; -- `ModuleRegistry` связывает бизнес-модуль с runtime без жёсткой зависимости на конкретные реализации. - -Примеры применения в бизнес-приложениях: -- CRM-интеграция с несколькими фоновых воркерами; -- сервис обработки заявок, где один модуль регистрирует очередь, handler и worker pool; -- back-office процесс с управляемым graceful shutdown. - -### `app_runtime.contracts` - -Набор абстракций для расширения платформы. - -Ключевые контракты: -- `ApplicationModule`; -- `Worker`; -- `TaskQueue`; -- `TaskHandler`; -- `ConfigProvider`; -- `HealthContributor`; -- trace-related контракты. - -Краткое устройство: -- бизнес-код реализует интерфейсы, а runtime работает только через контракты; -- это позволяет менять инфраструктуру без переписывания прикладной логики. - -Примеры применения в бизнес-приложениях: -- реализовать свой `ApplicationModule` для почтового бота; -- подключить собственный `ConfigProvider` для БД или secrets storage; -- реализовать кастомный `Worker` для long-running polling процесса. - -### `app_runtime.workers` - -Модуль управления рабочими процессами. - -Ключевые классы: -- `WorkerSupervisor` - запускает и останавливает набор воркеров; -- `QueueWorker` - стандартный worker для обработки задач из очереди. 
- -Краткое устройство: -- `WorkerSupervisor` агрегирует health/status всех воркеров; -- `QueueWorker` поднимает нужное число потоков, читает задачи из `TaskQueue`, вызывает `TaskHandler`, делает `ack/nack` и обновляет operational-метрики. - -Примеры применения в бизнес-приложениях: -- параллельная обработка входящих писем; -- обработка очереди заказов; -- фоновая генерация документов или актов. - -### `app_runtime.queue` - -Очереди задач. - -Ключевой класс: -- `InMemoryTaskQueue`. - -Краткое устройство: -- использует стандартный `Queue` из Python; -- хранит счётчики `published`, `acked`, `nacked`, `queued`; -- подходит как базовая реализация для разработки, тестов и простых сценариев. - -Примеры применения в бизнес-приложениях: -- локальная очередь задач в небольшом внутреннем сервисе; -- тестовая среда без внешнего брокера; -- staging-сценарии для отладки worker pipeline. - -### `app_runtime.config` - -Подсистема загрузки конфигурации. - -Ключевые классы: -- `FileConfigProvider`; -- `ConfigFileLoader`. - -Краткое устройство: -- `ConfigurationManager` собирает данные из провайдеров; -- текущая штатная реализация читает YAML-файл; -- поддерживается глубокое слияние секций конфигурации. - -Примеры применения в бизнес-приложениях: -- конфигурация платформы и прикладных модулей из `config.yml`; -- раздельное хранение `platform`, `log` и app-specific секций; -- подключение нескольких источников конфигурации с последующим merge. - -### `app_runtime.logging` - -Управление логированием. - -Ключевой класс: -- `LogManager`. - -Краткое устройство: -- применяет `dictConfig` из секции `log`; -- хранит последнюю валидную конфигурацию и пытается восстановиться при ошибке. - -Примеры применения в бизнес-приложениях: -- единообразная настройка JSON-логов; -- переключение уровней логирования между окружениями; -- централизованная logging-конфигурация для нескольких модулей. - -### `app_runtime.health` - -Подсистема health aggregation. - -Ключевой класс: -- `HealthRegistry`. 
- -Краткое устройство: -- собирает health от воркеров и дополнительных contributors; -- агрегирует статус в `ok`, `degraded`, `unhealthy`; -- формирует payload для readiness/liveness и operational snapshot. - -Примеры применения в бизнес-приложениях: -- показывать degraded, если обработка идёт с ошибками; -- маркировать сервис unhealthy при падении критичного worker; -- добавлять health внешней зависимости, например IMAP или ERP API. - -### `app_runtime.tracing` - -Подсистема трассировки выполнения. - -Ключевые классы: -- `TraceService`; -- `TraceContextStore`; -- `NoOpTraceTransport`. - -Краткое устройство: -- создаёт trace contexts; -- связывает source/queue/worker/handler через metadata; -- пишет контексты и сообщения через транспортный слой. - -Примеры применения в бизнес-приложениях: -- трассировка обработки письма от polling до бизнес-handler; -- аудит прохождения заказа по pipeline; -- отладка проблемных задач в фоне. - -### `app_runtime.control` - -Control plane и HTTP-канал управления. - -Ключевые классы: -- `ControlPlaneService`; -- `HttpControlChannel`; -- `ControlActionSet`. - -Краткое устройство: -- публикует health/status и команды управления runtime; -- может поднимать HTTP endpoints для start/stop/status; -- строит snapshot состояния на основе `RuntimeManager`. - -Примеры применения в бизнес-приложениях: -- административный endpoint для оператора; -- health endpoint для Kubernetes/nomad; -- runtime status для monitoring dashboard. - -## 4. Установка - `git@git.lesha.spb.ru:alex/plba.git` - -### Требования - -- Python `3.12+` -- `pip` -- SSH-доступ к `git.lesha.spb.ru` - -### Установка напрямую через `pip` из Git-репозитория - -```bash -pip install "plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git" -``` - -При такой установке `pip` ставит не только сам пакет `plba`, но и все его зависимости, объявленные в [pyproject.toml](/Users/alex/Dev_projects_v2/apps/plba/pyproject.toml), например `fastapi`, `uvicorn` и `PyYAML`. 
- -Если нужна установка из конкретной ветки: - -```bash -pip install "plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git@main" -``` - -Если нужна установка из конкретного тега или commit hash: - -```bash -pip install "plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git@v0.1.0" -``` - -или - -```bash -pip install "plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git@" -``` - -### Установка в виртуальное окружение - -```bash -python -m venv .venv -source .venv/bin/activate -pip install --upgrade pip -pip install "plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git" -``` - -### Подключение `plba` в бизнес-приложении - -Чтобы при установке бизнес-приложения автоматически подтягивались зависимости `plba`, нужно добавить `plba` в зависимости самого бизнес-приложения как Git dependency. - -Пример для `requirements.txt`: - -```txt -plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git -``` - -Пример для `pyproject.toml`: - -```toml -[project] -dependencies = [ - "plba @ git+ssh://git@git.lesha.spb.ru/alex/plba.git", -] -``` - -Если бизнес-приложение собирается в Docker, достаточно чтобы на этапе сборки выполнялся обычный `pip install`, например: - -```dockerfile -COPY pyproject.toml . -RUN pip install . -``` - -или при использовании `requirements.txt`: - -```dockerfile -COPY requirements.txt . -RUN pip install -r requirements.txt -``` - -В обоих случаях `pip` установит `plba` из Git и автоматически подтянет его транзитивные зависимости. - -### Локальная разработка - -Если пакет нужно не только использовать, но и разрабатывать: - -```bash -git clone git@git.lesha.spb.ru:alex/plba.git -cd plba -python -m venv .venv -source .venv/bin/activate -pip install -e . 
-``` - -### Быстрая проверка - -```bash -python -c "import plba; print(plba.__all__[:5])" -``` - -### Минимальный пример использования +## Minimal example ```python -from plba import ApplicationModule, InMemoryTaskQueue, QueueWorker, RuntimeManager, Task, TaskHandler +from threading import Event, Lock, Thread +from time import sleep + +from plba import ( + ApplicationModule, + Worker, + WorkerHealth, + WorkerStatus, + create_runtime, +) -class PrintHandler(TaskHandler): - def handle(self, task: Task) -> None: - print(task.payload) +class OrdersRoutine: + def __init__(self, service) -> None: + self._service = service + + def run(self) -> None: + self._service.process_new_orders() -class DemoModule(ApplicationModule): +class OrdersWorker(Worker): + def __init__(self, routine: OrdersRoutine, interval: float = 1.0) -> None: + self._routine = routine + self._interval = interval + self._thread: Thread | None = None + self._stop_requested = Event() + self._lock = Lock() + self._in_flight = 0 + self._failures = 0 + @property def name(self) -> str: - return "demo" + return "orders-worker" + + @property + def critical(self) -> bool: + return True + + def start(self) -> None: + if self._thread and self._thread.is_alive(): + return + self._stop_requested.clear() + self._thread = Thread(target=self._run_loop, daemon=True) + self._thread.start() + + def stop(self, force: bool = False) -> None: + del force + self._stop_requested.set() + + def health(self) -> WorkerHealth: + if self._failures > 0: + return WorkerHealth(self.name, "degraded", self.critical) + return WorkerHealth(self.name, "ok", self.critical) + + def status(self) -> WorkerStatus: + alive = self._thread is not None and self._thread.is_alive() + state = "busy" if self._in_flight else "idle" + if not alive: + state = "stopped" + elif self._stop_requested.is_set(): + state = "stopping" + return WorkerStatus(name=self.name, state=state, in_flight=self._in_flight) + + def _run_loop(self) -> None: + while not 
self._stop_requested.is_set(): + with self._lock: + self._in_flight += 1 + try: + self._routine.run() + except Exception: + self._failures += 1 + finally: + with self._lock: + self._in_flight -= 1 + sleep(self._interval) + + +class OrdersModule(ApplicationModule): + @property + def name(self) -> str: + return "orders" def register(self, registry) -> None: - queue = InMemoryTaskQueue() - traces = registry.services.get("traces") - handler = PrintHandler() - queue.publish(Task(name="demo-task", payload={"id": 1}, metadata={})) - registry.add_worker(QueueWorker("demo-worker", queue, handler, traces)) + service = OrderService() + routine = OrdersRoutine(service) + registry.add_worker(OrdersWorker(routine)) -runtime = RuntimeManager() -runtime.register_module(DemoModule()) +runtime = create_runtime(OrdersModule(), config_path="config.yml") runtime.start() -runtime.stop() ``` + +## Health and status + +Практика такая: +- `Routine` выполняет бизнес-работу +- `Worker` ловит ее ошибки +- `Worker` интерпретирует outcome в `health()` и `status()` + +То есть routine не выставляет health напрямую. + +## In-memory queue + +`InMemoryTaskQueue` остается в платформе как простой in-memory utility. + +Это не базовый платформенный контракт и не обязательный паттерн архитектуры. +Ее можно использовать в прикладном коде как локальный буфер между компонентами, если это действительно помогает. 
+ +```python +from plba import InMemoryTaskQueue + +queue = InMemoryTaskQueue[str]() +queue.put("payload") +item = queue.get(timeout=0.1) +``` + +## Public API + +Основные публичные сущности: +- `ApplicationModule` +- `Worker` +- `WorkerHealth` +- `WorkerStatus` +- `RuntimeManager` +- `WorkerSupervisor` +- `TraceService` +- `HealthRegistry` +- `InMemoryTaskQueue` +- `create_runtime(...)` diff --git a/requirements/Architectural constraints.md b/requirements/Architectural constraints.md index 9f40030..8fd467e 100644 --- a/requirements/Architectural constraints.md +++ b/requirements/Architectural constraints.md @@ -1,111 +1,50 @@ +# Architectural Constraints -**`docs/adr/0001-new-runtime.md`** -```md -# ADR 0001: Create a new runtime project instead of evolving the legacy ConfigManager model +## Main constraints -## Status +1. `Worker` is the primary runtime abstraction. +2. `ApplicationModule` assembles the application and registers workers. +3. Business behavior should live outside worker lifecycle code. +4. The platform should avoid queue-centric abstractions as first-class architecture. +5. Utility components are allowed only when they remain optional and transparent. -Accepted +## Worker constraints -## Context +Worker should own: +- thread/process lifecycle +- execution strategy +- graceful stop +- runtime state +- health state -The previous generation of the application runtime was centered around a timer-driven execution model: -- a manager loaded configuration -- a worker loop periodically called `execute()` -- applications placed most operational logic behind that entry point +Worker should not own: +- large business workflows +- domain rules +- parsing plus persistence plus integration logic in one class -That model is adequate for simple periodic jobs, but it does not match the direction of the new platform. 
+## Business-code constraints -The new platform must support: -- task sources -- queue-driven processing -- multiple parallel workers -- trace propagation across producer/consumer boundaries -- richer lifecycle and status management -- health aggregation -- future admin web interface -- future authentication and user management +Business routines and services should own: +- business decisions +- external integration calls +- persistence +- domain validation -These are platform concerns, not business concerns. +They should not own: +- platform lifecycle +- worker supervision +- runtime status management -We also want business applications to describe only business functionality and rely on the runtime for infrastructure behavior. +## API constraints -## Decision +Public API should stay small and clear. -We will create a new runtime project instead of implementing a `V3` directly inside the current legacy ConfigManager codebase. - -The new runtime will be built around a platform-oriented model with explicit concepts such as: -- `RuntimeManager` +Prefer: - `ApplicationModule` -- `TaskSource` -- `TaskQueue` -- `WorkerSupervisor` -- `TaskHandler` -- `TraceService` -- `HealthRegistry` +- `Worker` +- runtime services +- utility queue -The old execute-centered model is treated as a previous-generation design and is not the architectural basis of the new runtime. 
- -## Rationale - -Creating a new runtime project gives us: -- freedom to design the correct abstractions from the start -- no pressure to preserve legacy contracts -- cleaner boundaries between platform and business logic -- simpler documentation and tests -- lower long-term complexity than mixing old and new models in one codebase - -If we built this as `V3` inside the old project, we would likely inherit: -- compatibility constraints -- mixed abstractions -- transitional adapters -- conceptual confusion between periodic execution and event/queue processing - -The expected long-term cost of such coupling is higher than creating a clean new runtime. - -## Consequences - -### Positive - -- the platform can be modeled cleanly -- business applications can integrate through explicit contracts -- new runtime capabilities can be added without legacy pressure -- mail_order_bot can become the first pilot application on the new runtime - -### Negative - -- some existing capabilities will need to be reintroduced in the new project -- there will be a temporary period with both legacy and new runtime lines -- migration requires explicit planning - -## Initial migration target - -The first application on the new runtime will be `mail_order_bot`. 
- -Initial runtime design for that application: -- IMAP polling source -- in-memory queue -- parallel workers -- business handler for email processing -- message marked as read only after successful processing - -Later: -- swap IMAP polling source for IMAP IDLE source -- keep the queue and worker model unchanged - -## What we intentionally do not carry over - -We do not keep the old architecture as the central organizing principle: -- no `execute()`-centric application model -- no timer-loop as the main abstraction -- no implicit mixing of lifecycle and business processing - -## Follow-up - -Next design work should define: -- core platform contracts -- package structure -- runtime lifecycle -- queue and worker interfaces -- config model split between platform and application -- pilot integration for `mail_order_bot` +Avoid: +- legacy abstractions preserved only for compatibility +- specialized platform roles without strong need diff --git a/requirements/Mail Order Bot Migration Plan.md b/requirements/Mail Order Bot Migration Plan.md index c613f16..cd0e386 100644 --- a/requirements/Mail Order Bot Migration Plan.md +++ b/requirements/Mail Order Bot Migration Plan.md @@ -1,116 +1,62 @@ # Mail Order Bot Migration Plan -## Purpose +## Goal -This document describes how `mail_order_bot` will be adapted to the new runtime as the first pilot business application. +Migrate `mail_order_bot` to the new PLBA model: +- application assembled by `ApplicationModule` +- runtime execution owned by `Worker` +- business behavior implemented in routines/services -## Scope +## Target structure -This is not a full migration specification for all future applications. -It is a practical first use case to validate the runtime architecture. 
+### Application module -## Current model - -The current application flow is tightly coupled: -- the manager checks IMAP -- unread emails are fetched -- emails are processed synchronously -- messages are marked as read after processing - -Polling and processing happen in one execution path. - -## Target model - -The new runtime-based flow should be: - -1. a mail source detects new tasks -2. tasks are published to a queue -3. workers consume tasks in parallel -4. a domain handler processes each email -5. successful tasks lead to `mark_as_read` -6. failed tasks remain retriable - -## Phase 1 - -### Source -- IMAP polling source - -### Queue -- in-memory task queue +`MailOrderBotModule` should: +- build domain services +- build routines +- register workers +- register optional health contributors ### Workers -- 2 to 4 parallel workers initially -### Handler -- domain email processing handler built around the current processing logic +Initial workers: +- `MailPollingWorker` +- `EmailProcessingWorker` +- optional `ReconciliationWorker` -### Delivery semantics -- email is marked as read only after successful processing -- unread state acts as the first safety mechanism against message loss +Each worker should own only runtime behavior: +- start/stop +- execution loop +- thread model +- health/status -## Why in-memory queue is acceptable at first +### Routines -For the first phase: -- infrastructure complexity stays low -- the runtime contracts can be tested quickly -- unread emails in IMAP provide a simple recovery path after crashes +Initial routines: +- `MailPollingRoutine` +- `EmailProcessingRoutine` +- `ReconciliationRoutine` -This allows us to validate the runtime architecture before adopting an external broker. +Each routine should own business behavior: +- fetch messages +- parse message payload +- call domain services +- persist changes +- report business failures upward -## Phase 2 +## Migration steps -Replace: -- IMAP polling source +1. 
Extract business logic from current worker-like components into routines/services. +2. Implement thin workers that call those routines. +3. Register workers from `MailOrderBotModule`. +4. Use runtime health and status only through worker state. +5. Keep any local queue only as an implementation detail if it still helps. -With: -- IMAP IDLE source +## Optional queue usage -The queue, workers, and handler should remain unchanged. +If email processing still benefits from buffering, `InMemoryTaskQueue` may be used inside the application. -This is an explicit architectural goal: -source replacement without redesigning the processing pipeline. - -## Domain responsibilities that remain inside mail_order_bot - -The runtime should not own: -- email parsing rules -- client resolution logic -- attachment processing rules -- order creation logic -- client-specific behavior - -These remain in the business application. - -## Platform responsibilities used by mail_order_bot - -The new runtime should provide: -- lifecycle -- configuration -- queue abstraction -- worker orchestration -- tracing -- health checks -- status/control APIs - -## Migration boundaries - -### Move into runtime -- source orchestration -- worker supervision -- queue management -- trace provisioning -- health aggregation - -### Keep in mail_order_bot -- email business handler -- mail domain services -- business pipeline -- business-specific config validation beyond platform-level schema - -## Success criteria - -The migration is successful when: -- mail polling is no longer tightly coupled to processing -- workers can process emails in parallel -- business logic is not moved into the runtime -- replacing polling with IDLE is localized to the source layer +Important: +- it should remain an app-level detail +- it should not define the platform contract +- the main architecture should still be described through workers and routines diff --git a/requirements/README.md b/requirements/README.md index 
5a6c2a7..28353b0 100644 --- a/requirements/README.md +++ b/requirements/README.md @@ -1,469 +1,134 @@ -# PLBA +# PLBA Requirements -`PLBA` is a reusable platform runtime for business applications. +## Goal -It solves platform concerns that should not live inside domain code: -- application lifecycle +`PLBA` is a reusable runtime for business applications. + +The platform owns: +- lifecycle - worker orchestration -- configuration loading from YAML -- tracing -- health aggregation -- runtime status reporting -- HTTP control endpoints +- configuration loading - logging configuration +- health aggregation +- tracing +- control endpoints -Business applications depend on `plba` as a package and implement only their own business behavior. +Business applications own: +- business workflows +- domain services +- app-specific configuration schema +- business error semantics -## Architecture +## Core runtime model -Current PLBA architecture is built around one core idea: -- the runtime manages a set of application workers +The platform is built around workers. -A worker is any runtime-managed active component with a unified lifecycle: +1. application defines an `ApplicationModule` +2. module registers workers +3. runtime starts workers +4. workers execute business activity +5. runtime aggregates status and health +6. runtime stops workers gracefully + +## Contracts + +### `ApplicationModule` + +Responsibilities: +- provide module name +- assemble application components +- register workers +- register optional health contributors + +### `Worker` + +Main runtime contract. + +Responsibilities: - `start()` - `stop(force=False)` - `health()` - `status()` -This means PLBA does not require separate platform categories like `source` and `consumer`. -If an application needs polling, queue processing, listening, scheduled work, or another active loop, it is implemented as a worker. 
+The worker owns execution mechanics: +- single-run or loop +- thread model +- stop conditions +- runtime status +- health interpretation -### Main runtime model +### `Routine` -1. application creates `RuntimeManager` -2. runtime loads configuration -3. runtime applies logging configuration -4. application module registers workers and supporting services -5. runtime starts all workers -6. workers execute business-related loops or processing -7. runtime aggregates health and status -8. runtime stops workers gracefully or forcefully - -## Core concepts - -### `ApplicationModule` - -File: [application.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/application.py) - -Describes a business application to the runtime. +`Routine` is an application pattern, not a PLBA contract. Responsibilities: -- provide module name -- register workers -- register queues if needed -- register handlers if needed -- register health contributors -- compose application-specific objects +- execute business behavior +- call domain services +- apply business rules +- talk to external integrations -`ApplicationModule` does not run the application itself. -It only declares how the application is assembled. +Recommended rule: +- worker orchestrates +- routine executes business behavior -### `Worker` +## Architectural rules -File: [worker.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/worker.py) - -The main runtime-managed contract. - -Responsibilities: -- start its own execution -- stop gracefully or forcefully -- report health -- report runtime status - -This is the main extension point for business applications. - -### `TaskQueue` - -File: [queue.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/queue.py) - -Optional queue abstraction. - -Use it when application workers need buffered or decoupled processing. - -PLBA does not force every application to use a queue. -Queue is one supported pattern, not the foundation of the whole platform. 
- -### `TaskHandler` - -File: [tasks.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/tasks.py) - -Optional unit of business processing for one task. - -Useful when a worker follows queue-driven logic: -- worker takes a task -- handler executes business logic - -### `TraceService` - -File: [service.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/tracing/service.py) - -Platform trace service. - -Responsibilities: -- create trace contexts -- resume trace from task metadata -- write context records -- write trace messages - -Business code should use it as a platform service and should not implement its own tracing infrastructure. - -### `HealthRegistry` - -File: [registry.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/health/registry.py) - -Aggregates application health. - -PLBA uses three health states: -- `ok` — all critical parts work -- `degraded` — application still works, but there is a problem -- `unhealthy` — application should not be considered operational - -### Runtime status - -File: [types.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/core/types.py) - -Status is separate from health. - -Current runtime states: -- `starting` -- `idle` -- `busy` -- `stopping` -- `stopped` - -Status is used for operational lifecycle decisions such as graceful shutdown. - -### `ControlPlaneService` - -Files: -- [service.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/control/service.py) -- [http_channel.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/control/http_channel.py) - -Provides control and observability endpoints. 
- -Currently supported: -- health access -- runtime start action -- runtime stop action -- runtime status action - -### `ConfigurationManager` - -Files: -- [configuration.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/core/configuration.py) -- [file_loader.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/config/file_loader.py) -- [providers.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/config/providers.py) - -Loads and merges configuration. - -Current built-in source: -- YAML file provider - -### `LogManager` - -File: [manager.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/logging/manager.py) - -Applies logging configuration from config. - -Current expectation: -- logging config lives in the `log` section of YAML - -## Available platform services - -PLBA currently provides these reusable services. - -### 1. Runtime lifecycle - -Service: -- `RuntimeManager` - -What it gives: -- startup orchestration -- worker registration and startup -- graceful stop with timeout -- force stop -- status snapshot - -Example use: -- start `mail_order_bot` -- stop it after active email processing is drained - -### 2. Worker supervision - -Service: -- `WorkerSupervisor` - -What it gives: -- unified worker orchestration -- aggregated worker statuses -- aggregated worker health -- stop coordination - -Example use: -- run one polling worker and three processing workers in the same application - -### 3. Queue support - -Services: -- `TaskQueue` -- `InMemoryTaskQueue` -- `QueueWorker` - -What it gives: -- buffered processing -- decoupling between task production and task consumption -- worker concurrency for task handling - -Example use: -- worker A polls IMAP and pushes tasks to queue -- worker B processes queued email tasks with concurrency `3` - -### 4. 
Configuration - -Services: -- `ConfigurationManager` -- `FileConfigProvider` -- `ConfigFileLoader` - -What it gives: -- YAML config loading -- config merging -- access to platform and application config - -Example use: -- load `platform` section for runtime -- load `mail_order_bot` section for app-specific config - -### 5. Tracing - -Services: -- `TraceService` -- `TraceTransport` -- `NoOpTraceTransport` - -What it gives: -- trace context creation -- trace propagation through task metadata -- trace messages for processing steps - -Example use: -- polling worker creates trace when it discovers a mail -- processing worker resumes trace and writes business steps - -### 6. Health - -Services: -- `HealthRegistry` -- `WorkerHealth` - -What it gives: -- per-worker health -- aggregated application health -- critical vs non-critical component handling - -Example use: -- email processing workers are critical -- optional diagnostic worker may be non-critical - -### 7. Status - -Services: -- `WorkerStatus` -- runtime aggregated state - -What it gives: -- current activity visibility -- ability to stop application only after in-flight work is completed - -Example use: -- stop application only after processing workers become `idle` or `stopped` - -### 8. HTTP control - -Services: -- `ControlPlaneService` -- `HttpControlChannel` - -What it gives: -- HTTP health/status/actions -- operational integration point - -Example use: -- inspect current health from orchestration -- request graceful stop remotely - -## Public package API - -Public namespace is `plba`. 
- -Main imports for external applications: - -```python -from plba import ApplicationModule, QueueWorker, RuntimeManager, create_runtime -from plba.contracts import Task, TaskHandler, TaskQueue, Worker, WorkerHealth, WorkerStatus -from plba.queue import InMemoryTaskQueue -from plba.tracing import TraceService -``` - -## Example application pattern - -Minimal queue-based application: - -```python -from plba import ApplicationModule, QueueWorker, Task, TaskHandler, create_runtime -from plba.queue import InMemoryTaskQueue - - -class ExampleHandler(TaskHandler): - def handle(self, task: Task) -> None: - print(task.payload) - - -class ExampleModule(ApplicationModule): - @property - def name(self) -> str: - return "example" - - def register(self, registry) -> None: - queue = InMemoryTaskQueue() - traces = registry.services.get("traces") - - queue.publish(Task(name="incoming", payload={"hello": "world"})) - - registry.add_queue("incoming", queue) - registry.add_worker(QueueWorker("example-worker", queue, ExampleHandler(), traces)) - - -runtime = create_runtime( - ExampleModule(), - config_path="config.yml", - enable_http_control=False, -) -runtime.start() -``` - -## Building business applications on PLBA - -These are the current rules for building business applications correctly. - -### 1. Keep platform and business concerns separate - -PLBA owns: -- lifecycle -- worker management -- logging -- trace infrastructure -- health aggregation -- HTTP control -- config loading - -Business application owns: -- business workflows -- domain services -- application-specific config schema -- business task payloads -- business error semantics - -### 2. Build app behavior from workers - -A business application should be described as a small set of workers. - -Typical examples: -- polling worker -- processing worker -- reconciliation worker - -Do not introduce new worker types at platform level unless there is clear need for custom runtime behavior. - -### 3. 
Use queues only when they help - -Queue is optional. - -Use queue when: -- one worker discovers work -- another worker processes it -- buffering or decoupling helps -- concurrency is needed - -Do not force queue into applications that do not need it. - -### 4. Keep business logic out of worker lifecycle code - -Worker should orchestrate execution. -Business rules should live in dedicated services and handlers. +### 1. Keep lifecycle and business behavior separate Good: -- worker gets config -- worker calls domain service -- worker reports trace and status +- worker manages threading, loop, stop flags, health, status +- routine contains business logic Bad: -- worker contains all parsing, decision logic, integration rules, and persistence rules in one class +- worker mixes thread management, retry policy, parsing, persistence, integration rules, and domain decisions in one class -### 5. Use trace as a platform service +### 2. Treat `Worker` as the main extension point -Business application should: -- create meaningful trace steps -- propagate trace through task metadata if queue is used -- record business-relevant processing milestones +Do not center the platform around queues, handlers, sources, or other specialized runtime categories. -Business application should not: -- implement its own trace store -- control trace transport directly unless explicitly needed +### 3. Keep `Routine` out of the platform contract set -### 6. Read config through PLBA +At the current stage PLBA should not force a universal `Routine` interface. -Business application should not read YAML directly. +Applications may use: +- `run()` +- `run_once()` +- `poll()` +- `sync_window()` -Recommended flow: -- PLBA loads config -- application reads only its own config section -- application converts it to typed app config object -- services receive typed config object +The exact business API belongs to the application. -### 7. Distinguish health from status +### 4. 
Health is computed by the worker -Use `health` for: -- is application operational? +Routine should not directly mutate platform health state. -Use `status` for: -- what is application doing right now? +Instead: +- routine succeeds +- routine returns outcome +- or routine raises typed exceptions -This is important for graceful stop: -- health may still be `ok` -- status may be `busy` +Worker interprets the outcome into: +- `ok` +- `degraded` +- `unhealthy` -### 8. Design workers for graceful stop +### 5. `InMemoryTaskQueue` is utility-only -Workers should support: -- stop accepting new work -- finish current in-flight work when possible -- report `busy`, `idle`, `stopping`, `stopped` +`InMemoryTaskQueue` may stay as a reusable component for business applications. -This allows runtime to stop application safely. +It is: +- optional +- local +- not part of the main runtime contract model -## Recommended repository model +## Public package direction -PLBA is intended to live in its own repository as a reusable package. +Public namespace `plba` should expose: +- application/runtime contracts +- tracing +- health +- config +- control plane +- utility queue -Recommended setup: -- repository `plba`: platform package only -- repository `mail_order_bot`: business application depending on `plba` -- repository `service_b`: business application depending on `plba` - -## Example: `mail_order_bot` - -Simple first version of `mail_order_bot` on PLBA: -- `MailPollingWorker`, concurrency `1` -- `EmailProcessingWorker`, concurrency `3` -- shared `InMemoryTaskQueue` -- domain services for mail parsing and order processing - -Flow: -1. polling worker checks IMAP -2. polling worker pushes email tasks into queue -3. processing workers consume tasks -4. processing workers execute domain logic -5. runtime aggregates health and status - -This keeps `mail_order_bot` small, explicit, and aligned with current PLBA architecture. 
+It should not expose queue-centric runtime abstractions as primary architecture. diff --git a/requirements/application_guidelines.md b/requirements/application_guidelines.md new file mode 100644 index 0000000..554aa32 --- /dev/null +++ b/requirements/application_guidelines.md @@ -0,0 +1,148 @@ +# Application Guidelines + +## Purpose + +This document defines the default rules for building business applications on top of `plba`. + +The goal is to keep applications: +- explicit +- small +- easy to debug +- free from platform legacy artifacts + +## Main model + +Build every application around this chain: + +`ApplicationModule` -> `Worker` -> business `Routine` + +Meaning: +- `ApplicationModule` assembles the application +- `Worker` owns runtime execution and lifecycle +- `Routine` owns business behavior + +`Routine` is an application pattern, not a mandatory platform contract. + +## Rules + +### 1. Assemble the app in `ApplicationModule` + +`ApplicationModule` should: +- create application services +- create routines +- create workers +- register workers +- register optional health contributors + +`ApplicationModule` should not: +- execute business logic itself +- contain runtime loops + +### 2. Treat `Worker` as the only primary runtime abstraction + +`Worker` is the core runtime contract of the platform. + +Worker should own: +- `start()` +- `stop(force=False)` +- `health()` +- `status()` +- thread ownership +- execution strategy +- graceful shutdown + +Worker should not own: +- large business flows +- domain decisions +- parsing, persistence, and integration rules all mixed together + +### 3. Keep one worker focused on one business activity + +Default recommendation: +- one worker -> one routine + +If a process has multiple distinct behaviors: +- split it into multiple workers +- or compose several services behind one focused routine + +Do not make a worker a container for unrelated business scenarios. + +### 4. 
Put business logic into routines and services + +Routine should contain: +- business flow steps +- domain service calls +- business validation +- integration calls +- persistence orchestration + +If the routine becomes too large: +- split business logic into dedicated services +- keep routine as a thin application-level orchestrator + +### 5. Let the worker define the run model + +The worker decides: +- single-run or loop +- one thread or multiple threads +- interval between iterations +- batch or long-running mode +- stop conditions + +The routine does not decide lifecycle strategy. + +### 6. Let the worker compute health + +Routine should not directly set platform health state. + +Instead: +- routine completes successfully +- or returns outcome information +- or raises typed exceptions + +Then worker interprets that into: +- `ok` +- `degraded` +- `unhealthy` + +### 7. Use queues only as optional app-level utilities + +`InMemoryTaskQueue` may be used inside an application when buffering helps. + +But: +- queue is not a core platform concept +- queue usage should stay a local implementation choice +- the app should still be described through workers and routines + +### 8. Keep tracing vocabulary neutral + +Use tracing to describe operations and execution context, not legacy architectural roles. + +Prefer terms like: +- operation +- worker +- routine +- metadata +- step + +Avoid making trace terminology define the application architecture. + +### 9. Keep classes small and responsibilities clear + +Preferred shape: +- thin `ApplicationModule` +- thin `Worker` +- focused `Routine` +- dedicated domain services + +If a class grows too much, split it by responsibility instead of adding more platform abstractions. + +## Checklist + +Before adding a new application component, check: + +1. Is this runtime behavior or business behavior? +2. If runtime behavior, should it live in a `Worker`? +3. If business behavior, should it live in a `Routine` or service? +4. 
Does this component stay small and single-purpose? +5. Am I adding a queue because it is useful, or because of old mental models? diff --git a/requirements/architecture.md b/requirements/architecture.md index b0ddba0..0334d3a 100644 --- a/requirements/architecture.md +++ b/requirements/architecture.md @@ -2,247 +2,130 @@ ## Overview -The runtime is built as a platform layer for business applications. - -It consists of four logical layers: +PLBA consists of four logical layers: - platform core - platform contracts -- infrastructure adapters +- infrastructure services - business applications +The runtime is centered on `Worker`. + ## Layers ### Platform core -The core contains long-lived runtime services: +Core services: - `RuntimeManager` - `ConfigurationManager` - `WorkerSupervisor` -- `TraceService` -- `HealthRegistry` -- `ControlPlaneService` - `ServiceContainer` -The core is responsible for orchestration, not domain behavior. - -### Platform contracts - -Contracts define how business applications integrate with the runtime. - -Main contracts: -- `ApplicationModule` -- `TaskSource` -- `TaskQueue` -- `Worker` -- `TaskHandler` -- `ConfigProvider` -- `HealthContributor` -- `TraceFactory` - -These contracts must remain domain-agnostic. - -### Infrastructure adapters - -Adapters implement concrete runtime capabilities: -- in-memory queue -- Redis queue -- file config loader -- database config loader -- polling source -- IMAP IDLE source -- HTTP control plane -- trace transport adapters - -Adapters may change between applications and deployments. - -### Business applications - -Applications are built on top of the contracts and adapters. - -Examples: -- `mail_order_bot` -- future event-driven business services - -Applications contain: -- domain models -- domain handlers -- application-specific configuration schema -- source/handler composition - -## Core runtime components - -### RuntimeManager - -The main platform facade. 
- Responsibilities: - bootstrap runtime -- initialize services -- register application modules -- start and stop all runtime-managed components -- expose status -- coordinate graceful shutdown +- wire core services +- register modules +- start and stop workers +- expose runtime snapshot -### ConfigurationManager +### Platform contracts -Responsibilities: -- load configuration -- validate configuration -- publish config updates -- provide typed config access -- notify subscribers on reload +Main contracts: +- `ApplicationModule` +- `Worker` +- `ConfigProvider` +- `HealthContributor` +- tracing contracts -Configuration should be divided into: -- platform config -- application config -- environment/runtime overrides +These contracts must remain domain-agnostic. -### WorkerSupervisor +### Infrastructure services -Responsibilities: -- register worker definitions -- start worker pools -- monitor worker health -- restart failed workers when appropriate -- manage parallelism and backpressure -- expose worker-level status +Platform services include: +- tracing +- health registry +- logging manager +- control plane +- config providers +- `InMemoryTaskQueue` as optional utility -### TraceService +### Business applications -Responsibilities: -- create traces for operations -- propagate trace context across source -> queue -> worker -> handler boundaries -- provide trace factories to applications -- remain transport-agnostic +Applications define: +- routines +- domain services +- custom worker implementations +- typed app config -### HealthRegistry - -Responsibilities: -- collect health from registered contributors -- aggregate health into liveness/readiness/status views -- expose structured runtime health - -### ControlPlaneService - -Responsibilities: -- control endpoints -- runtime state visibility -- administrative actions -- later authentication and user/session-aware access - -## Main runtime model - -The runtime should operate on this conceptual flow: +## Runtime flow 1. 
runtime starts 2. configuration is loaded -3. services are initialized -4. application modules register sources, queues, handlers, and workers -5. task sources start producing tasks -6. tasks are published into queues -7. workers consume tasks -8. handlers execute business logic -9. traces and health are updated throughout the flow -10. runtime stops gracefully on request +3. core services become available +4. application modules register workers +5. workers start execution +6. workers call business routines +7. runtime aggregates health and status +8. runtime stops workers on request -## Contracts +## Worker model -### ApplicationModule +Worker is responsible for runtime behavior: +- execution strategy +- thread ownership +- graceful shutdown +- runtime status +- health interpretation -Describes a business application to the runtime. +Routine is responsible for business behavior: +- business decisions +- domain orchestration +- persistence and integrations -Responsibilities: -- register domain services -- register task sources -- register queues -- register worker pools -- register handlers -- declare config requirements -- optionally register health contributors +Recommended shape: -### TaskSource +```python +class SomeWorker(Worker): + def __init__(self, routine) -> None: + self._routine = routine -Produces tasks into queues. + def start(self) -> None: + ... -Examples: -- IMAP polling source -- IMAP IDLE source -- webhook source -- scheduled source + def stop(self, force: bool = False) -> None: + ... -Responsibilities: -- start -- stop -- publish tasks -- expose source status + def health(self) -> WorkerHealth: + ... -### TaskQueue + def status(self) -> WorkerStatus: + ... +``` -A queue abstraction. +## Design rules -Expected operations: -- `publish(task)` -- `consume()` -- `ack(task)` -- `nack(task, retry_delay=None)` -- `stats()` +### 1. 
Runtime should not know business semantics -The first implementation may be in-memory, but the interface should support future backends. +PLBA knows: +- worker started +- worker stopped +- routine succeeded +- routine failed -### Worker +PLBA does not know: +- what the business operation means +- which domain decision was made -Consumes tasks from a queue and passes them to a handler. +### 2. Queue is not a core architecture primitive -Responsibilities: -- obtain task from queue -- open or resume trace context -- call business handler -- ack or nack the task -- expose worker state +Queues may exist inside applications as implementation details. -### TaskHandler +They must not define the platform mental model. -Executes business logic for one task. +### 3. Keep components small -The runtime should not know what the handler does. -It only knows that a task is processed. +Prefer: +- thin workers +- focused routines +- dedicated domain services -## Mail Order Bot as first application - -### Phase 1 - -- source: IMAP polling -- queue: in-memory queue -- workers: parallel email processing workers -- handler: domain email processing handler -- mark message as read only after successful processing - -### Phase 2 - -- source changes from polling to IMAP IDLE -- queue and workers remain the same - -This demonstrates one of the architectural goals: -the source can change without redesigning the rest of the processing pipeline. - -## Suggested package structure - -```text -src/ - app_runtime/ - core/ - contracts/ - config/ - workers/ - queue/ - tracing/ - health/ - control/ - container/ - adapters/ - mail_order_bot_app/ - module/ - sources/ - handlers/ - services/ - domain/ +Avoid large platform abstractions that exist only for hypothetical reuse. 
diff --git a/requirements/vision.md b/requirements/vision.md index 41b88b1..5356147 100644 --- a/requirements/vision.md +++ b/requirements/vision.md @@ -1,119 +1,38 @@ # Vision -## Purpose +## Product vision -This project provides a reusable runtime platform for business applications. +PLBA should be a transparent runtime for business applications. -The runtime exists to solve service and infrastructure concerns that are needed by many applications but do not belong to business logic: -- start and stop -- status and lifecycle -- configuration -- worker execution -- trace propagation -- health checks -- control and administration -- later authentication and user management +The desired feeling of the platform: +- simple to read +- explicit in behavior +- small number of core concepts +- easy to debug +- no architectural legacy artifacts -The runtime should allow a business application to focus only on domain behavior. +## Core concepts -## Vision statement +The platform should be understandable through three ideas: +- `ApplicationModule` assembles the app +- `Worker` owns lifecycle and execution +- business behavior lives in application routines and services -We build a platform where business applications are assembled from small domain modules, while the runtime consistently provides lifecycle, workers, tracing, configuration, and control-plane capabilities. +## Non-goals -## Problem +PLBA should not become: +- a framework of many specialized runtime roles +- a queue-centric architecture by default +- a compatibility shell for legacy abstractions +- a place where business logic hides inside infrastructure classes -In the previous generation, the execution model was centered around a single `execute()` entry point called by a timer loop. 
+## Utility components -That model works for simple periodic jobs, but it becomes too narrow when we need: -- queue-driven processing -- multiple concurrent workers -- independent task sources -- richer health semantics -- trace propagation across producer and consumer boundaries -- reusable runtime patterns across different applications -- future admin and authentication capabilities +Some utility components may still exist, for example `InMemoryTaskQueue`. -The old model couples orchestration and execution too tightly. +They are acceptable when they stay: +- optional +- local +- implementation-oriented -## Desired future state - -The new runtime should provide: -- a reusable lifecycle and orchestration core -- a clean contract for business applications -- support for sources, queues, workers, and handlers -- explicit separation between platform and domain -- trace and health as first-class platform services -- the ability to evolve into an admin platform - -## Design principles - -### 1. Platform vs domain separation - -The runtime owns platform concerns. -Applications own domain concerns. - -The runtime must not contain business rules such as: -- email parsing policies -- order creation logic -- invoice decisions -- client-specific rules - -### 2. Composition over inheritance - -Applications should be composed by registering modules and services in the runtime, not by inheriting a timer-driven class and overriding one method. - -### 3. Explicit task model - -Applications should model processing as: -- task source -- task queue -- worker -- handler - -This is more scalable than one monolithic execute loop. - -### 4. Parallelism as a first-class concern - -The runtime should supervise worker pools and concurrency safely instead of leaving this to ad hoc application code. - -### 5. Observability by default - -Trace, logs, metrics, and health should be available as platform services from the start. - -### 6. 
Evolvability - -The runtime should be able to support: -- different queue backends -- different task sources -- different control planes -- different admin capabilities -without forcing business applications to change their domain code. - -## First target use case - -The first business application is `mail_order_bot`. - -Its business domain is: -- fetching incoming mail -- processing attachments -- executing order-related pipelines - -Its platform/runtime needs are: -- lifecycle management -- polling or IMAP IDLE source -- queueing -- worker pools -- tracing -- health checks -- future admin API - -This makes it a good pilot for the new runtime. - -## Success criteria - -We consider the runtime direction successful if: -- `mail_order_bot` business logic can run on top of it without leaking infrastructure details into domain code -- the runtime can manage concurrent workers -- the runtime can support a queue-based flow -- the runtime can expose status and health -- the runtime can later host admin/auth features without redesigning the core +They should not redefine the main platform model. 
diff --git a/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc index 35ebf3e..a51f034 100644 Binary files a/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc and b/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/application.py b/src/app_runtime/contracts/application.py index 82441f6..dadfecf 100644 --- a/src/app_runtime/contracts/application.py +++ b/src/app_runtime/contracts/application.py @@ -13,4 +13,4 @@ class ApplicationModule(ABC): @abstractmethod def register(self, registry: ModuleRegistry) -> None: - """Register workers, queues, handlers, services, and health contributors.""" + """Register workers, services, and health contributors.""" diff --git a/src/app_runtime/contracts/queue.py b/src/app_runtime/contracts/queue.py deleted file mode 100644 index 498796d..0000000 --- a/src/app_runtime/contracts/queue.py +++ /dev/null @@ -1,28 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import Any - -from app_runtime.contracts.tasks import Task - - -class TaskQueue(ABC): - @abstractmethod - def publish(self, task: Task) -> None: - """Push a task into the queue.""" - - @abstractmethod - def consume(self, timeout: float = 0.1) -> Task | None: - """Return the next available task or None.""" - - @abstractmethod - def ack(self, task: Task) -> None: - """Confirm successful task processing.""" - - @abstractmethod - def nack(self, task: Task, retry_delay: float | None = None) -> None: - """Signal failed task processing.""" - - @abstractmethod - def stats(self) -> dict[str, Any]: - """Return transport-level queue statistics.""" diff --git a/src/app_runtime/contracts/tasks.py b/src/app_runtime/contracts/tasks.py deleted file mode 100644 index 7f552ba..0000000 --- a/src/app_runtime/contracts/tasks.py +++ /dev/null @@ -1,18 +0,0 @@ -from __future__ import annotations - 
-from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from typing import Any - - -@dataclass(slots=True) -class Task: - name: str - payload: dict[str, Any] - metadata: dict[str, Any] = field(default_factory=dict) - - -class TaskHandler(ABC): - @abstractmethod - def handle(self, task: Task) -> None: - """Execute domain logic for a task.""" diff --git a/src/app_runtime/core/__pycache__/registration.cpython-312.pyc b/src/app_runtime/core/__pycache__/registration.cpython-312.pyc index ef75b12..b36c669 100644 Binary files a/src/app_runtime/core/__pycache__/registration.cpython-312.pyc and b/src/app_runtime/core/__pycache__/registration.cpython-312.pyc differ diff --git a/src/app_runtime/core/registration.py b/src/app_runtime/core/registration.py index 9be2a1e..361c297 100644 --- a/src/app_runtime/core/registration.py +++ b/src/app_runtime/core/registration.py @@ -1,8 +1,6 @@ from __future__ import annotations from app_runtime.contracts.health import HealthContributor -from app_runtime.contracts.queue import TaskQueue -from app_runtime.contracts.tasks import TaskHandler from app_runtime.contracts.worker import Worker from app_runtime.core.service_container import ServiceContainer @@ -10,8 +8,6 @@ from app_runtime.core.service_container import ServiceContainer class ModuleRegistry: def __init__(self, services: ServiceContainer) -> None: self.services = services - self.queues: dict[str, TaskQueue] = {} - self.handlers: dict[str, TaskHandler] = {} self.workers: list[Worker] = [] self.health_contributors: list[HealthContributor] = [] self.modules: list[str] = [] @@ -19,12 +15,6 @@ class ModuleRegistry: def register_module(self, name: str) -> None: self.modules.append(name) - def add_queue(self, name: str, queue: TaskQueue) -> None: - self.queues[name] = queue - - def add_handler(self, name: str, handler: TaskHandler) -> None: - self.handlers[name] = handler - def add_worker(self, worker: Worker) -> None: self.workers.append(worker) diff --git 
a/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc b/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc index 98aa863..ee09eea 100644 Binary files a/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc and b/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc differ diff --git a/src/app_runtime/queue/in_memory.py b/src/app_runtime/queue/in_memory.py index 16d1db6..7c2cd7e 100644 --- a/src/app_runtime/queue/in_memory.py +++ b/src/app_runtime/queue/in_memory.py @@ -1,43 +1,38 @@ from __future__ import annotations from queue import Empty, Queue +from typing import Generic, TypeVar -from app_runtime.contracts.queue import TaskQueue -from app_runtime.contracts.tasks import Task +T = TypeVar("T") -class InMemoryTaskQueue(TaskQueue): +class InMemoryTaskQueue(Generic[T]): def __init__(self) -> None: - self._queue: Queue[Task] = Queue() - self._published = 0 - self._acked = 0 - self._nacked = 0 + self._queue: Queue[T] = Queue() + self._put_count = 0 + self._get_count = 0 - def publish(self, task: Task) -> None: - self._published += 1 - self._queue.put(task) + def put(self, item: T) -> None: + self._put_count += 1 + self._queue.put(item) - def consume(self, timeout: float = 0.1) -> Task | None: + def get(self, timeout: float = 0.1) -> T | None: try: - return self._queue.get(timeout=timeout) + item = self._queue.get(timeout=timeout) except Empty: return None + self._get_count += 1 + return item - def ack(self, task: Task) -> None: - del task - self._acked += 1 + def task_done(self) -> None: self._queue.task_done() - def nack(self, task: Task, retry_delay: float | None = None) -> None: - del retry_delay - self._nacked += 1 - self._queue.put(task) - self._queue.task_done() + def qsize(self) -> int: + return self._queue.qsize() def stats(self) -> dict[str, int]: return { - "published": self._published, - "acked": self._acked, - "nacked": self._nacked, + "put": self._put_count, + "got": self._get_count, "queued": self._queue.qsize(), } diff 
--git a/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc index c81725e..425362a 100644 Binary files a/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc and b/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc index 288c8ed..9fee83a 100644 Binary files a/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc and b/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc index 161cf01..e801f08 100644 Binary files a/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc and b/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/service.py b/src/app_runtime/tracing/service.py index 00a8470..00bc284 100644 --- a/src/app_runtime/tracing/service.py +++ b/src/app_runtime/tracing/service.py @@ -99,7 +99,7 @@ class TraceService(TraceContextFactory): self._write_message("ERROR", message, status, attrs) def new_root(self, operation: str) -> TraceContext: - trace_id = self.create_context(alias=operation, kind="source", attrs={"operation": operation}) + trace_id = self.create_context(alias=operation, kind="operation", attrs={"operation": operation}) return TraceContext(trace_id=trace_id, span_id=trace_id, attributes={"operation": operation}) def child_of(self, parent: TraceContext, operation: str) -> TraceContext: @@ -116,22 +116,22 @@ class TraceService(TraceContextFactory): attributes={"operation": operation}, ) - def attach(self, task_metadata: dict[str, object], context: TraceContext) -> dict[str, object]: - updated = dict(task_metadata) + def attach(self, metadata: dict[str, object], context: TraceContext) -> dict[str, object]: + 
updated = dict(metadata) updated["trace_id"] = context.trace_id updated["span_id"] = context.span_id updated["parent_span_id"] = context.parent_span_id return updated - def resume(self, task_metadata: dict[str, object], operation: str) -> TraceContext: - trace_id = str(task_metadata.get("trace_id") or uuid4().hex) - span_id = str(task_metadata.get("span_id") or trace_id) - parent_id = task_metadata.get("parent_span_id") + def resume(self, metadata: dict[str, object], operation: str) -> TraceContext: + trace_id = str(metadata.get("trace_id") or uuid4().hex) + span_id = str(metadata.get("span_id") or trace_id) + parent_id = metadata.get("parent_span_id") self.create_context( alias=operation, parent_id=str(parent_id) if parent_id else None, - kind="handler", - attrs=dict(task_metadata), + kind="worker", + attrs=dict(metadata), ) return TraceContext( trace_id=trace_id, diff --git a/src/app_runtime/workers/__init__.py b/src/app_runtime/workers/__init__.py index 69099d4..ca7672b 100644 --- a/src/app_runtime/workers/__init__.py +++ b/src/app_runtime/workers/__init__.py @@ -1,4 +1,3 @@ -from app_runtime.workers.queue_worker import QueueWorker from app_runtime.workers.supervisor import WorkerSupervisor -__all__ = ["QueueWorker", "WorkerSupervisor"] +__all__ = ["WorkerSupervisor"] diff --git a/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc index 3b1f2ec..f680d9d 100644 Binary files a/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc and b/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/workers/queue_worker.py b/src/app_runtime/workers/queue_worker.py deleted file mode 100644 index 8132e3c..0000000 --- a/src/app_runtime/workers/queue_worker.py +++ /dev/null @@ -1,125 +0,0 @@ -from __future__ import annotations - -from threading import Event, Lock, Thread - -from app_runtime.contracts.queue import TaskQueue -from app_runtime.contracts.tasks 
import TaskHandler -from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus -from app_runtime.tracing.service import TraceService - - -class QueueWorker(Worker): - def __init__( - self, - name: str, - queue: TaskQueue, - handler: TaskHandler, - traces: TraceService, - *, - concurrency: int = 1, - critical: bool = True, - ) -> None: - self._name = name - self._queue = queue - self._handler = handler - self._traces = traces - self._concurrency = concurrency - self._critical = critical - self._threads: list[Thread] = [] - self._stop_requested = Event() - self._force_stop = Event() - self._lock = Lock() - self._started = False - self._in_flight = 0 - self._processed = 0 - self._failures = 0 - - @property - def name(self) -> str: - return self._name - - @property - def critical(self) -> bool: - return self._critical - - def start(self) -> None: - if any(thread.is_alive() for thread in self._threads): - return - self._threads.clear() - self._stop_requested.clear() - self._force_stop.clear() - self._started = True - for index in range(self._concurrency): - thread = Thread(target=self._run_loop, name=f"{self._name}-{index + 1}", daemon=True) - self._threads.append(thread) - thread.start() - - def stop(self, force: bool = False) -> None: - self._stop_requested.set() - if force: - self._force_stop.set() - - def health(self) -> WorkerHealth: - status = self.status() - if self._started and not self._stop_requested.is_set() and self._alive_threads() == 0: - return WorkerHealth(self.name, "unhealthy", self.critical, "worker threads are not running", status.meta) - if self._failures > 0: - return WorkerHealth(self.name, "degraded", self.critical, "worker has processing failures", status.meta) - return WorkerHealth(self.name, "ok", self.critical, meta=status.meta) - - def status(self) -> WorkerStatus: - alive_threads = self._alive_threads() - with self._lock: - in_flight = self._in_flight - processed = self._processed - failures = self._failures - if 
self._started and alive_threads == 0: - state = "stopped" - elif self._stop_requested.is_set(): - state = "stopping" if alive_threads > 0 else "stopped" - elif not self._started: - state = "stopped" - elif in_flight > 0: - state = "busy" - else: - state = "idle" - return WorkerStatus( - name=self.name, - state=state, - in_flight=in_flight, - meta={ - "alive_threads": alive_threads, - "concurrency": self._concurrency, - "processed": processed, - "failures": failures, - }, - ) - - def _run_loop(self) -> None: - while True: - if self._force_stop.is_set() or self._stop_requested.is_set(): - return - task = self._queue.consume(timeout=0.1) - if task is None: - continue - with self._lock: - self._in_flight += 1 - self._traces.resume(task.metadata, f"worker:{self.name}") - try: - self._handler.handle(task) - except Exception: - with self._lock: - self._failures += 1 - self._queue.nack(task) - else: - with self._lock: - self._processed += 1 - self._queue.ack(task) - finally: - with self._lock: - self._in_flight -= 1 - if self._stop_requested.is_set(): - return - - def _alive_threads(self) -> int: - return sum(1 for thread in self._threads if thread.is_alive()) diff --git a/src/app_runtime/workflow/__init__.py b/src/app_runtime/workflow/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app_runtime/workflow/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app_runtime/workflow/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/workflow/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..2a3f2ed Binary files /dev/null and b/src/app_runtime/workflow/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/__pycache__/runtime_factory.cpython-312.pyc b/src/app_runtime/workflow/__pycache__/runtime_factory.cpython-312.pyc new file mode 100644 index 0000000..16993e9 Binary files /dev/null and b/src/app_runtime/workflow/__pycache__/runtime_factory.cpython-312.pyc differ diff --git 
a/src/app_runtime/workflow/contracts/__init__.py b/src/app_runtime/workflow/contracts/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app_runtime/workflow/contracts/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app_runtime/workflow/contracts/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/workflow/contracts/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..45a4821 Binary files /dev/null and b/src/app_runtime/workflow/contracts/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/contracts/__pycache__/context.cpython-312.pyc b/src/app_runtime/workflow/contracts/__pycache__/context.cpython-312.pyc new file mode 100644 index 0000000..1ab1193 Binary files /dev/null and b/src/app_runtime/workflow/contracts/__pycache__/context.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/contracts/__pycache__/result.cpython-312.pyc b/src/app_runtime/workflow/contracts/__pycache__/result.cpython-312.pyc new file mode 100644 index 0000000..767ac29 Binary files /dev/null and b/src/app_runtime/workflow/contracts/__pycache__/result.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/contracts/__pycache__/step.cpython-312.pyc b/src/app_runtime/workflow/contracts/__pycache__/step.cpython-312.pyc new file mode 100644 index 0000000..59112ec Binary files /dev/null and b/src/app_runtime/workflow/contracts/__pycache__/step.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/contracts/__pycache__/workflow.cpython-312.pyc b/src/app_runtime/workflow/contracts/__pycache__/workflow.cpython-312.pyc new file mode 100644 index 0000000..571b110 Binary files /dev/null and b/src/app_runtime/workflow/contracts/__pycache__/workflow.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/contracts/context.py b/src/app_runtime/workflow/contracts/context.py new file mode 100644 index 0000000..b64b97b --- /dev/null +++ b/src/app_runtime/workflow/contracts/context.py 
@@ -0,0 +1,16 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(slots=True) +class WorkflowContext: + payload: dict[str, Any] + state: dict[str, Any] = field(default_factory=dict) + + def snapshot(self) -> dict[str, Any]: + return { + "payload": dict(self.payload), + "state": dict(self.state), + } diff --git a/src/app_runtime/workflow/contracts/result.py b/src/app_runtime/workflow/contracts/result.py new file mode 100644 index 0000000..f4907e3 --- /dev/null +++ b/src/app_runtime/workflow/contracts/result.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(slots=True) +class StepResult: + transition: str = "success" + updates: dict[str, Any] = field(default_factory=dict) + status: str = "completed" diff --git a/src/app_runtime/workflow/contracts/step.py b/src/app_runtime/workflow/contracts/step.py new file mode 100644 index 0000000..fc350ed --- /dev/null +++ b/src/app_runtime/workflow/contracts/step.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod + +from app_runtime.workflow.contracts.context import WorkflowContext +from app_runtime.workflow.contracts.result import StepResult + + +class WorkflowStep(ABC): + @abstractmethod + def run(self, context: WorkflowContext) -> StepResult: + """Run the step and return transition metadata.""" diff --git a/src/app_runtime/workflow/contracts/workflow.py b/src/app_runtime/workflow/contracts/workflow.py new file mode 100644 index 0000000..92e2b58 --- /dev/null +++ b/src/app_runtime/workflow/contracts/workflow.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from app_runtime.workflow.contracts.step import WorkflowStep + + +@dataclass(slots=True) +class WorkflowNode: + name: str + step: WorkflowStep + transitions: dict[str, str] = field(default_factory=dict) + + 
+@dataclass(slots=True) +class WorkflowDefinition: + name: str + start_at: str + nodes: dict[str, WorkflowNode] diff --git a/src/app_runtime/workflow/engine/__init__.py b/src/app_runtime/workflow/engine/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app_runtime/workflow/engine/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app_runtime/workflow/engine/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/workflow/engine/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..fdf4c33 Binary files /dev/null and b/src/app_runtime/workflow/engine/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/engine/__pycache__/hooks.cpython-312.pyc b/src/app_runtime/workflow/engine/__pycache__/hooks.cpython-312.pyc new file mode 100644 index 0000000..92ac31a Binary files /dev/null and b/src/app_runtime/workflow/engine/__pycache__/hooks.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/engine/__pycache__/transition_resolver.cpython-312.pyc b/src/app_runtime/workflow/engine/__pycache__/transition_resolver.cpython-312.pyc new file mode 100644 index 0000000..ac5b7ff Binary files /dev/null and b/src/app_runtime/workflow/engine/__pycache__/transition_resolver.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/engine/__pycache__/workflow_engine.cpython-312.pyc b/src/app_runtime/workflow/engine/__pycache__/workflow_engine.cpython-312.pyc new file mode 100644 index 0000000..f479298 Binary files /dev/null and b/src/app_runtime/workflow/engine/__pycache__/workflow_engine.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/engine/hooks.py b/src/app_runtime/workflow/engine/hooks.py new file mode 100644 index 0000000..6f46b57 --- /dev/null +++ b/src/app_runtime/workflow/engine/hooks.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from app_runtime.workflow.contracts.context import WorkflowContext + + +class WorkflowEngineHooks: + def on_step_started(self, 
context: WorkflowContext, step: str) -> None: + del context, step + + def on_step_finished(self, context: WorkflowContext, step: str) -> None: + del context, step + + def on_step_failed(self, context: WorkflowContext, step: str) -> None: + del context, step diff --git a/src/app_runtime/workflow/engine/transition_resolver.py b/src/app_runtime/workflow/engine/transition_resolver.py new file mode 100644 index 0000000..65b179a --- /dev/null +++ b/src/app_runtime/workflow/engine/transition_resolver.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from app_runtime.workflow.contracts.result import StepResult +from app_runtime.workflow.contracts.workflow import WorkflowNode + + +class TransitionResolver: + def resolve(self, node: WorkflowNode, result: StepResult) -> str | None: + return node.transitions.get(result.transition) diff --git a/src/app_runtime/workflow/engine/workflow_engine.py b/src/app_runtime/workflow/engine/workflow_engine.py new file mode 100644 index 0000000..5383f74 --- /dev/null +++ b/src/app_runtime/workflow/engine/workflow_engine.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import logging + +from app_runtime.workflow.contracts.context import WorkflowContext +from app_runtime.workflow.engine.hooks import WorkflowEngineHooks +from app_runtime.workflow.engine.transition_resolver import TransitionResolver + + +class WorkflowEngine: + def __init__(self, workflow, persistence, *, traces, hooks: WorkflowEngineHooks | None = None) -> None: + self._workflow = workflow + self._persistence = persistence + self._transition_resolver = TransitionResolver() + self._traces = traces + self._hooks = hooks or WorkflowEngineHooks() + self._logger = logging.getLogger(__name__) + + def run(self, context: WorkflowContext) -> dict[str, object]: + run_id = self._persistence.start_run( + self._workflow.definition.name, + self._workflow.definition.start_at, + context.snapshot(), + ) + context.state.setdefault("runtime", {}) + 
context.state["runtime"]["workflow_run_id"] = run_id + self._traces.step("workflow") + self._traces.info("Workflow started.", status="started", attrs={"workflow_run_id": run_id}) + current_name = self._workflow.definition.start_at + while current_name is not None: + node = self._workflow.definition.nodes[current_name] + self._logger.info("Workflow run %s: step '%s' started.", run_id, node.name) + self._hooks.on_step_started(context, node.name) + self._persistence.start_step(run_id, node.name, context.snapshot()) + self._traces.step(node.name) + self._traces.info(f"Step '{node.name}' started.", status="started") + try: + result = node.step.run(context) + except Exception as error: + self._persistence.fail_step(run_id, node.name, context.snapshot(), error) + self._persistence.fail_run(run_id, context.snapshot()) + self._traces.error( + f"Step '{node.name}' failed: {error}", + status="failed", + attrs={"exception_type": type(error).__name__}, + ) + self._logger.exception("Workflow run %s: step '%s' failed.", run_id, node.name) + self._hooks.on_step_failed(context, node.name) + raise + context.state.update(result.updates) + self._persistence.complete_step(run_id, node.name, result.status, result.transition, context.snapshot()) + self._traces.info( + f"Step '{node.name}' completed with transition '{result.transition}'.", + status=result.status, + ) + self._logger.info( + "Workflow run %s: step '%s' completed with transition '%s'.", + run_id, + node.name, + result.transition, + ) + self._hooks.on_step_finished(context, node.name) + current_name = self._transition_resolver.resolve(node, result) + self._persistence.complete_run(run_id, context.snapshot()) + self._traces.step("workflow") + self._traces.info("Workflow completed.", status="completed", attrs={"workflow_run_id": run_id}) + self._logger.info("Workflow run %s completed.", run_id) + return {"run_id": run_id, "status": "completed", "context": context.snapshot()} diff --git 
a/src/app_runtime/workflow/persistence/__init__.py b/src/app_runtime/workflow/persistence/__init__.py new file mode 100644 index 0000000..c680823 --- /dev/null +++ b/src/app_runtime/workflow/persistence/__init__.py @@ -0,0 +1,3 @@ +from app_runtime.workflow.persistence.workflow_persistence import WorkflowPersistence + +__all__ = ["WorkflowPersistence"] diff --git a/src/app_runtime/workflow/persistence/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/workflow/persistence/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..fea1184 Binary files /dev/null and b/src/app_runtime/workflow/persistence/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/persistence/__pycache__/checkpoint_repository.cpython-312.pyc b/src/app_runtime/workflow/persistence/__pycache__/checkpoint_repository.cpython-312.pyc new file mode 100644 index 0000000..233c8b7 Binary files /dev/null and b/src/app_runtime/workflow/persistence/__pycache__/checkpoint_repository.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/persistence/__pycache__/snapshot_sanitizer.cpython-312.pyc b/src/app_runtime/workflow/persistence/__pycache__/snapshot_sanitizer.cpython-312.pyc new file mode 100644 index 0000000..25c5fb0 Binary files /dev/null and b/src/app_runtime/workflow/persistence/__pycache__/snapshot_sanitizer.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/persistence/__pycache__/workflow_persistence.cpython-312.pyc b/src/app_runtime/workflow/persistence/__pycache__/workflow_persistence.cpython-312.pyc new file mode 100644 index 0000000..bcab399 Binary files /dev/null and b/src/app_runtime/workflow/persistence/__pycache__/workflow_persistence.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/persistence/__pycache__/workflow_repository.cpython-312.pyc b/src/app_runtime/workflow/persistence/__pycache__/workflow_repository.cpython-312.pyc new file mode 100644 index 0000000..b5bce8b Binary files /dev/null and 
b/src/app_runtime/workflow/persistence/__pycache__/workflow_repository.cpython-312.pyc differ diff --git a/src/app_runtime/workflow/persistence/checkpoint_repository.py b/src/app_runtime/workflow/persistence/checkpoint_repository.py new file mode 100644 index 0000000..5369466 --- /dev/null +++ b/src/app_runtime/workflow/persistence/checkpoint_repository.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from typing import Any + + +class CheckpointRepository: + def __init__(self, connection_factory: object | None = None) -> None: + self._connection_factory = connection_factory + self._checkpoints: list[dict[str, Any]] = [] + + def save( + self, + workflow_run_id: int, + node_name: str, + checkpoint_kind: str, + snapshot: dict[str, Any], + ) -> None: + if self._use_memory(): + self._checkpoints.append( + { + "workflow_run_id": workflow_run_id, + "node_name": node_name, + "checkpoint_kind": checkpoint_kind, + "snapshot": snapshot, + } + ) + return + query = """ + INSERT INTO workflow_checkpoints ( + workflow_run_id, node_name, checkpoint_kind, snapshot_json, created_at + ) VALUES (%s, %s, %s, %s, UTC_TIMESTAMP(6)) + """ + with self._connection_factory.connect() as connection: + with connection.cursor() as cursor: + cursor.execute( + query, + ( + workflow_run_id, + node_name, + checkpoint_kind, + self._connection_factory.dumps(snapshot), + ), + ) + + def _use_memory(self) -> bool: + return self._connection_factory is None or not self._connection_factory.is_configured() diff --git a/src/app_runtime/workflow/persistence/snapshot_sanitizer.py b/src/app_runtime/workflow/persistence/snapshot_sanitizer.py new file mode 100644 index 0000000..f90cd05 --- /dev/null +++ b/src/app_runtime/workflow/persistence/snapshot_sanitizer.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import Any + + +class WorkflowSnapshotSanitizer: + def sanitize(self, snapshot: dict[str, Any]) -> dict[str, Any]: + payload = dict(snapshot.get("payload", {})) + state = 
dict(snapshot.get("state", {})) + return { + "payload": self._sanitize_dict(payload), + "state": self._sanitize_dict(state), + } + + def _sanitize_dict(self, value: Any) -> Any: + if isinstance(value, dict): + return {str(key): self._sanitize_dict(item) for key, item in value.items()} + if isinstance(value, list): + return [self._sanitize_dict(item) for item in value] + return value diff --git a/src/app_runtime/workflow/persistence/workflow_persistence.py b/src/app_runtime/workflow/persistence/workflow_persistence.py new file mode 100644 index 0000000..71a273c --- /dev/null +++ b/src/app_runtime/workflow/persistence/workflow_persistence.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from app_runtime.workflow.persistence.checkpoint_repository import CheckpointRepository +from app_runtime.workflow.persistence.snapshot_sanitizer import WorkflowSnapshotSanitizer +from app_runtime.workflow.persistence.workflow_repository import WorkflowRepository + + +class WorkflowPersistence: + def __init__(self, workflow_repository, checkpoint_repository) -> None: + self._workflow_repository = workflow_repository + self._checkpoint_repository = checkpoint_repository + self._snapshot_sanitizer = WorkflowSnapshotSanitizer() + + @classmethod + def create_default(cls, connection_factory=None) -> "WorkflowPersistence": + return cls( + workflow_repository=WorkflowRepository(connection_factory), + checkpoint_repository=CheckpointRepository(connection_factory), + ) + + def start_run(self, workflow_name: str, start_at: str, snapshot: dict[str, object]) -> int: + sanitized = self._snapshot_sanitizer.sanitize(snapshot) + run_id = self._workflow_repository.create_run(workflow_name, sanitized) + self._checkpoint_repository.save(run_id, start_at, "workflow_started", sanitized) + return run_id + + def start_step(self, run_id: int, node_name: str, snapshot: dict[str, object]) -> None: + self._checkpoint_repository.save(run_id, node_name, "step_started", 
self._snapshot_sanitizer.sanitize(snapshot)) + + def complete_step( + self, + run_id: int, + node_name: str, + status: str, + transition: str, + snapshot: dict[str, object], + ) -> None: + sanitized = self._snapshot_sanitizer.sanitize(snapshot) + self._workflow_repository.record_step(run_id, node_name, status, transition, sanitized) + self._checkpoint_repository.save(run_id, node_name, "step_completed", sanitized) + + def fail_step(self, run_id: int, node_name: str, snapshot: dict[str, object], error: Exception) -> None: + sanitized = self._snapshot_sanitizer.sanitize(snapshot) + self._workflow_repository.fail_step(run_id, node_name, sanitized, error) + self._checkpoint_repository.save(run_id, node_name, "step_failed", sanitized) + + def complete_run(self, run_id: int, snapshot: dict[str, object]) -> None: + sanitized = self._snapshot_sanitizer.sanitize(snapshot) + self._workflow_repository.complete_run(run_id, sanitized) + self._checkpoint_repository.save(run_id, "workflow_done", "workflow_finished", sanitized) + + def fail_run(self, run_id: int, snapshot: dict[str, object]) -> None: + self._workflow_repository.fail_run(run_id, self._snapshot_sanitizer.sanitize(snapshot)) diff --git a/src/app_runtime/workflow/persistence/workflow_repository.py b/src/app_runtime/workflow/persistence/workflow_repository.py new file mode 100644 index 0000000..7c5512c --- /dev/null +++ b/src/app_runtime/workflow/persistence/workflow_repository.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +from itertools import count +from typing import Any + + +class WorkflowRepository: + def __init__(self, connection_factory: object | None = None) -> None: + self._connection_factory = connection_factory + self._counter = count(1) + self._runs: dict[int, dict[str, Any]] = {} + + def create_run(self, workflow_name: str, snapshot: dict[str, Any]) -> int: + if self._use_memory(): + run_id = next(self._counter) + self._runs[run_id] = { + "workflow_name": workflow_name, + "status": "running", 
+ "snapshot": snapshot, + "steps": [], + } + return run_id + payload = self._build_run_payload(workflow_name, snapshot) + query = """ + INSERT INTO workflow_runs ( + workflow_name, workflow_version, business_key, queue_task_id, inbox_message_id, + current_node, status, context_json, trace_id, started_at, created_at, updated_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, UTC_TIMESTAMP(6), UTC_TIMESTAMP(6), UTC_TIMESTAMP(6)) + """ + with self._connection_factory.connect() as connection: + with connection.cursor() as cursor: + cursor.execute(query, payload) + return int(cursor.lastrowid) + + def record_step( + self, + run_id: int, + node_name: str, + status: str, + transition: str, + snapshot: dict[str, Any], + ) -> None: + if self._use_memory(): + self._runs[run_id]["steps"].append( + {"node_name": node_name, "status": status, "transition": transition, "snapshot": snapshot} + ) + return + context_json = self._connection_factory.dumps(snapshot) + insert_query = """ + INSERT INTO workflow_steps ( + workflow_run_id, node_name, status, transition_name, input_json, output_json, created_at, started_at, finished_at + ) VALUES (%s, %s, %s, %s, %s, %s, UTC_TIMESTAMP(6), UTC_TIMESTAMP(6), UTC_TIMESTAMP(6)) + """ + update_query = """ + UPDATE workflow_runs + SET current_node = %s, status = %s, context_json = %s, updated_at = UTC_TIMESTAMP(6) + WHERE id = %s + """ + with self._connection_factory.connect() as connection: + with connection.cursor() as cursor: + cursor.execute(insert_query, (run_id, node_name, status, transition, context_json, context_json)) + cursor.execute(update_query, (node_name, "running", context_json, run_id)) + + def complete_run(self, run_id: int, snapshot: dict[str, Any]) -> None: + if self._use_memory(): + self._runs[run_id]["status"] = "completed" + self._runs[run_id]["snapshot"] = snapshot + return + query = """ + UPDATE workflow_runs + SET current_node = NULL, status = 'completed', context_json = %s, finished_at = UTC_TIMESTAMP(6), updated_at = 
UTC_TIMESTAMP(6) + WHERE id = %s + """ + with self._connection_factory.connect() as connection: + with connection.cursor() as cursor: + cursor.execute(query, (self._connection_factory.dumps(snapshot), run_id)) + + def fail_step(self, run_id: int, node_name: str, snapshot: dict[str, Any], error: Exception) -> None: + if self._use_memory(): + self._runs[run_id]["steps"].append( + { + "node_name": node_name, + "status": "failed", + "transition": "error", + "snapshot": snapshot, + "error": str(error), + } + ) + self._runs[run_id]["status"] = "failed" + return + snapshot_json = self._connection_factory.dumps(snapshot) + error_json = self._connection_factory.dumps({"message": str(error), "exception_type": type(error).__name__}) + insert_query = """ + INSERT INTO workflow_steps ( + workflow_run_id, node_name, status, transition_name, input_json, output_json, error_json, created_at, started_at, finished_at + ) VALUES (%s, %s, 'failed', 'error', %s, %s, %s, UTC_TIMESTAMP(6), UTC_TIMESTAMP(6), UTC_TIMESTAMP(6)) + """ + update_query = """ + UPDATE workflow_runs + SET current_node = %s, status = 'failed', context_json = %s, updated_at = UTC_TIMESTAMP(6) + WHERE id = %s + """ + with self._connection_factory.connect() as connection: + with connection.cursor() as cursor: + cursor.execute(insert_query, (run_id, node_name, snapshot_json, snapshot_json, error_json)) + cursor.execute(update_query, (node_name, snapshot_json, run_id)) + + def fail_run(self, run_id: int, snapshot: dict[str, Any]) -> None: + if self._use_memory(): + self._runs[run_id]["status"] = "failed" + self._runs[run_id]["snapshot"] = snapshot + return + query = """ + UPDATE workflow_runs + SET status = 'failed', context_json = %s, finished_at = UTC_TIMESTAMP(6), updated_at = UTC_TIMESTAMP(6) + WHERE id = %s + """ + with self._connection_factory.connect() as connection: + with connection.cursor() as cursor: + cursor.execute(query, (self._connection_factory.dumps(snapshot), run_id)) + + def _build_run_payload(self, 
workflow_name: str, snapshot: dict[str, Any]) -> tuple: + payload = snapshot.get("payload", {}) + state = snapshot.get("state", {}) + runtime = state.get("runtime", {}) + business_key = payload.get("inbox_message", {}).get("external_message_id") or str(next(self._counter)) + return ( + workflow_name, + "v1", + business_key, + runtime.get("queue_task_id"), + payload.get("inbox_message", {}).get("id"), + None, + "running", + self._connection_factory.dumps(snapshot), + runtime.get("email_trace_id"), + ) + + def _use_memory(self) -> bool: + return self._connection_factory is None or not self._connection_factory.is_configured() diff --git a/src/app_runtime/workflow/runtime_factory.py b/src/app_runtime/workflow/runtime_factory.py new file mode 100644 index 0000000..d579fd2 --- /dev/null +++ b/src/app_runtime/workflow/runtime_factory.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from app_runtime.workflow.engine.workflow_engine import WorkflowEngine +from app_runtime.workflow.persistence import WorkflowPersistence + + +class WorkflowRuntimeFactory: + def __init__(self, connection_factory=None, *, traces, hooks=None) -> None: + self._connection_factory = connection_factory + self._traces = traces + self._hooks = hooks + + def create_engine(self, workflow) -> WorkflowEngine: + persistence = WorkflowPersistence.create_default(self._connection_factory) + return WorkflowEngine(workflow, persistence, traces=self._traces, hooks=self._hooks) diff --git a/src/plba/__init__.py b/src/plba/__init__.py index f8031c7..f2876d1 100644 --- a/src/plba/__init__.py +++ b/src/plba/__init__.py @@ -5,9 +5,6 @@ from plba.contracts import ( ApplicationModule, ConfigProvider, HealthContributor, - Task, - TaskHandler, - TaskQueue, TraceContext, TraceContextRecord, TraceLogMessage, @@ -21,7 +18,17 @@ from plba.health import HealthRegistry from plba.logging import LogManager from plba.queue import InMemoryTaskQueue from plba.tracing import MySqlTraceTransport, NoOpTraceTransport, 
TraceService -from plba.workers import QueueWorker, WorkerSupervisor +from plba.workflow import ( + StepResult, + WorkflowContext, + WorkflowDefinition, + WorkflowEngine, + WorkflowEngineHooks, + WorkflowNode, + WorkflowRuntimeFactory, + WorkflowStep, +) +from plba.workers import WorkerSupervisor __all__ = [ "ApplicationModule", @@ -40,19 +47,23 @@ __all__ = [ "LogManager", "MySqlTraceTransport", "NoOpTraceTransport", - "QueueWorker", "RuntimeManager", "ServiceContainer", - "Task", - "TaskHandler", - "TaskQueue", "TraceContext", "TraceContextRecord", "TraceLogMessage", "TraceService", "TraceTransport", + "StepResult", "Worker", "WorkerHealth", "WorkerStatus", + "WorkflowContext", + "WorkflowDefinition", + "WorkflowEngine", + "WorkflowEngineHooks", + "WorkflowNode", + "WorkflowRuntimeFactory", + "WorkflowStep", "WorkerSupervisor", ] diff --git a/src/plba/__pycache__/__init__.cpython-312.pyc b/src/plba/__pycache__/__init__.cpython-312.pyc index 5ff3d38..ab76708 100644 Binary files a/src/plba/__pycache__/__init__.cpython-312.pyc and b/src/plba/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/plba/__pycache__/contracts.cpython-312.pyc b/src/plba/__pycache__/contracts.cpython-312.pyc index cd692e0..e631c93 100644 Binary files a/src/plba/__pycache__/contracts.cpython-312.pyc and b/src/plba/__pycache__/contracts.cpython-312.pyc differ diff --git a/src/plba/__pycache__/tracing.cpython-312.pyc b/src/plba/__pycache__/tracing.cpython-312.pyc index 84a4dfd..d693ce0 100644 Binary files a/src/plba/__pycache__/tracing.cpython-312.pyc and b/src/plba/__pycache__/tracing.cpython-312.pyc differ diff --git a/src/plba/__pycache__/workers.cpython-312.pyc b/src/plba/__pycache__/workers.cpython-312.pyc index 6183a0f..75ee911 100644 Binary files a/src/plba/__pycache__/workers.cpython-312.pyc and b/src/plba/__pycache__/workers.cpython-312.pyc differ diff --git a/src/plba/__pycache__/workflow.cpython-312.pyc b/src/plba/__pycache__/workflow.cpython-312.pyc new file mode 100644 
index 0000000..faee905 Binary files /dev/null and b/src/plba/__pycache__/workflow.cpython-312.pyc differ diff --git a/src/plba/contracts.py b/src/plba/contracts.py index e525433..8fa12c1 100644 --- a/src/plba/contracts.py +++ b/src/plba/contracts.py @@ -1,8 +1,6 @@ from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.config import ConfigProvider from app_runtime.contracts.health import HealthContributor -from app_runtime.contracts.queue import TaskQueue -from app_runtime.contracts.tasks import Task, TaskHandler from app_runtime.contracts.trace import ( TraceContext, TraceContextRecord, @@ -15,9 +13,6 @@ __all__ = [ "ApplicationModule", "ConfigProvider", "HealthContributor", - "Task", - "TaskHandler", - "TaskQueue", "TraceContext", "TraceContextRecord", "TraceLogMessage", diff --git a/src/plba/workers.py b/src/plba/workers.py index 69099d4..ca7672b 100644 --- a/src/plba/workers.py +++ b/src/plba/workers.py @@ -1,4 +1,3 @@ -from app_runtime.workers.queue_worker import QueueWorker from app_runtime.workers.supervisor import WorkerSupervisor -__all__ = ["QueueWorker", "WorkerSupervisor"] +__all__ = ["WorkerSupervisor"] diff --git a/src/plba/workflow.py b/src/plba/workflow.py new file mode 100644 index 0000000..4594ccb --- /dev/null +++ b/src/plba/workflow.py @@ -0,0 +1,18 @@ +from app_runtime.workflow.contracts.context import WorkflowContext +from app_runtime.workflow.contracts.result import StepResult +from app_runtime.workflow.contracts.step import WorkflowStep +from app_runtime.workflow.contracts.workflow import WorkflowDefinition, WorkflowNode +from app_runtime.workflow.engine.hooks import WorkflowEngineHooks +from app_runtime.workflow.engine.workflow_engine import WorkflowEngine +from app_runtime.workflow.runtime_factory import WorkflowRuntimeFactory + +__all__ = [ + "StepResult", + "WorkflowContext", + "WorkflowDefinition", + "WorkflowEngine", + "WorkflowEngineHooks", + "WorkflowNode", + "WorkflowRuntimeFactory", + 
"WorkflowStep", +] diff --git a/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc b/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc index 0031da0..4c133f1 100644 Binary files a/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc and b/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc differ diff --git a/tests/test_runtime.py b/tests/test_runtime.py index b85bc1f..fc5a1a6 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -2,35 +2,36 @@ from __future__ import annotations import asyncio from dataclasses import dataclass, field -from threading import Event, Thread +from threading import Event, Lock, Thread from time import sleep from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.health import HealthContributor -from app_runtime.contracts.tasks import Task, TaskHandler -from app_runtime.contracts.worker import WorkerHealth +from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus from app_runtime.core.registration import ModuleRegistry from app_runtime.core.runtime import RuntimeManager from app_runtime.queue.in_memory import InMemoryTaskQueue from app_runtime.tracing.transport import NoOpTraceTransport -from app_runtime.workers.queue_worker import QueueWorker @dataclass -class CollectingHandler(TaskHandler): +class CollectingRoutine: processed: list[dict[str, object]] = field(default_factory=list) + _done: bool = False - def handle(self, task: Task) -> None: - self.processed.append(task.payload) + def run(self) -> None: + if self._done: + return + self.processed.append({"id": 1}) + self._done = True -class BlockingHandler(TaskHandler): +class BlockingRoutine: def __init__(self, started: Event, release: Event) -> None: self._started = started self._release = release - def handle(self, task: Task) -> None: - del task + def run(self) -> None: self._started.set() self._release.wait(timeout=2.0) @@ -40,37 +41,139 @@ class 
StaticHealthContributor(HealthContributor): return WorkerHealth(name="example-module", status="ok", critical=False, meta={"kind": "test"}) +class RoutineWorker(Worker): + def __init__( + self, + name: str, + routine: object, + *, + interval: float = 0.01, + concurrency: int = 1, + critical: bool = True, + ) -> None: + self._name = name + self._routine = routine + self._interval = interval + self._concurrency = concurrency + self._critical = critical + self._threads: list[Thread] = [] + self._stop_requested = Event() + self._force_stop = Event() + self._lock = Lock() + self._started = False + self._in_flight = 0 + self._runs = 0 + self._failures = 0 + self._last_error: str | None = None + + @property + def name(self) -> str: + return self._name + + @property + def critical(self) -> bool: + return self._critical + + def start(self) -> None: + if any(thread.is_alive() for thread in self._threads): + return + self._threads.clear() + self._stop_requested.clear() + self._force_stop.clear() + self._started = True + for index in range(self._concurrency): + thread = Thread(target=self._run_loop, name=f"{self._name}-{index + 1}", daemon=True) + self._threads.append(thread) + thread.start() + + def stop(self, force: bool = False) -> None: + self._stop_requested.set() + if force: + self._force_stop.set() + + def health(self) -> WorkerHealth: + status = self.status() + if self._started and not self._stop_requested.is_set() and self._alive_threads() == 0: + return WorkerHealth(self.name, "unhealthy", self.critical, "worker threads are not running", status.meta) + if self._failures > 0: + return WorkerHealth(self.name, "degraded", self.critical, self._last_error, status.meta) + return WorkerHealth(self.name, "ok", self.critical, meta=status.meta) + + def status(self) -> WorkerStatus: + alive_threads = self._alive_threads() + with self._lock: + in_flight = self._in_flight + runs = self._runs + failures = self._failures + detail = self._last_error + if self._started and 
alive_threads == 0: + state = "stopped" + elif self._stop_requested.is_set(): + state = "stopping" if alive_threads > 0 else "stopped" + elif not self._started: + state = "stopped" + elif in_flight > 0: + state = "busy" + else: + state = "idle" + return WorkerStatus( + name=self.name, + state=state, + in_flight=in_flight, + detail=detail, + meta={"alive_threads": alive_threads, "concurrency": self._concurrency, "runs": runs, "failures": failures}, + ) + + def _run_loop(self) -> None: + while True: + if self._force_stop.is_set() or self._stop_requested.is_set(): + return + with self._lock: + self._in_flight += 1 + try: + self._routine.run() + except Exception as exc: + with self._lock: + self._failures += 1 + self._last_error = str(exc) + else: + with self._lock: + self._runs += 1 + self._last_error = None + finally: + with self._lock: + self._in_flight -= 1 + if self._stop_requested.is_set(): + return + sleep(self._interval) + + def _alive_threads(self) -> int: + return sum(1 for thread in self._threads if thread.is_alive()) + + class ExampleModule(ApplicationModule): def __init__(self) -> None: - self.handler = CollectingHandler() - self.queue = InMemoryTaskQueue() + self.routine = CollectingRoutine() @property def name(self) -> str: return "example" def register(self, registry: ModuleRegistry) -> None: - traces = registry.services.get("traces") - registry.add_queue("incoming", self.queue) - registry.add_handler("collect", self.handler) - self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={})) - registry.add_worker(QueueWorker("collector", self.queue, self.handler, traces, concurrency=1)) + registry.add_worker(RoutineWorker("collector", self.routine)) registry.add_health_contributor(StaticHealthContributor()) class BlockingModule(ApplicationModule): def __init__(self, started: Event, release: Event) -> None: - self.queue = InMemoryTaskQueue() - self.handler = BlockingHandler(started, release) + self.routine = BlockingRoutine(started, release) 
@property def name(self) -> str: return "blocking" def register(self, registry: ModuleRegistry) -> None: - traces = registry.services.get("traces") - self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={})) - registry.add_worker(QueueWorker("blocking-worker", self.queue, self.handler, traces, concurrency=1)) + registry.add_worker(RoutineWorker("blocking-worker", self.routine)) class RecordingTransport(NoOpTraceTransport): @@ -85,7 +188,7 @@ class RecordingTransport(NoOpTraceTransport): self.messages.append(record) -def test_runtime_processes_tasks_and_exposes_status(tmp_path) -> None: +def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None: config_path = tmp_path / "config.yml" config_path.write_text( """ @@ -113,7 +216,7 @@ log: status = runtime.status() runtime.stop() - assert module.handler.processed == [{"id": 1}] + assert module.routine.processed == [{"id": 1}] assert status["modules"] == ["example"] assert status["runtime"]["state"] == "idle" assert status["health"]["status"] == "ok" @@ -146,7 +249,7 @@ def test_trace_service_writes_contexts_and_messages() -> None: transport = RecordingTransport() manager = TraceService(transport=transport) - with manager.open_context(alias="worker", kind="task", attrs={"task": "incoming"}): + with manager.open_context(alias="worker", kind="worker", attrs={"routine": "incoming"}): manager.step("parse") manager.info("started", status="ok", attrs={"attempt": 1}) @@ -202,29 +305,67 @@ def test_http_control_channel_exposes_health_and_actions() -> None: asyncio.run(scenario()) -def test_public_plba_package_exports_runtime_builder(tmp_path) -> None: +def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None: + import plba from plba import ApplicationModule as PublicApplicationModule - from plba import QueueWorker as PublicQueueWorker + from plba import InMemoryTaskQueue + from plba import Worker as PublicWorker + from plba import WorkerHealth as PublicWorkerHealth 
+ from plba import WorkerStatus as PublicWorkerStatus from plba import create_runtime config_path = tmp_path / "config.yml" config_path.write_text("platform: {}\n", encoding="utf-8") + queue = InMemoryTaskQueue[int]() + queue.put(2) + assert queue.get(timeout=0.01) == 2 + queue.task_done() + + class PublicRoutine: + def __init__(self) -> None: + self.runs = 0 + + def run(self) -> None: + if self.runs == 0: + self.runs += 1 + + class PublicWorkerImpl(PublicWorker): + def __init__(self, routine: PublicRoutine) -> None: + self._inner = RoutineWorker("public-worker", routine) + + @property + def name(self) -> str: + return self._inner.name + + @property + def critical(self) -> bool: + return self._inner.critical + + def start(self) -> None: + self._inner.start() + + def stop(self, force: bool = False) -> None: + self._inner.stop(force=force) + + def health(self) -> PublicWorkerHealth: + return self._inner.health() + + def status(self) -> PublicWorkerStatus: + return self._inner.status() + class PublicExampleModule(PublicApplicationModule): @property def name(self) -> str: return "public-example" def register(self, registry: ModuleRegistry) -> None: - queue = InMemoryTaskQueue() - traces = registry.services.get("traces") - handler = CollectingHandler() - queue.publish(Task(name="incoming", payload={"id": 2}, metadata={})) - registry.add_worker(PublicQueueWorker("public-worker", queue, handler, traces)) + registry.add_worker(PublicWorkerImpl(PublicRoutine())) runtime = create_runtime(PublicExampleModule(), config_path=str(config_path)) runtime.start() sleep(0.2) assert runtime.configuration.get() == {"platform": {}} assert runtime.status()["workers"]["registered"] == 1 + assert hasattr(plba, "QueueWorker") is False runtime.stop()