commit de787ce7eeac41705dd39f8abeead27782f4381d Author: zosimovaa Date: Wed Mar 4 10:01:49 2026 +0300 plba diff --git a/Architectural constraints.md b/Architectural constraints.md new file mode 100644 index 0000000..9f40030 --- /dev/null +++ b/Architectural constraints.md @@ -0,0 +1,111 @@ + +**`docs/adr/0001-new-runtime.md`** +```md +# ADR 0001: Create a new runtime project instead of evolving the legacy ConfigManager model + +## Status + +Accepted + +## Context + +The previous generation of the application runtime was centered around a timer-driven execution model: +- a manager loaded configuration +- a worker loop periodically called `execute()` +- applications placed most operational logic behind that entry point + +That model is adequate for simple periodic jobs, but it does not match the direction of the new platform. + +The new platform must support: +- task sources +- queue-driven processing +- multiple parallel workers +- trace propagation across producer/consumer boundaries +- richer lifecycle and status management +- health aggregation +- future admin web interface +- future authentication and user management + +These are platform concerns, not business concerns. + +We also want business applications to describe only business functionality and rely on the runtime for infrastructure behavior. + +## Decision + +We will create a new runtime project instead of implementing a `V3` directly inside the current legacy ConfigManager codebase. + +The new runtime will be built around a platform-oriented model with explicit concepts such as: +- `RuntimeManager` +- `ApplicationModule` +- `TaskSource` +- `TaskQueue` +- `WorkerSupervisor` +- `TaskHandler` +- `TraceService` +- `HealthRegistry` + +The old execute-centered model is treated as a previous-generation design and is not the architectural basis of the new runtime. + +## Rationale + +Creating a new runtime project gives us: +- freedom to design the correct abstractions from the start +- no pressure to preserve legacy contracts +- cleaner boundaries between platform and business logic +- simpler documentation and tests +- lower long-term complexity than mixing old and new models in one codebase + +If we built this as `V3` inside the old project, we would likely inherit: +- compatibility constraints +- mixed abstractions +- transitional adapters +- conceptual confusion between periodic execution and event/queue processing + +The expected long-term cost of such coupling is higher than creating a clean new runtime. + +## Consequences + +### Positive + +- the platform can be modeled cleanly +- business applications can integrate through explicit contracts +- new runtime capabilities can be added without legacy pressure +- mail_order_bot can become the first pilot application on the new runtime + +### Negative + +- some existing capabilities will need to be reintroduced in the new project +- there will be a temporary period with both legacy and new runtime lines +- migration requires explicit planning + +## Initial migration target + +The first application on the new runtime will be `mail_order_bot`. + +Initial runtime design for that application: +- IMAP polling source +- in-memory queue +- parallel workers +- business handler for email processing +- message marked as read only after successful processing + +Later: +- swap IMAP polling source for IMAP IDLE source +- keep the queue and worker model unchanged + +## What we intentionally do not carry over + +We do not keep the old architecture as the central organizing principle: +- no `execute()`-centric application model +- no timer-loop as the main abstraction +- no implicit mixing of lifecycle and business processing + +## Follow-up + +Next design work should define: +- core platform contracts +- package structure +- runtime lifecycle +- queue and worker interfaces +- config model split between platform and application +- pilot integration for `mail_order_bot` diff --git a/Mail Order Bot Migration Plan.md b/Mail Order Bot Migration Plan.md new file mode 100644 index 0000000..c613f16 --- /dev/null +++ b/Mail Order Bot Migration Plan.md @@ -0,0 +1,116 @@ +# Mail Order Bot Migration Plan + +## Purpose + +This document describes how `mail_order_bot` will be adapted to the new runtime as the first pilot business application. + +## Scope + +This is not a full migration specification for all future applications. +It is a practical first use case to validate the runtime architecture. + +## Current model + +The current application flow is tightly coupled: +- the manager checks IMAP +- unread emails are fetched +- emails are processed synchronously +- messages are marked as read after processing + +Polling and processing happen in one execution path. + +## Target model + +The new runtime-based flow should be: + +1. a mail source detects new tasks +2. tasks are published to a queue +3. workers consume tasks in parallel +4. a domain handler processes each email +5. successful tasks lead to `mark_as_read` +6. failed tasks remain retriable + +## Phase 1 + +### Source +- IMAP polling source + +### Queue +- in-memory task queue + +### Workers +- 2 to 4 parallel workers initially + +### Handler +- domain email processing handler built around the current processing logic + +### Delivery semantics +- email is marked as read only after successful processing +- unread state acts as the first safety mechanism against message loss + +## Why in-memory queue is acceptable at first + +For the first phase: +- infrastructure complexity stays low +- the runtime contracts can be tested quickly +- unread emails in IMAP provide a simple recovery path after crashes + +This allows us to validate the runtime architecture before adopting an external broker. + +## Phase 2 + +Replace: +- IMAP polling source + +With: +- IMAP IDLE source + +The queue, workers, and handler should remain unchanged. + +This is an explicit architectural goal: +source replacement without redesigning the processing pipeline. + +## Domain responsibilities that remain inside mail_order_bot + +The runtime should not own: +- email parsing rules +- client resolution logic +- attachment processing rules +- order creation logic +- client-specific behavior + +These remain in the business application. + +## Platform responsibilities used by mail_order_bot + +The new runtime should provide: +- lifecycle +- configuration +- queue abstraction +- worker orchestration +- tracing +- health checks +- status/control APIs + +## Migration boundaries + +### Move into runtime +- source orchestration +- worker supervision +- queue management +- trace provisioning +- health aggregation + +### Keep in mail_order_bot +- email business handler +- mail domain services +- business pipeline +- business-specific config validation beyond platform-level schema + +## Success criteria + +The migration is successful when: +- mail polling is no longer tightly coupled to processing +- workers can process emails in parallel +- business logic is not moved into the runtime +- replacing polling with IDLE is localized to the source layer diff --git a/README.md b/README.md new file mode 100644 index 0000000..5a6c2a7 --- /dev/null +++ b/README.md @@ -0,0 +1,469 @@ +# PLBA + +`PLBA` is a reusable platform runtime for business applications. + +It solves platform concerns that should not live inside domain code: +- application lifecycle +- worker orchestration +- configuration loading from YAML +- tracing +- health aggregation +- runtime status reporting +- HTTP control endpoints +- logging configuration + +Business applications depend on `plba` as a package and implement only their own business behavior. + +## Architecture + +Current PLBA architecture is built around one core idea: +- the runtime manages a set of application workers + +A worker is any runtime-managed active component with a unified lifecycle: +- `start()` +- `stop(force=False)` +- `health()` +- `status()` + +This means PLBA does not require separate platform categories like `source` and `consumer`. +If an application needs polling, queue processing, listening, scheduled work, or another active loop, it is implemented as a worker. + +### Main runtime model + +1. application creates `RuntimeManager` +2. runtime loads configuration +3. runtime applies logging configuration +4. application module registers workers and supporting services +5. runtime starts all workers +6. workers execute business-related loops or processing +7. runtime aggregates health and status +8. runtime stops workers gracefully or forcefully + +## Core concepts + +### `ApplicationModule` + +File: [application.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/application.py) + +Describes a business application to the runtime. + +Responsibilities: +- provide module name +- register workers +- register queues if needed +- register handlers if needed +- register health contributors +- compose application-specific objects + +`ApplicationModule` does not run the application itself. +It only declares how the application is assembled. + +### `Worker` + +File: [worker.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/worker.py) + +The main runtime-managed contract. + +Responsibilities: +- start its own execution +- stop gracefully or forcefully +- report health +- report runtime status + +This is the main extension point for business applications. + +### `TaskQueue` + +File: [queue.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/queue.py) + +Optional queue abstraction. + +Use it when application workers need buffered or decoupled processing. + +PLBA does not force every application to use a queue. +Queue is one supported pattern, not the foundation of the whole platform. + +### `TaskHandler` + +File: [tasks.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/contracts/tasks.py) + +Optional unit of business processing for one task. + +Useful when a worker follows queue-driven logic: +- worker takes a task +- handler executes business logic + +### `TraceService` + +File: [service.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/tracing/service.py) + +Platform trace service. + +Responsibilities: +- create trace contexts +- resume trace from task metadata +- write context records +- write trace messages + +Business code should use it as a platform service and should not implement its own tracing infrastructure. + +### `HealthRegistry` + +File: [registry.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/health/registry.py) + +Aggregates application health. + +PLBA uses three health states: +- `ok` — all critical parts work +- `degraded` — application still works, but there is a problem +- `unhealthy` — application should not be considered operational + +### Runtime status + +File: [types.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/core/types.py) + +Status is separate from health. + +Current runtime states: +- `starting` +- `idle` +- `busy` +- `stopping` +- `stopped` + +Status is used for operational lifecycle decisions such as graceful shutdown. + +### `ControlPlaneService` + +Files: +- [service.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/control/service.py) +- [http_channel.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/control/http_channel.py) + +Provides control and observability endpoints. + +Currently supported: +- health access +- runtime start action +- runtime stop action +- runtime status action + +### `ConfigurationManager` + +Files: +- [configuration.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/core/configuration.py) +- [file_loader.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/config/file_loader.py) +- [providers.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/config/providers.py) + +Loads and merges configuration. + +Current built-in source: +- YAML file provider + +### `LogManager` + +File: [manager.py](/Users/alex/Dev_projects_v2/apps/plba/src/app_runtime/logging/manager.py) + +Applies logging configuration from config. + +Current expectation: +- logging config lives in the `log` section of YAML + +## Available platform services + +PLBA currently provides these reusable services. + +### 1. Runtime lifecycle + +Service: +- `RuntimeManager` + +What it gives: +- startup orchestration +- worker registration and startup +- graceful stop with timeout +- force stop +- status snapshot + +Example use: +- start `mail_order_bot` +- stop it after active email processing is drained + +### 2. Worker supervision + +Service: +- `WorkerSupervisor` + +What it gives: +- unified worker orchestration +- aggregated worker statuses +- aggregated worker health +- stop coordination + +Example use: +- run one polling worker and three processing workers in the same application + +### 3. Queue support + +Services: +- `TaskQueue` +- `InMemoryTaskQueue` +- `QueueWorker` + +What it gives: +- buffered processing +- decoupling between task production and task consumption +- worker concurrency for task handling + +Example use: +- worker A polls IMAP and pushes tasks to queue +- worker B processes queued email tasks with concurrency `3` + +### 4. Configuration + +Services: +- `ConfigurationManager` +- `FileConfigProvider` +- `ConfigFileLoader` + +What it gives: +- YAML config loading +- config merging +- access to platform and application config + +Example use: +- load `platform` section for runtime +- load `mail_order_bot` section for app-specific config + +### 5. Tracing + +Services: +- `TraceService` +- `TraceTransport` +- `NoOpTraceTransport` + +What it gives: +- trace context creation +- trace propagation through task metadata +- trace messages for processing steps + +Example use: +- polling worker creates trace when it discovers a mail +- processing worker resumes trace and writes business steps + +### 6. Health + +Services: +- `HealthRegistry` +- `WorkerHealth` + +What it gives: +- per-worker health +- aggregated application health +- critical vs non-critical component handling + +Example use: +- email processing workers are critical +- optional diagnostic worker may be non-critical + +### 7. Status + +Services: +- `WorkerStatus` +- runtime aggregated state + +What it gives: +- current activity visibility +- ability to stop application only after in-flight work is completed + +Example use: +- stop application only after processing workers become `idle` or `stopped` + +### 8. HTTP control + +Services: +- `ControlPlaneService` +- `HttpControlChannel` + +What it gives: +- HTTP health/status/actions +- operational integration point + +Example use: +- inspect current health from orchestration +- request graceful stop remotely + +## Public package API + +Public namespace is `plba`. + +Main imports for external applications: + +```python +from plba import ApplicationModule, QueueWorker, RuntimeManager, create_runtime +from plba.contracts import Task, TaskHandler, TaskQueue, Worker, WorkerHealth, WorkerStatus +from plba.queue import InMemoryTaskQueue +from plba.tracing import TraceService +``` + +## Example application pattern + +Minimal queue-based application: + +```python +from plba import ApplicationModule, QueueWorker, Task, TaskHandler, create_runtime +from plba.queue import InMemoryTaskQueue + + +class ExampleHandler(TaskHandler): + def handle(self, task: Task) -> None: + print(task.payload) + + +class ExampleModule(ApplicationModule): + @property + def name(self) -> str: + return "example" + + def register(self, registry) -> None: + queue = InMemoryTaskQueue() + traces = registry.services.get("traces") + + queue.publish(Task(name="incoming", payload={"hello": "world"})) + + registry.add_queue("incoming", queue) + registry.add_worker(QueueWorker("example-worker", queue, ExampleHandler(), traces)) + + +runtime = create_runtime( + ExampleModule(), + config_path="config.yml", + enable_http_control=False, +) +runtime.start() +``` + +## Building business applications on PLBA + +These are the current rules for building business applications correctly. + +### 1. Keep platform and business concerns separate + +PLBA owns: +- lifecycle +- worker management +- logging +- trace infrastructure +- health aggregation +- HTTP control +- config loading + +Business application owns: +- business workflows +- domain services +- application-specific config schema +- business task payloads +- business error semantics + +### 2. Build app behavior from workers + +A business application should be described as a small set of workers. + +Typical examples: +- polling worker +- processing worker +- reconciliation worker + +Do not introduce new worker types at platform level unless there is clear need for custom runtime behavior. + +### 3. Use queues only when they help + +Queue is optional. + +Use queue when: +- one worker discovers work +- another worker processes it +- buffering or decoupling helps +- concurrency is needed + +Do not force queue into applications that do not need it. + +### 4. Keep business logic out of worker lifecycle code + +Worker should orchestrate execution. +Business rules should live in dedicated services and handlers. + +Good: +- worker gets config +- worker calls domain service +- worker reports trace and status + +Bad: +- worker contains all parsing, decision logic, integration rules, and persistence rules in one class + +### 5. Use trace as a platform service + +Business application should: +- create meaningful trace steps +- propagate trace through task metadata if queue is used +- record business-relevant processing milestones + +Business application should not: +- implement its own trace store +- control trace transport directly unless explicitly needed + +### 6. Read config through PLBA + +Business application should not read YAML directly. + +Recommended flow: +- PLBA loads config +- application reads only its own config section +- application converts it to typed app config object +- services receive typed config object + +### 7. Distinguish health from status + +Use `health` for: +- is application operational? + +Use `status` for: +- what is application doing right now? + +This is important for graceful stop: +- health may still be `ok` +- status may be `busy` + +### 8. Design workers for graceful stop + +Workers should support: +- stop accepting new work +- finish current in-flight work when possible +- report `busy`, `idle`, `stopping`, `stopped` + +This allows runtime to stop application safely. + +## Recommended repository model + +PLBA is intended to live in its own repository as a reusable package. + +Recommended setup: +- repository `plba`: platform package only +- repository `mail_order_bot`: business application depending on `plba` +- repository `service_b`: business application depending on `plba` + +## Example: `mail_order_bot` + +Simple first version of `mail_order_bot` on PLBA: +- `MailPollingWorker`, concurrency `1` +- `EmailProcessingWorker`, concurrency `3` +- shared `InMemoryTaskQueue` +- domain services for mail parsing and order processing + +Flow: +1. polling worker checks IMAP +2. polling worker pushes email tasks into queue +3. processing workers consume tasks +4. processing workers execute domain logic +5. runtime aggregates health and status + +This keeps `mail_order_bot` small, explicit, and aligned with current PLBA architecture. diff --git a/architecture.md b/architecture.md new file mode 100644 index 0000000..b0ddba0 --- /dev/null +++ b/architecture.md @@ -0,0 +1,248 @@ +# Architecture + +## Overview + +The runtime is built as a platform layer for business applications. + +It consists of four logical layers: +- platform core +- platform contracts +- infrastructure adapters +- business applications + +## Layers + +### Platform core + +The core contains long-lived runtime services: +- `RuntimeManager` +- `ConfigurationManager` +- `WorkerSupervisor` +- `TraceService` +- `HealthRegistry` +- `ControlPlaneService` +- `ServiceContainer` + +The core is responsible for orchestration, not domain behavior. + +### Platform contracts + +Contracts define how business applications integrate with the runtime. + +Main contracts: +- `ApplicationModule` +- `TaskSource` +- `TaskQueue` +- `Worker` +- `TaskHandler` +- `ConfigProvider` +- `HealthContributor` +- `TraceFactory` + +These contracts must remain domain-agnostic. + +### Infrastructure adapters + +Adapters implement concrete runtime capabilities: +- in-memory queue +- Redis queue +- file config loader +- database config loader +- polling source +- IMAP IDLE source +- HTTP control plane +- trace transport adapters + +Adapters may change between applications and deployments. + +### Business applications + +Applications are built on top of the contracts and adapters. + +Examples: +- `mail_order_bot` +- future event-driven business services + +Applications contain: +- domain models +- domain handlers +- application-specific configuration schema +- source/handler composition + +## Core runtime components + +### RuntimeManager + +The main platform facade. + +Responsibilities: +- bootstrap runtime +- initialize services +- register application modules +- start and stop all runtime-managed components +- expose status +- coordinate graceful shutdown + +### ConfigurationManager + +Responsibilities: +- load configuration +- validate configuration +- publish config updates +- provide typed config access +- notify subscribers on reload + +Configuration should be divided into: +- platform config +- application config +- environment/runtime overrides + +### WorkerSupervisor + +Responsibilities: +- register worker definitions +- start worker pools +- monitor worker health +- restart failed workers when appropriate +- manage parallelism and backpressure +- expose worker-level status + +### TraceService + +Responsibilities: +- create traces for operations +- propagate trace context across source -> queue -> worker -> handler boundaries +- provide trace factories to applications +- remain transport-agnostic + +### HealthRegistry + +Responsibilities: +- collect health from registered contributors +- aggregate health into liveness/readiness/status views +- expose structured runtime health + +### ControlPlaneService + +Responsibilities: +- control endpoints +- runtime state visibility +- administrative actions +- later authentication and user/session-aware access + +## Main runtime model + +The runtime should operate on this conceptual flow: + +1. runtime starts +2. configuration is loaded +3. services are initialized +4. application modules register sources, queues, handlers, and workers +5. task sources start producing tasks +6. tasks are published into queues +7. workers consume tasks +8. handlers execute business logic +9. traces and health are updated throughout the flow +10. runtime stops gracefully on request + +## Contracts + +### ApplicationModule + +Describes a business application to the runtime. + +Responsibilities: +- register domain services +- register task sources +- register queues +- register worker pools +- register handlers +- declare config requirements +- optionally register health contributors + +### TaskSource + +Produces tasks into queues. + +Examples: +- IMAP polling source +- IMAP IDLE source +- webhook source +- scheduled source + +Responsibilities: +- start +- stop +- publish tasks +- expose source status + +### TaskQueue + +A queue abstraction. + +Expected operations: +- `publish(task)` +- `consume()` +- `ack(task)` +- `nack(task, retry_delay=None)` +- `stats()` + +The first implementation may be in-memory, but the interface should support future backends. + +### Worker + +Consumes tasks from a queue and passes them to a handler. + +Responsibilities: +- obtain task from queue +- open or resume trace context +- call business handler +- ack or nack the task +- expose worker state + +### TaskHandler + +Executes business logic for one task. + +The runtime should not know what the handler does. +It only knows that a task is processed. + +## Mail Order Bot as first application + +### Phase 1 + +- source: IMAP polling +- queue: in-memory queue +- workers: parallel email processing workers +- handler: domain email processing handler +- mark message as read only after successful processing + +### Phase 2 + +- source changes from polling to IMAP IDLE +- queue and workers remain the same + +This demonstrates one of the architectural goals: +the source can change without redesigning the rest of the processing pipeline. + +## Suggested package structure + +```text +src/ + app_runtime/ + core/ + contracts/ + config/ + workers/ + queue/ + tracing/ + health/ + control/ + container/ + adapters/ + mail_order_bot_app/ + module/ + sources/ + handlers/ + services/ + domain/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5af9178 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "plba" +version = "0.1.0" +description = "Platform runtime for business applications" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "fastapi>=0.129.0", + "PyYAML>=6.0.3", + "uvicorn>=0.41.0", +] + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +pythonpath = ["src"] +testpaths = ["tests"] diff --git a/src/app_runtime/__init__.py b/src/app_runtime/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app_runtime/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app_runtime/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..98a9fc9 Binary files /dev/null and b/src/app_runtime/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/config/__init__.py b/src/app_runtime/config/__init__.py new file mode 100644 index 0000000..eab5d39 --- /dev/null +++ b/src/app_runtime/config/__init__.py @@ -0,0 +1,4 @@ +from app_runtime.config.file_loader import ConfigFileLoader +from app_runtime.config.providers import FileConfigProvider + +__all__ = ["ConfigFileLoader", "FileConfigProvider"] diff --git a/src/app_runtime/config/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/config/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..284a83a Binary files /dev/null and b/src/app_runtime/config/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/config/__pycache__/file_loader.cpython-312.pyc b/src/app_runtime/config/__pycache__/file_loader.cpython-312.pyc new file mode 100644 index 0000000..90c0617 Binary files /dev/null and b/src/app_runtime/config/__pycache__/file_loader.cpython-312.pyc differ diff --git a/src/app_runtime/config/__pycache__/providers.cpython-312.pyc b/src/app_runtime/config/__pycache__/providers.cpython-312.pyc new file mode 100644 index 0000000..8d33cb5 Binary files /dev/null and b/src/app_runtime/config/__pycache__/providers.cpython-312.pyc differ diff --git a/src/app_runtime/config/file_loader.py b/src/app_runtime/config/file_loader.py new file mode 100644 index 0000000..59919b2 --- /dev/null +++ b/src/app_runtime/config/file_loader.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import hashlib +import json +from pathlib import Path +from typing import Any + +import yaml + + +class ConfigFileLoader: + def __init__(self, path: str | Path) -> None: + self.path = Path(path) + self.config: Any = None + self.last_valid_config: Any = None + self._last_seen_hash: str | None = None + + def read_text(self) -> str: + return self.path.read_text(encoding="utf-8") + + def parse(self, data: str) -> Any: + suffix = self.path.suffix.lower() + if suffix in {".yml", ".yaml"}: + return yaml.safe_load(data) + return json.loads(data) + + def load_sync(self) -> Any: + data = self.read_text() + parsed = self.parse(data) + self.config = parsed + self.last_valid_config = parsed + self._last_seen_hash = self._calculate_hash(data) + return parsed + + def load_if_changed(self) -> tuple[bool, Any]: + data = self.read_text() + current_hash = self._calculate_hash(data) + if current_hash == self._last_seen_hash: + return False, self.config + parsed = self.parse(data) + self.config = parsed + self.last_valid_config = parsed + self._last_seen_hash = current_hash + return True, parsed + + @staticmethod + def _calculate_hash(data: str) -> str: + return hashlib.sha256(data.encode("utf-8")).hexdigest() diff --git a/src/app_runtime/config/providers.py b/src/app_runtime/config/providers.py new file mode 100644 index 0000000..5f00e23 --- /dev/null +++ b/src/app_runtime/config/providers.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from app_runtime.config.file_loader import ConfigFileLoader +from app_runtime.contracts.config import ConfigProvider + + +class FileConfigProvider(ConfigProvider): + def __init__(self, path: str | Path) -> None: + self._loader = ConfigFileLoader(path) + + @property + def loader(self) -> ConfigFileLoader: + return self._loader + + def load(self) -> dict[str, Any]: + config = self._loader.load_sync() + if not isinstance(config, dict): + raise TypeError("Config root must be a mapping") + return dict(config) diff --git a/src/app_runtime/contracts/__init__.py b/src/app_runtime/contracts/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app_runtime/contracts/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app_runtime/contracts/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..6a0d031 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc new file mode 100644 index 0000000..35ebf3e Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/application.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/config.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..3dd32ce Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/config.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/health.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/health.cpython-312.pyc new file mode 100644 index 0000000..76ab161 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/health.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/queue.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/queue.cpython-312.pyc new file mode 100644 index 0000000..f91ff16 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/queue.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/runner.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/runner.cpython-312.pyc new file mode 100644 index 0000000..79e3067 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/runner.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/tasks.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/tasks.cpython-312.pyc new file mode 100644 index 0000000..74b93f7 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/tasks.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/trace.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/trace.cpython-312.pyc new file mode 100644 index 0000000..5ae3878 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/trace.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/__pycache__/worker.cpython-312.pyc b/src/app_runtime/contracts/__pycache__/worker.cpython-312.pyc new file mode 100644 index 0000000..c698272 Binary files /dev/null and b/src/app_runtime/contracts/__pycache__/worker.cpython-312.pyc differ diff --git a/src/app_runtime/contracts/application.py b/src/app_runtime/contracts/application.py new file mode 100644 index 0000000..82441f6 --- /dev/null +++ b/src/app_runtime/contracts/application.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod + +from app_runtime.core.registration import ModuleRegistry + + +class ApplicationModule(ABC): + @property + @abstractmethod + def name(self) -> str: + """Module name used for runtime status and diagnostics.""" + + @abstractmethod + def register(self, registry: ModuleRegistry) -> None: + """Register workers, queues, handlers, services, and health contributors.""" diff --git a/src/app_runtime/contracts/config.py b/src/app_runtime/contracts/config.py new file mode 100644 index 0000000..1272984 --- /dev/null +++ b/src/app_runtime/contracts/config.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + + +class ConfigProvider(ABC): + @abstractmethod + def load(self) -> dict[str, Any]: + """Return a config fragment.""" diff --git a/src/app_runtime/contracts/health.py b/src/app_runtime/contracts/health.py new file mode 100644 index 0000000..2ae01d3 --- /dev/null +++ b/src/app_runtime/contracts/health.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from app_runtime.contracts.worker import WorkerHealth + + +class HealthContributor(ABC): + @abstractmethod + def health(self) -> WorkerHealth: + """Return contributor health state.""" diff --git a/src/app_runtime/contracts/queue.py b/src/app_runtime/contracts/queue.py new file mode 100644 index 0000000..498796d --- /dev/null +++ b/src/app_runtime/contracts/queue.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + +from app_runtime.contracts.tasks import Task + + +class TaskQueue(ABC): + @abstractmethod + def publish(self, task: Task) -> None: + """Push a task into the queue.""" + + @abstractmethod + def consume(self, timeout: float = 0.1) -> Task | None: + """Return the next available task or None.""" + + @abstractmethod + def ack(self, task: Task) -> None: + """Confirm successful task processing.""" + + @abstractmethod + def nack(self, task: Task, retry_delay: float | None = None) -> None: + """Signal failed task processing.""" + + @abstractmethod + def stats(self) -> dict[str, Any]: + """Return transport-level queue statistics.""" diff --git a/src/app_runtime/contracts/tasks.py b/src/app_runtime/contracts/tasks.py new file mode 100644 index 0000000..7f552ba --- /dev/null +++ b/src/app_runtime/contracts/tasks.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(slots=True) +class Task: + name: str + payload: dict[str, Any] + metadata: dict[str, Any] = field(default_factory=dict) + + +class TaskHandler(ABC): + @abstractmethod + def handle(self, task: Task) -> None: + """Execute domain logic for a task.""" diff --git a/src/app_runtime/contracts/trace.py b/src/app_runtime/contracts/trace.py new file mode 100644 index 0000000..e9c8fee --- /dev/null +++ b/src/app_runtime/contracts/trace.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Protocol + + +def utc_now() -> datetime: + return datetime.now(timezone.utc) + + +@dataclass(slots=True) +class TraceContext: + trace_id: str + span_id: str + parent_span_id: str | None = None + attributes: dict[str, Any] | None = None + + +class TraceContextFactory(ABC): + @abstractmethod + def new_root(self, operation: str) -> TraceContext: + """Create a new root trace context.""" + + @abstractmethod + def child_of(self, parent: TraceContext, operation: str) -> TraceContext: + """Create a child trace context.""" + + +@dataclass(frozen=True) +class TraceContextRecord: + trace_id: str + alias: str + parent_id: str | None = None + type: str | None = None + event_time: datetime = field(default_factory=utc_now) + attrs: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TraceLogMessage: + trace_id: str + step: str + status: str + message: str + level: str + event_time: datetime = field(default_factory=utc_now) + attrs: dict[str, Any] = field(default_factory=dict) + + +class TraceTransport(Protocol): + def write_context(self, record: TraceContextRecord) -> None: + """Persist trace context record.""" + + def write_message(self, record: TraceLogMessage) -> None: + """Persist trace log message.""" diff --git a/src/app_runtime/contracts/worker.py b/src/app_runtime/contracts/worker.py new file mode 100644 index 0000000..ade5449 --- /dev/null +++ b/src/app_runtime/contracts/worker.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any, Literal + + +HealthState = Literal["ok", "degraded", "unhealthy"] +WorkerState = Literal["starting", "idle", "busy", "stopping", "stopped"] + + +@dataclass(slots=True) +class WorkerHealth: + name: str + status: HealthState + critical: bool + detail: str | None = None + meta: dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class WorkerStatus: + name: str + state: WorkerState + in_flight: int = 0 + detail: str | None = None + meta: dict[str, Any] = field(default_factory=dict) + + +class Worker(ABC): + @property + @abstractmethod + def name(self) -> str: + """Stable worker name for diagnostics.""" + + @property + @abstractmethod + def critical(self) -> bool: + """Whether this worker is required for healthy app operation.""" + + @abstractmethod + def start(self) -> None: + """Start worker execution.""" + + @abstractmethod + def stop(self, force: bool = False) -> None: + """Request graceful or immediate stop.""" + + @abstractmethod + def health(self) -> WorkerHealth: + """Return current health state.""" + + @abstractmethod + def status(self) -> WorkerStatus: + """Return current runtime status.""" diff --git a/src/app_runtime/control/__init__.py b/src/app_runtime/control/__init__.py new file mode 100644 index 0000000..09b35fe --- /dev/null +++ b/src/app_runtime/control/__init__.py @@ -0,0 +1,5 @@ +from app_runtime.control.base import ControlActionSet, ControlChannel +from app_runtime.control.http_channel import HttpControlChannel +from app_runtime.control.service import ControlPlaneService + +__all__ = ["ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"] diff --git a/src/app_runtime/control/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/control/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..0ba4dc5 Binary files /dev/null and b/src/app_runtime/control/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/control/__pycache__/base.cpython-312.pyc b/src/app_runtime/control/__pycache__/base.cpython-312.pyc new file mode 100644 index 0000000..6088810 Binary files /dev/null and b/src/app_runtime/control/__pycache__/base.cpython-312.pyc differ diff --git a/src/app_runtime/control/__pycache__/http_app.cpython-312.pyc b/src/app_runtime/control/__pycache__/http_app.cpython-312.pyc new file mode 100644 index 0000000..8d093d0 Binary files /dev/null and b/src/app_runtime/control/__pycache__/http_app.cpython-312.pyc differ diff --git a/src/app_runtime/control/__pycache__/http_channel.cpython-312.pyc b/src/app_runtime/control/__pycache__/http_channel.cpython-312.pyc new file mode 100644 index 0000000..4eab584 Binary files /dev/null and b/src/app_runtime/control/__pycache__/http_channel.cpython-312.pyc differ diff --git a/src/app_runtime/control/__pycache__/http_runner.cpython-312.pyc b/src/app_runtime/control/__pycache__/http_runner.cpython-312.pyc new file mode 100644 index 0000000..2cbd3b1 Binary files /dev/null and b/src/app_runtime/control/__pycache__/http_runner.cpython-312.pyc differ diff --git a/src/app_runtime/control/__pycache__/service.cpython-312.pyc b/src/app_runtime/control/__pycache__/service.cpython-312.pyc new file mode 100644 index 0000000..9fced0b Binary files /dev/null and b/src/app_runtime/control/__pycache__/service.cpython-312.pyc differ diff --git a/src/app_runtime/control/base.py b/src/app_runtime/control/base.py new file mode 100644 index 0000000..06e9566 --- /dev/null +++ b/src/app_runtime/control/base.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + +from app_runtime.core.types import HealthPayload + +ActionHandler = Callable[[], Awaitable[str]] +HealthHandler = Callable[[], Awaitable[HealthPayload]] + + +@dataclass(slots=True) +class ControlActionSet: + health: HealthHandler + start: ActionHandler + stop: ActionHandler + status: ActionHandler + + +class ControlChannel(ABC): + @abstractmethod + async def start(self, actions: ControlActionSet) -> None: + """Start the control channel and bind handlers.""" + + @abstractmethod + async def stop(self) -> None: + """Stop the control channel and release resources.""" diff --git a/src/app_runtime/control/http_app.py b/src/app_runtime/control/http_app.py new file mode 100644 index 0000000..70aa5b1 --- /dev/null +++ b/src/app_runtime/control/http_app.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import time +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse + +from app_runtime.core.types import HealthPayload + + +class HttpControlAppFactory: + def create( + self, + health_provider: Callable[[], Awaitable[HealthPayload]], + action_provider: Callable[[str], Awaitable[JSONResponse]], + ) -> FastAPI: + app = FastAPI(title="PLBA Control API") + + @app.middleware("http") + async def log_api_call(request: Request, call_next): # type: ignore[no-untyped-def] + started = time.monotonic() + response = await call_next(request) + response.headers["X-Response-Time-Ms"] = str(int((time.monotonic() - started) * 1000)) + return response + + @app.get("/health") + async def health() -> JSONResponse: + payload = await health_provider() + status_code = 200 if payload.get("status") == "ok" else 503 + return JSONResponse(content=payload, status_code=status_code) + + @app.get("/actions/{action}") + @app.post("/actions/{action}") + async def action(action: str) -> JSONResponse: + return await action_provider(action) + + return app diff --git a/src/app_runtime/control/http_channel.py b/src/app_runtime/control/http_channel.py new file mode 100644 index 0000000..9d5a374 --- /dev/null +++ b/src/app_runtime/control/http_channel.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import asyncio + +from fastapi.responses import JSONResponse + +from app_runtime.control.base import ControlActionSet, ControlChannel +from app_runtime.control.http_app import HttpControlAppFactory +from app_runtime.control.http_runner import UvicornThreadRunner + + +class HttpControlChannel(ControlChannel): + def __init__(self, host: str, port: int, timeout: int) -> None: + self._timeout = timeout + self._runner = UvicornThreadRunner(host, port, timeout) + self._factory = HttpControlAppFactory() + self._actions: ControlActionSet | None = None + + async def start(self, actions: ControlActionSet) -> None: + self._actions = actions + app = self._factory.create(self._health_response, self._action_response) + await self._runner.start(app) + + async def stop(self) -> None: + await self._runner.stop() + + @property + def port(self) -> int: + return self._runner.port + + async def _health_response(self) -> dict[str, object]: + if self._actions is None: + return {"status": "unhealthy", "detail": "control actions are not configured"} + return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout)) + + async def _action_response(self, action: str) -> JSONResponse: + if self._actions is None: + return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404) + callbacks = { + "start": self._actions.start, + "stop": self._actions.stop, + "status": self._actions.status, + } + callback = callbacks.get(action) + if callback is None: + return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404) + try: + detail = await asyncio.wait_for(callback(), timeout=float(self._timeout)) + except asyncio.TimeoutError: + return JSONResponse(content={"status": "error", "detail": f"{action} handler timeout"}, status_code=504) + except Exception as exc: + return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) + return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200) diff --git a/src/app_runtime/control/http_runner.py b/src/app_runtime/control/http_runner.py new file mode 100644 index 0000000..bca3afd --- /dev/null +++ b/src/app_runtime/control/http_runner.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import asyncio +from threading import Thread + +from fastapi import FastAPI +from uvicorn import Config, Server + + +class UvicornThreadRunner: + def __init__(self, host: str, port: int, timeout: int) -> None: + self._host = host + self._port = port + self._timeout = timeout + self._server: Server | None = None + self._thread: Thread | None = None + self._error: BaseException | None = None + + async def start(self, app: FastAPI) -> None: + if self._thread is not None and self._thread.is_alive(): + return + self._error = None + config = Config(app=app, host=self._host, port=self._port, log_level="warning") + self._server = Server(config) + self._thread = Thread(target=self._serve, name="plba-http-control", daemon=True) + self._thread.start() + await self._wait_until_started() + + async def stop(self) -> None: + if self._server is None or self._thread is None: + return + self._server.should_exit = True + await asyncio.to_thread(self._thread.join, self._timeout) + self._server = None + self._thread = None + + @property + def port(self) -> int: + if self._server is None or not getattr(self._server, "servers", None): + return self._port + socket = self._server.servers[0].sockets[0] + return int(socket.getsockname()[1]) + + async def _wait_until_started(self) -> None: + if self._server is None: + raise RuntimeError("Server is not initialized") + deadline = asyncio.get_running_loop().time() + max(float(self._timeout), 1.0) + while not self._server.started: + if self._error is not None: + raise RuntimeError("HTTP control server failed to start") from self._error + if asyncio.get_running_loop().time() >= deadline: + raise TimeoutError("HTTP control server startup timed out") + await asyncio.sleep(0.05) + + def _serve(self) -> None: + if self._server is None: + return + try: + asyncio.run(self._server.serve()) + except BaseException as exc: # noqa: BLE001 + self._error = exc diff --git a/src/app_runtime/control/service.py b/src/app_runtime/control/service.py new file mode 100644 index 0000000..821e6c9 --- /dev/null +++ b/src/app_runtime/control/service.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +from app_runtime.control.base import ControlActionSet, ControlChannel + +if TYPE_CHECKING: + from app_runtime.core.runtime import RuntimeManager + + +class ControlPlaneService: + def __init__(self) -> None: + self._channels: list[ControlChannel] = [] + + def register_channel(self, channel: ControlChannel) -> None: + self._channels.append(channel) + + def start(self, runtime: RuntimeManager) -> None: + if not self._channels: + return + asyncio.run(self._start_async(runtime)) + + def stop(self) -> None: + if not self._channels: + return + asyncio.run(self._stop_async()) + + def snapshot(self, runtime: RuntimeManager) -> dict[str, object]: + health = runtime.current_health() + return { + "runtime": {"state": runtime._state.value}, + "modules": list(runtime.registry.modules), + "services": runtime.services.snapshot(), + "workers": runtime.workers.snapshot(), + "health": runtime.health.snapshot(runtime.workers.healths()) | {"status": health["status"]}, + "config": runtime.configuration.get(), + } + + async def _start_async(self, runtime: RuntimeManager) -> None: + actions = ControlActionSet( + health=runtime.health_status, + start=runtime.start_runtime, + stop=runtime.stop_runtime, + status=runtime.runtime_status, + ) + for channel in self._channels: + await channel.start(actions) + + async def _stop_async(self) -> None: + for channel in reversed(self._channels): + await channel.stop() diff --git a/src/app_runtime/core/__init__.py b/src/app_runtime/core/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app_runtime/core/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app_runtime/core/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/core/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..14ee785 Binary files /dev/null and b/src/app_runtime/core/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/core/__pycache__/configuration.cpython-312.pyc b/src/app_runtime/core/__pycache__/configuration.cpython-312.pyc new file mode 100644 index 0000000..9fd1f17 Binary files /dev/null and b/src/app_runtime/core/__pycache__/configuration.cpython-312.pyc differ diff --git a/src/app_runtime/core/__pycache__/registration.cpython-312.pyc b/src/app_runtime/core/__pycache__/registration.cpython-312.pyc new file mode 100644 index 0000000..ef75b12 Binary files /dev/null and b/src/app_runtime/core/__pycache__/registration.cpython-312.pyc differ diff --git a/src/app_runtime/core/__pycache__/runtime.cpython-312.pyc b/src/app_runtime/core/__pycache__/runtime.cpython-312.pyc new file mode 100644 index 0000000..eefe9c0 Binary files /dev/null and b/src/app_runtime/core/__pycache__/runtime.cpython-312.pyc differ diff --git a/src/app_runtime/core/__pycache__/service_container.cpython-312.pyc b/src/app_runtime/core/__pycache__/service_container.cpython-312.pyc new file mode 100644 index 0000000..4a66a8e Binary files /dev/null and b/src/app_runtime/core/__pycache__/service_container.cpython-312.pyc differ diff --git a/src/app_runtime/core/__pycache__/types.cpython-312.pyc b/src/app_runtime/core/__pycache__/types.cpython-312.pyc new file mode 100644 index 0000000..8204dbd Binary files /dev/null and b/src/app_runtime/core/__pycache__/types.cpython-312.pyc differ diff --git a/src/app_runtime/core/configuration.py b/src/app_runtime/core/configuration.py new file mode 100644 index 0000000..1be970d --- /dev/null +++ b/src/app_runtime/core/configuration.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from app_runtime.contracts.config import ConfigProvider + + +class ConfigurationManager: + def __init__(self) -> None: + self._providers: list[ConfigProvider] = [] + self._subscribers: list[Callable[[dict[str, Any]], None]] = [] + self._config: dict[str, Any] = {} + + def add_provider(self, provider: ConfigProvider) -> None: + self._providers.append(provider) + + def subscribe(self, callback: Callable[[dict[str, Any]], None]) -> None: + self._subscribers.append(callback) + + def load(self) -> dict[str, Any]: + merged: dict[str, Any] = {} + for provider in self._providers: + merged = self._deep_merge(merged, provider.load()) + self._config = merged + return dict(self._config) + + def reload(self) -> dict[str, Any]: + config = self.load() + for callback in self._subscribers: + callback(dict(config)) + return config + + def get(self) -> dict[str, Any]: + return dict(self._config) + + def section(self, name: str, default: Any = None) -> Any: + return self._config.get(name, default) + + def _deep_merge(self, left: dict[str, Any], right: dict[str, Any]) -> dict[str, Any]: + merged = dict(left) + for key, value in right.items(): + current = merged.get(key) + if isinstance(current, dict) and isinstance(value, dict): + merged[key] = self._deep_merge(current, value) + continue + merged[key] = value + return merged diff --git a/src/app_runtime/core/registration.py b/src/app_runtime/core/registration.py new file mode 100644 index 0000000..9be2a1e --- /dev/null +++ b/src/app_runtime/core/registration.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from app_runtime.contracts.health import HealthContributor +from app_runtime.contracts.queue import TaskQueue +from app_runtime.contracts.tasks import TaskHandler +from app_runtime.contracts.worker import Worker +from app_runtime.core.service_container import ServiceContainer + + +class ModuleRegistry: + def __init__(self, services: ServiceContainer) -> None: + self.services = services + self.queues: dict[str, TaskQueue] = {} + self.handlers: dict[str, TaskHandler] = {} + self.workers: list[Worker] = [] + self.health_contributors: list[HealthContributor] = [] + self.modules: list[str] = [] + + def register_module(self, name: str) -> None: + self.modules.append(name) + + def add_queue(self, name: str, queue: TaskQueue) -> None: + self.queues[name] = queue + + def add_handler(self, name: str, handler: TaskHandler) -> None: + self.handlers[name] = handler + + def add_worker(self, worker: Worker) -> None: + self.workers.append(worker) + + def add_health_contributor(self, contributor: HealthContributor) -> None: + self.health_contributors.append(contributor) diff --git a/src/app_runtime/core/runtime.py b/src/app_runtime/core/runtime.py new file mode 100644 index 0000000..aaad5e0 --- /dev/null +++ b/src/app_runtime/core/runtime.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from app_runtime.config.providers import FileConfigProvider +from app_runtime.contracts.application import ApplicationModule +from app_runtime.control.service import ControlPlaneService +from app_runtime.core.configuration import ConfigurationManager +from app_runtime.core.registration import ModuleRegistry +from app_runtime.core.service_container import ServiceContainer +from app_runtime.core.types import HealthPayload, LifecycleState +from app_runtime.health.registry import HealthRegistry +from app_runtime.logging.manager import LogManager +from app_runtime.tracing.service import TraceService +from app_runtime.workers.supervisor import WorkerSupervisor + + +class RuntimeManager: + def __init__( + self, + configuration: ConfigurationManager | None = None, + services: ServiceContainer | None = None, + traces: TraceService | None = None, + health: HealthRegistry | None = None, + logs: LogManager | None = None, + workers: WorkerSupervisor | None = None, + control_plane: ControlPlaneService | None = None, + ) -> None: + self.configuration = configuration or ConfigurationManager() + self.services = services or ServiceContainer() + self.traces = traces or TraceService() + self.health = health or HealthRegistry() + self.logs = logs or LogManager() + self.workers = workers or WorkerSupervisor() + self.control_plane = control_plane or ControlPlaneService() + self.registry = ModuleRegistry(self.services) + self._started = False + self._state = LifecycleState.IDLE + self._core_registered = False + self._workers_registered = False + self._register_core_services() + + def register_module(self, module: ApplicationModule) -> None: + self.registry.register_module(module.name) + module.register(self.registry) + + def add_config_file(self, path: str) -> FileConfigProvider: + provider = FileConfigProvider(path) + self.configuration.add_provider(provider) + return provider + + def start(self) -> None: + if self._started: + return + self._state = LifecycleState.STARTING + config = self.configuration.load() + self.logs.apply_config(config) + self._register_health_contributors() + self._register_workers() + self.workers.start() + self.control_plane.start(self) + self._started = True + self._refresh_state() + + def stop(self, timeout: float = 30.0, force: bool = False, stop_control_plane: bool = True) -> None: + if not self._started: + return + self._state = LifecycleState.STOPPING + self.workers.stop(timeout=timeout, force=force) + if stop_control_plane: + self.control_plane.stop() + self._started = False + self._state = LifecycleState.STOPPED + + def status(self) -> dict[str, object]: + self._refresh_state() + return self.control_plane.snapshot(self) + + def current_health(self) -> HealthPayload: + self._refresh_state() + return self.health.payload(self._state, self.workers.healths()) + + async def health_status(self) -> HealthPayload: + return self.current_health() + + async def start_runtime(self) -> str: + if self._started: + return "runtime already running" + self.start() + return "runtime started" + + async def stop_runtime(self) -> str: + if not self._started: + return "runtime already stopped" + self.stop(stop_control_plane=False) + return "runtime stopped" + + async def runtime_status(self) -> str: + self._refresh_state() + return self._state.value + + def _register_core_services(self) -> None: + if self._core_registered: + return + self.services.register("configuration", self.configuration) + self.services.register("traces", self.traces) + self.services.register("health", self.health) + self.services.register("logs", self.logs) + self.services.register("workers", self.workers) + self.services.register("control_plane", self.control_plane) + self._core_registered = True + + def _register_health_contributors(self) -> None: + for contributor in self.registry.health_contributors: + self.health.register(contributor) + + def _register_workers(self) -> None: + if self._workers_registered: + return + for worker in self.registry.workers: + self.workers.register(worker) + self._workers_registered = True + + def _refresh_state(self) -> None: + if not self._started or self._state == LifecycleState.STOPPING: + return + self._state = self.workers.lifecycle_state() diff --git a/src/app_runtime/core/service_container.py b/src/app_runtime/core/service_container.py new file mode 100644 index 0000000..401ced3 --- /dev/null +++ b/src/app_runtime/core/service_container.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import Any + + +class ServiceContainer: + def __init__(self) -> None: + self._services: dict[str, Any] = {} + + def register(self, name: str, service: Any) -> None: + if name in self._services: + raise ValueError(f"Service '{name}' is already registered") + self._services[name] = service + + def get(self, name: str) -> Any: + try: + return self._services[name] + except KeyError as exc: + raise KeyError(f"Service '{name}' is not registered") from exc + + def require(self, name: str, expected_type: type[Any]) -> Any: + service = self.get(name) + if not isinstance(service, expected_type): + raise TypeError(f"Service '{name}' is not of type {expected_type.__name__}") + return service + + def snapshot(self) -> dict[str, str]: + return {name: type(service).__name__ for name, service in self._services.items()} diff --git a/src/app_runtime/core/types.py b/src/app_runtime/core/types.py new file mode 100644 index 0000000..2654e9d --- /dev/null +++ b/src/app_runtime/core/types.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from enum import Enum +from typing import TypedDict + + +class HealthPayload(TypedDict, total=False): + status: str + detail: str + state: str + components: list[dict[str, object]] + + +class LifecycleState(str, Enum): + STARTING = "starting" + IDLE = "idle" + BUSY = "busy" + STOPPING = "stopping" + STOPPED = "stopped" diff --git a/src/app_runtime/health/__init__.py b/src/app_runtime/health/__init__.py new file mode 100644 index 0000000..2f252fa --- /dev/null +++ b/src/app_runtime/health/__init__.py @@ -0,0 +1,3 @@ +from app_runtime.health.registry import HealthRegistry + +__all__ = ["HealthRegistry"] diff --git a/src/app_runtime/health/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/health/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..e00a792 Binary files /dev/null and b/src/app_runtime/health/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/health/__pycache__/registry.cpython-312.pyc b/src/app_runtime/health/__pycache__/registry.cpython-312.pyc new file mode 100644 index 0000000..acf3b12 Binary files /dev/null and b/src/app_runtime/health/__pycache__/registry.cpython-312.pyc differ diff --git a/src/app_runtime/health/registry.py b/src/app_runtime/health/registry.py new file mode 100644 index 0000000..280b0aa --- /dev/null +++ b/src/app_runtime/health/registry.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from app_runtime.contracts.health import HealthContributor +from app_runtime.contracts.worker import WorkerHealth +from app_runtime.core.types import HealthPayload, LifecycleState + + +class HealthRegistry: + def __init__(self) -> None: + self._contributors: list[HealthContributor] = [] + + def register(self, contributor: HealthContributor) -> None: + self._contributors.append(contributor) + + def contributor_healths(self) -> list[WorkerHealth]: + return [contributor.health() for contributor in self._contributors] + + def snapshot(self, worker_healths: list[WorkerHealth]) -> dict[str, object]: + component_healths = worker_healths + self.contributor_healths() + return { + "status": self._aggregate_status(component_healths), + "components": [self._health_dict(item) for item in component_healths], + } + + def payload(self, state: LifecycleState, worker_healths: list[WorkerHealth]) -> HealthPayload: + component_healths = worker_healths + self.contributor_healths() + if state == LifecycleState.STOPPED: + return {"status": "unhealthy", "detail": "state=stopped", "state": state.value} + if state in {LifecycleState.STARTING, LifecycleState.STOPPING}: + return { + "status": "degraded", + "detail": f"state={state.value}", + "state": state.value, + "components": [self._health_dict(item) for item in component_healths], + } + return { + "status": self._aggregate_status(component_healths), + "state": state.value, + "components": [self._health_dict(item) for item in component_healths], + } + + def _aggregate_status(self, component_healths: list[WorkerHealth]) -> str: + if any(item.status == "unhealthy" and item.critical for item in component_healths): + return "unhealthy" + if any(item.status in {"degraded", "unhealthy"} for item in component_healths): + return "degraded" + return "ok" + + def _health_dict(self, health: WorkerHealth) -> dict[str, object]: + return { + "name": health.name, + "status": health.status, + "critical": health.critical, + "detail": health.detail, + "meta": health.meta, + } diff --git a/src/app_runtime/logging/__init__.py b/src/app_runtime/logging/__init__.py new file mode 100644 index 0000000..514319b --- /dev/null +++ b/src/app_runtime/logging/__init__.py @@ -0,0 +1,3 @@ +from app_runtime.logging.manager import LogManager + +__all__ = ["LogManager"] diff --git a/src/app_runtime/logging/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/logging/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..5682507 Binary files /dev/null and b/src/app_runtime/logging/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/logging/__pycache__/manager.cpython-312.pyc b/src/app_runtime/logging/__pycache__/manager.cpython-312.pyc new file mode 100644 index 0000000..2938c5b Binary files /dev/null and b/src/app_runtime/logging/__pycache__/manager.cpython-312.pyc differ diff --git a/src/app_runtime/logging/manager.py b/src/app_runtime/logging/manager.py new file mode 100644 index 0000000..eb48eda --- /dev/null +++ b/src/app_runtime/logging/manager.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import logging +import logging.config + + +class LogManager: + def __init__(self) -> None: + self._logger = logging.getLogger(__name__) + self._last_valid_config: dict | None = None + + def apply_config(self, config: dict) -> None: + logging_config = config.get("log") + if not logging_config: + self._logger.warning("Config has no 'log' section; default logging remains active.") + return + try: + logging.config.dictConfig(logging_config) + self._last_valid_config = dict(logging_config) + self._logger.info("Logging configuration applied") + except Exception: + self._logger.exception("Failed to apply logging configuration") + self._restore_last_valid() + + def _restore_last_valid(self) -> None: + if self._last_valid_config is None: + return + try: + logging.config.dictConfig(self._last_valid_config) + except Exception: + self._logger.exception("Failed to restore previous logging configuration") diff --git a/src/app_runtime/queue/__init__.py b/src/app_runtime/queue/__init__.py new file mode 100644 index 0000000..95084b1 --- /dev/null +++ b/src/app_runtime/queue/__init__.py @@ -0,0 +1,3 @@ +from app_runtime.queue.in_memory import InMemoryTaskQueue + +__all__ = ["InMemoryTaskQueue"] diff --git a/src/app_runtime/queue/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/queue/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..e5630ff Binary files /dev/null and b/src/app_runtime/queue/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc b/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc new file mode 100644 index 0000000..98aa863 Binary files /dev/null and b/src/app_runtime/queue/__pycache__/in_memory.cpython-312.pyc differ diff --git a/src/app_runtime/queue/in_memory.py b/src/app_runtime/queue/in_memory.py new file mode 100644 index 0000000..16d1db6 --- /dev/null +++ b/src/app_runtime/queue/in_memory.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from queue import Empty, Queue + +from app_runtime.contracts.queue import TaskQueue +from app_runtime.contracts.tasks import Task + + +class InMemoryTaskQueue(TaskQueue): + def __init__(self) -> None: + self._queue: Queue[Task] = Queue() + self._published = 0 + self._acked = 0 + self._nacked = 0 + + def publish(self, task: Task) -> None: + self._published += 1 + self._queue.put(task) + + def consume(self, timeout: float = 0.1) -> Task | None: + try: + return self._queue.get(timeout=timeout) + except Empty: + return None + + def ack(self, task: Task) -> None: + del task + self._acked += 1 + self._queue.task_done() + + def nack(self, task: Task, retry_delay: float | None = None) -> None: + del retry_delay + self._nacked += 1 + self._queue.put(task) + self._queue.task_done() + + def stats(self) -> dict[str, int]: + return { + "published": self._published, + "acked": self._acked, + "nacked": self._nacked, + "queued": self._queue.qsize(), + } diff --git a/src/app_runtime/tracing/__init__.py b/src/app_runtime/tracing/__init__.py new file mode 100644 index 0000000..4048ccc --- /dev/null +++ b/src/app_runtime/tracing/__init__.py @@ -0,0 +1,16 @@ +from app_runtime.contracts.trace import TraceContext, TraceContextRecord, TraceLogMessage, TraceTransport +from app_runtime.tracing.service import TraceManager, TraceService +from app_runtime.tracing.store import ActiveTraceContext, TraceContextStore +from app_runtime.tracing.transport import NoOpTraceTransport + +__all__ = [ + "ActiveTraceContext", + "NoOpTraceTransport", + "TraceContext", + "TraceContextRecord", + "TraceContextStore", + "TraceLogMessage", + "TraceManager", + "TraceService", + "TraceTransport", +] diff --git a/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..c81725e Binary files /dev/null and b/src/app_runtime/tracing/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/__pycache__/manager.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/manager.cpython-312.pyc new file mode 100644 index 0000000..48e0d61 Binary files /dev/null and b/src/app_runtime/tracing/__pycache__/manager.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc new file mode 100644 index 0000000..288c8ed Binary files /dev/null and b/src/app_runtime/tracing/__pycache__/service.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/__pycache__/store.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/store.cpython-312.pyc new file mode 100644 index 0000000..841df78 Binary files /dev/null and b/src/app_runtime/tracing/__pycache__/store.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc b/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc new file mode 100644 index 0000000..161cf01 Binary files /dev/null and b/src/app_runtime/tracing/__pycache__/transport.cpython-312.pyc differ diff --git a/src/app_runtime/tracing/service.py b/src/app_runtime/tracing/service.py new file mode 100644 index 0000000..00a8470 --- /dev/null +++ b/src/app_runtime/tracing/service.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +import logging +from contextlib import contextmanager +from typing import Any, Iterator +from uuid import uuid4 + +from app_runtime.contracts.trace import ( + TraceContext, + TraceContextFactory, + TraceContextRecord, + TraceLogMessage, + TraceTransport, + utc_now, +) +from app_runtime.tracing.store import TraceContextStore +from app_runtime.tracing.transport import NoOpTraceTransport + + +class TraceRecordWriter: + def __init__(self, transport: TraceTransport) -> None: + self._logger = logging.getLogger(__name__) + self._transport = transport + + def write_context(self, record: TraceContextRecord) -> None: + try: + self._transport.write_context(record) + except Exception: + self._logger.exception("Trace transport failed to write context %s", record.trace_id) + + def write_message(self, record: TraceLogMessage) -> None: + try: + self._transport.write_message(record) + except Exception: + self._logger.exception("Trace transport failed to write message for %s", record.trace_id) + + +class TraceService(TraceContextFactory): + def __init__(self, transport: TraceTransport | None = None, store: TraceContextStore | None = None) -> None: + self.transport = transport or NoOpTraceTransport() + self.store = store or TraceContextStore() + self._writer = TraceRecordWriter(self.transport) + + def create_context( + self, + *, + alias: str, + parent_id: str | None = None, + kind: str | None = None, + attrs: dict[str, Any] | None = None, + ) -> str: + record = TraceContextRecord( + trace_id=uuid4().hex, + alias=str(alias or ""), + parent_id=parent_id, + type=kind, + event_time=utc_now(), + attrs=dict(attrs or {}), + ) + self.store.push(record) + self._writer.write_context(record) + return record.trace_id + + @contextmanager + def open_context( + self, + *, + alias: str, + parent_id: str | None = None, + kind: str | None = None, + attrs: dict[str, Any] | None = None, + ) -> Iterator[str]: + trace_id = self.create_context(alias=alias, parent_id=parent_id, kind=kind, attrs=attrs) + try: + yield trace_id + finally: + self.store.pop() + + def current_trace_id(self) -> str | None: + return self.store.current_trace_id() + + def close_context(self) -> str | None: + previous = self.store.pop() + return previous.record.trace_id if previous else None + + def step(self, name: str) -> None: + self.store.set_step(str(name or "")) + + def info(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None: + self._write_message("INFO", message, status, attrs) + + def warning(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None: + self._write_message("WARNING", message, status, attrs) + + def error(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None: + self._write_message("ERROR", message, status, attrs) + + def exception(self, message: str, *, status: str = "failed", attrs: dict[str, Any] | None = None) -> None: + self._write_message("ERROR", message, status, attrs) + + def new_root(self, operation: str) -> TraceContext: + trace_id = self.create_context(alias=operation, kind="source", attrs={"operation": operation}) + return TraceContext(trace_id=trace_id, span_id=trace_id, attributes={"operation": operation}) + + def child_of(self, parent: TraceContext, operation: str) -> TraceContext: + trace_id = self.create_context( + alias=operation, + parent_id=parent.trace_id, + kind="worker", + attrs={"operation": operation}, + ) + return TraceContext( + trace_id=trace_id, + span_id=trace_id, + parent_span_id=parent.span_id, + attributes={"operation": operation}, + ) + + def attach(self, task_metadata: dict[str, object], context: TraceContext) -> dict[str, object]: + updated = dict(task_metadata) + updated["trace_id"] = context.trace_id + updated["span_id"] = context.span_id + updated["parent_span_id"] = context.parent_span_id + return updated + + def resume(self, task_metadata: dict[str, object], operation: str) -> TraceContext: + trace_id = str(task_metadata.get("trace_id") or uuid4().hex) + span_id = str(task_metadata.get("span_id") or trace_id) + parent_id = task_metadata.get("parent_span_id") + self.create_context( + alias=operation, + parent_id=str(parent_id) if parent_id else None, + kind="handler", + attrs=dict(task_metadata), + ) + return TraceContext( + trace_id=trace_id, + span_id=span_id, + parent_span_id=str(parent_id) if parent_id else None, + attributes={"operation": operation}, + ) + + def _write_message( + self, + level: str, + message: str, + status: str, + attrs: dict[str, Any] | None, + ) -> None: + active = self.store.current() + if active is None: + raise RuntimeError("Trace context is not bound. Call create_context() first.") + record = TraceLogMessage( + trace_id=active.record.trace_id, + step=active.step, + status=str(status or ""), + message=str(message or ""), + level=level, + event_time=utc_now(), + attrs=dict(attrs or {}), + ) + self._writer.write_message(record) + + +class TraceManager(TraceService): + """Compatibility alias during the transition from ConfigManager-shaped naming.""" diff --git a/src/app_runtime/tracing/store.py b/src/app_runtime/tracing/store.py new file mode 100644 index 0000000..ad62f0f --- /dev/null +++ b/src/app_runtime/tracing/store.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from contextvars import ContextVar +from dataclasses import dataclass, replace + +from app_runtime.contracts.trace import TraceContextRecord + + +@dataclass(frozen=True) +class ActiveTraceContext: + record: TraceContextRecord + step: str = "" + + +class TraceContextStore: + def __init__(self) -> None: + self._current: ContextVar[ActiveTraceContext | None] = ContextVar("trace_current", default=None) + self._stack: ContextVar[tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=()) + + def current(self) -> ActiveTraceContext | None: + return self._current.get() + + def current_trace_id(self) -> str | None: + active = self.current() + return active.record.trace_id if active else None + + def push(self, record: TraceContextRecord) -> ActiveTraceContext: + active = self.current() + stack = self._stack.get() + if active is not None: + self._stack.set(stack + (active,)) + updated = ActiveTraceContext(record=record) + self._current.set(updated) + return updated + + def pop(self) -> ActiveTraceContext | None: + stack = self._stack.get() + if not stack: + self._current.set(None) + return None + previous = stack[-1] + self._stack.set(stack[:-1]) + self._current.set(previous) + return previous + + def set_step(self, step: str) -> ActiveTraceContext | None: + active = self.current() + if active is None: + return None + updated = replace(active, step=step) + self._current.set(updated) + return updated diff --git a/src/app_runtime/tracing/transport.py b/src/app_runtime/tracing/transport.py new file mode 100644 index 0000000..07416e7 --- /dev/null +++ b/src/app_runtime/tracing/transport.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from app_runtime.contracts.trace import TraceContextRecord, TraceLogMessage, TraceTransport + + +class NoOpTraceTransport(TraceTransport): + def write_context(self, record: TraceContextRecord) -> None: + del record + + def write_message(self, record: TraceLogMessage) -> None: + del record diff --git a/src/app_runtime/workers/__init__.py b/src/app_runtime/workers/__init__.py new file mode 100644 index 0000000..69099d4 --- /dev/null +++ b/src/app_runtime/workers/__init__.py @@ -0,0 +1,4 @@ +from app_runtime.workers.queue_worker import QueueWorker +from app_runtime.workers.supervisor import WorkerSupervisor + +__all__ = ["QueueWorker", "WorkerSupervisor"] diff --git a/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc b/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..3b1f2ec Binary files /dev/null and b/src/app_runtime/workers/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/app_runtime/workers/__pycache__/queue_worker.cpython-312.pyc b/src/app_runtime/workers/__pycache__/queue_worker.cpython-312.pyc new file mode 100644 index 0000000..38fd423 Binary files /dev/null and b/src/app_runtime/workers/__pycache__/queue_worker.cpython-312.pyc differ diff --git a/src/app_runtime/workers/__pycache__/runner.cpython-312.pyc b/src/app_runtime/workers/__pycache__/runner.cpython-312.pyc new file mode 100644 index 0000000..d7087b4 Binary files /dev/null and b/src/app_runtime/workers/__pycache__/runner.cpython-312.pyc differ diff --git a/src/app_runtime/workers/__pycache__/supervisor.cpython-312.pyc b/src/app_runtime/workers/__pycache__/supervisor.cpython-312.pyc new file mode 100644 index 0000000..3b0efe3 Binary files /dev/null and b/src/app_runtime/workers/__pycache__/supervisor.cpython-312.pyc differ diff --git a/src/app_runtime/workers/queue_worker.py b/src/app_runtime/workers/queue_worker.py new file mode 100644 index 0000000..8132e3c --- /dev/null +++ b/src/app_runtime/workers/queue_worker.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from threading import Event, Lock, Thread + +from app_runtime.contracts.queue import TaskQueue +from app_runtime.contracts.tasks import TaskHandler +from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus +from app_runtime.tracing.service import TraceService + + +class QueueWorker(Worker): + def __init__( + self, + name: str, + queue: TaskQueue, + handler: TaskHandler, + traces: TraceService, + *, + concurrency: int = 1, + critical: bool = True, + ) -> None: + self._name = name + self._queue = queue + self._handler = handler + self._traces = traces + self._concurrency = concurrency + self._critical = critical + self._threads: list[Thread] = [] + self._stop_requested = Event() + self._force_stop = Event() + self._lock = Lock() + self._started = False + self._in_flight = 0 + self._processed = 0 + self._failures = 0 + + @property + def name(self) -> str: + return self._name + + @property + def critical(self) -> bool: + return self._critical + + def start(self) -> None: + if any(thread.is_alive() for thread in self._threads): + return + self._threads.clear() + self._stop_requested.clear() + self._force_stop.clear() + self._started = True + for index in range(self._concurrency): + thread = Thread(target=self._run_loop, name=f"{self._name}-{index + 1}", daemon=True) + self._threads.append(thread) + thread.start() + + def stop(self, force: bool = False) -> None: + self._stop_requested.set() + if force: + self._force_stop.set() + + def health(self) -> WorkerHealth: + status = self.status() + if self._started and not self._stop_requested.is_set() and self._alive_threads() == 0: + return WorkerHealth(self.name, "unhealthy", self.critical, "worker threads are not running", status.meta) + if self._failures > 0: + return WorkerHealth(self.name, "degraded", self.critical, "worker has processing failures", status.meta) + return WorkerHealth(self.name, "ok", self.critical, meta=status.meta) + + def status(self) -> WorkerStatus: + alive_threads = self._alive_threads() + with self._lock: + in_flight = self._in_flight + processed = self._processed + failures = self._failures + if self._started and alive_threads == 0: + state = "stopped" + elif self._stop_requested.is_set(): + state = "stopping" if alive_threads > 0 else "stopped" + elif not self._started: + state = "stopped" + elif in_flight > 0: + state = "busy" + else: + state = "idle" + return WorkerStatus( + name=self.name, + state=state, + in_flight=in_flight, + meta={ + "alive_threads": alive_threads, + "concurrency": self._concurrency, + "processed": processed, + "failures": failures, + }, + ) + + def _run_loop(self) -> None: + while True: + if self._force_stop.is_set() or self._stop_requested.is_set(): + return + task = self._queue.consume(timeout=0.1) + if task is None: + continue + with self._lock: + self._in_flight += 1 + self._traces.resume(task.metadata, f"worker:{self.name}") + try: + self._handler.handle(task) + except Exception: + with self._lock: + self._failures += 1 + self._queue.nack(task) + else: + with self._lock: + self._processed += 1 + self._queue.ack(task) + finally: + with self._lock: + self._in_flight -= 1 + if self._stop_requested.is_set(): + return + + def _alive_threads(self) -> int: + return sum(1 for thread in self._threads if thread.is_alive()) diff --git a/src/app_runtime/workers/supervisor.py b/src/app_runtime/workers/supervisor.py new file mode 100644 index 0000000..a73277b --- /dev/null +++ b/src/app_runtime/workers/supervisor.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from dataclasses import asdict +from time import monotonic, sleep + +from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus +from app_runtime.core.types import LifecycleState + + +class WorkerSupervisor: + def __init__(self) -> None: + self._workers: list[Worker] = [] + + def register(self, worker: Worker) -> None: + self._workers.append(worker) + + def start(self) -> None: + for worker in self._workers: + worker.start() + + def stop(self, timeout: float = 30.0, force: bool = False) -> None: + for worker in self._workers: + worker.stop(force=force) + if force: + return + deadline = monotonic() + timeout + while True: + if all(status.state == "stopped" for status in self.statuses()): + return + if monotonic() >= deadline: + raise TimeoutError("Workers did not stop within the requested timeout") + sleep(0.05) + + def snapshot(self) -> dict[str, object]: + return { + "registered": len(self._workers), + "workers": [asdict(status) for status in self.statuses()], + } + + def healths(self) -> list[WorkerHealth]: + return [worker.health() for worker in self._workers] + + def statuses(self) -> list[WorkerStatus]: + return [worker.status() for worker in self._workers] + + def lifecycle_state(self) -> LifecycleState: + statuses = self.statuses() + if not statuses: + return LifecycleState.IDLE + states = {status.state for status in statuses} + if "stopping" in states: + return LifecycleState.STOPPING + if "starting" in states: + return LifecycleState.STARTING + if "busy" in states: + return LifecycleState.BUSY + if states == {"stopped"}: + return LifecycleState.STOPPED + return LifecycleState.IDLE diff --git a/src/plba/__init__.py b/src/plba/__init__.py new file mode 100644 index 0000000..46f44cb --- /dev/null +++ b/src/plba/__init__.py @@ -0,0 +1,57 @@ +from plba.bootstrap import create_runtime +from plba.config import ConfigFileLoader, FileConfigProvider +from plba.control import ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel +from plba.contracts import ( + ApplicationModule, + ConfigProvider, + HealthContributor, + Task, + TaskHandler, + TaskQueue, + TraceContext, + TraceContextRecord, + TraceLogMessage, + TraceTransport, + Worker, + WorkerHealth, + WorkerStatus, +) +from plba.core import ConfigurationManager, RuntimeManager, ServiceContainer +from plba.health import HealthRegistry +from plba.logging import LogManager +from plba.queue import InMemoryTaskQueue +from plba.tracing import NoOpTraceTransport, TraceService +from plba.workers import QueueWorker, WorkerSupervisor + +__all__ = [ + "ApplicationModule", + "ConfigFileLoader", + "ConfigProvider", + "ConfigurationManager", + "ControlActionSet", + "ControlChannel", + "ControlPlaneService", + "create_runtime", + "FileConfigProvider", + "HealthContributor", + "HealthRegistry", + "HttpControlChannel", + "InMemoryTaskQueue", + "LogManager", + "NoOpTraceTransport", + "QueueWorker", + "RuntimeManager", + "ServiceContainer", + "Task", + "TaskHandler", + "TaskQueue", + "TraceContext", + "TraceContextRecord", + "TraceLogMessage", + "TraceService", + "TraceTransport", + "Worker", + "WorkerHealth", + "WorkerStatus", + "WorkerSupervisor", +] diff --git a/src/plba/__pycache__/__init__.cpython-312.pyc b/src/plba/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..5ff3d38 Binary files /dev/null and b/src/plba/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/plba/__pycache__/bootstrap.cpython-312.pyc b/src/plba/__pycache__/bootstrap.cpython-312.pyc new file mode 100644 index 0000000..154658f Binary files /dev/null and b/src/plba/__pycache__/bootstrap.cpython-312.pyc differ diff --git a/src/plba/__pycache__/config.cpython-312.pyc b/src/plba/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..1ecc1a8 Binary files /dev/null and b/src/plba/__pycache__/config.cpython-312.pyc differ diff --git a/src/plba/__pycache__/contracts.cpython-312.pyc b/src/plba/__pycache__/contracts.cpython-312.pyc new file mode 100644 index 0000000..cd692e0 Binary files /dev/null and b/src/plba/__pycache__/contracts.cpython-312.pyc differ diff --git a/src/plba/__pycache__/control.cpython-312.pyc b/src/plba/__pycache__/control.cpython-312.pyc new file mode 100644 index 0000000..05cd823 Binary files /dev/null and b/src/plba/__pycache__/control.cpython-312.pyc differ diff --git a/src/plba/__pycache__/core.cpython-312.pyc b/src/plba/__pycache__/core.cpython-312.pyc new file mode 100644 index 0000000..ce14476 Binary files /dev/null and b/src/plba/__pycache__/core.cpython-312.pyc differ diff --git a/src/plba/__pycache__/health.cpython-312.pyc b/src/plba/__pycache__/health.cpython-312.pyc new file mode 100644 index 0000000..0ad9e9d Binary files /dev/null and b/src/plba/__pycache__/health.cpython-312.pyc differ diff --git a/src/plba/__pycache__/logging.cpython-312.pyc b/src/plba/__pycache__/logging.cpython-312.pyc new file mode 100644 index 0000000..09ce28c Binary files /dev/null and b/src/plba/__pycache__/logging.cpython-312.pyc differ diff --git a/src/plba/__pycache__/queue.cpython-312.pyc b/src/plba/__pycache__/queue.cpython-312.pyc new file mode 100644 index 0000000..942582e Binary files /dev/null and b/src/plba/__pycache__/queue.cpython-312.pyc differ diff --git a/src/plba/__pycache__/tracing.cpython-312.pyc b/src/plba/__pycache__/tracing.cpython-312.pyc new file mode 100644 index 0000000..84a4dfd Binary files /dev/null and b/src/plba/__pycache__/tracing.cpython-312.pyc differ diff --git a/src/plba/__pycache__/workers.cpython-312.pyc b/src/plba/__pycache__/workers.cpython-312.pyc new file mode 100644 index 0000000..6183a0f Binary files /dev/null and b/src/plba/__pycache__/workers.cpython-312.pyc differ diff --git a/src/plba/bootstrap.py b/src/plba/bootstrap.py new file mode 100644 index 0000000..80318da --- /dev/null +++ b/src/plba/bootstrap.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from app_runtime.control.http_channel import HttpControlChannel +from app_runtime.core.runtime import RuntimeManager +from app_runtime.contracts.application import ApplicationModule + + +def create_runtime( + module: ApplicationModule, + *, + config_path: str | None = None, + enable_http_control: bool = False, + control_host: str = "127.0.0.1", + control_port: int = 8080, + control_timeout: int = 5, +) -> RuntimeManager: + runtime = RuntimeManager() + if config_path is not None: + runtime.add_config_file(config_path) + if enable_http_control: + runtime.control_plane.register_channel( + HttpControlChannel( + host=control_host, + port=control_port, + timeout=control_timeout, + ) + ) + runtime.register_module(module) + return runtime diff --git a/src/plba/config.py b/src/plba/config.py new file mode 100644 index 0000000..eab5d39 --- /dev/null +++ b/src/plba/config.py @@ -0,0 +1,4 @@ +from app_runtime.config.file_loader import ConfigFileLoader +from app_runtime.config.providers import FileConfigProvider + +__all__ = ["ConfigFileLoader", "FileConfigProvider"] diff --git a/src/plba/contracts.py b/src/plba/contracts.py new file mode 100644 index 0000000..e525433 --- /dev/null +++ b/src/plba/contracts.py @@ -0,0 +1,28 @@ +from app_runtime.contracts.application import ApplicationModule +from app_runtime.contracts.config import ConfigProvider +from app_runtime.contracts.health import HealthContributor +from app_runtime.contracts.queue import TaskQueue +from app_runtime.contracts.tasks import Task, TaskHandler +from app_runtime.contracts.trace import ( + TraceContext, + TraceContextRecord, + TraceLogMessage, + TraceTransport, +) +from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus + +__all__ = [ + "ApplicationModule", + "ConfigProvider", + "HealthContributor", + "Task", + "TaskHandler", + "TaskQueue", + "TraceContext", + "TraceContextRecord", + "TraceLogMessage", + "TraceTransport", + "Worker", + "WorkerHealth", + "WorkerStatus", +] diff --git a/src/plba/control.py b/src/plba/control.py new file mode 100644 index 0000000..8b817ee --- /dev/null +++ b/src/plba/control.py @@ -0,0 +1,10 @@ +from app_runtime.control.base import ControlActionSet, ControlChannel +from app_runtime.control.http_channel import HttpControlChannel +from app_runtime.control.service import ControlPlaneService + +__all__ = [ + "ControlActionSet", + "ControlChannel", + "ControlPlaneService", + "HttpControlChannel", +] diff --git a/src/plba/core.py b/src/plba/core.py new file mode 100644 index 0000000..b544acb --- /dev/null +++ b/src/plba/core.py @@ -0,0 +1,9 @@ +from app_runtime.core.configuration import ConfigurationManager +from app_runtime.core.runtime import RuntimeManager +from app_runtime.core.service_container import ServiceContainer + +__all__ = [ + "ConfigurationManager", + "RuntimeManager", + "ServiceContainer", +] diff --git a/src/plba/health.py b/src/plba/health.py new file mode 100644 index 0000000..2f252fa --- /dev/null +++ b/src/plba/health.py @@ -0,0 +1,3 @@ +from app_runtime.health.registry import HealthRegistry + +__all__ = ["HealthRegistry"] diff --git a/src/plba/logging.py b/src/plba/logging.py new file mode 100644 index 0000000..514319b --- /dev/null +++ b/src/plba/logging.py @@ -0,0 +1,3 @@ +from app_runtime.logging.manager import LogManager + +__all__ = ["LogManager"] diff --git a/src/plba/queue.py b/src/plba/queue.py new file mode 100644 index 0000000..95084b1 --- /dev/null +++ b/src/plba/queue.py @@ -0,0 +1,3 @@ +from app_runtime.queue.in_memory import InMemoryTaskQueue + +__all__ = ["InMemoryTaskQueue"] diff --git a/src/plba/tracing.py b/src/plba/tracing.py new file mode 100644 index 0000000..73ee774 --- /dev/null +++ b/src/plba/tracing.py @@ -0,0 +1,4 @@ +from app_runtime.tracing.service import TraceService +from app_runtime.tracing.transport import NoOpTraceTransport + +__all__ = ["NoOpTraceTransport", "TraceService"] diff --git a/src/plba/workers.py b/src/plba/workers.py new file mode 100644 index 0000000..69099d4 --- /dev/null +++ b/src/plba/workers.py @@ -0,0 +1,4 @@ +from app_runtime.workers.queue_worker import QueueWorker +from app_runtime.workers.supervisor import WorkerSupervisor + +__all__ = ["QueueWorker", "WorkerSupervisor"] diff --git a/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc b/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc new file mode 100644 index 0000000..0031da0 Binary files /dev/null and b/tests/__pycache__/test_runtime.cpython-312-pytest-9.0.2.pyc differ diff --git a/tests/test_runtime.py b/tests/test_runtime.py new file mode 100644 index 0000000..b85bc1f --- /dev/null +++ b/tests/test_runtime.py @@ -0,0 +1,230 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from threading import Event, Thread +from time import sleep + +from app_runtime.contracts.application import ApplicationModule +from app_runtime.contracts.health import HealthContributor +from app_runtime.contracts.tasks import Task, TaskHandler +from app_runtime.contracts.worker import WorkerHealth +from app_runtime.core.registration import ModuleRegistry +from app_runtime.core.runtime import RuntimeManager +from app_runtime.queue.in_memory import InMemoryTaskQueue +from app_runtime.tracing.transport import NoOpTraceTransport +from app_runtime.workers.queue_worker import QueueWorker + + +@dataclass +class CollectingHandler(TaskHandler): + processed: list[dict[str, object]] = field(default_factory=list) + + def handle(self, task: Task) -> None: + self.processed.append(task.payload) + + +class BlockingHandler(TaskHandler): + def __init__(self, started: Event, release: Event) -> None: + self._started = started + self._release = release + + def handle(self, task: Task) -> None: + del task + self._started.set() + self._release.wait(timeout=2.0) + + +class StaticHealthContributor(HealthContributor): + def health(self) -> WorkerHealth: + return WorkerHealth(name="example-module", status="ok", critical=False, meta={"kind": "test"}) + + +class ExampleModule(ApplicationModule): + def __init__(self) -> None: + self.handler = CollectingHandler() + self.queue = InMemoryTaskQueue() + + @property + def name(self) -> str: + return "example" + + def register(self, registry: ModuleRegistry) -> None: + traces = registry.services.get("traces") + registry.add_queue("incoming", self.queue) + registry.add_handler("collect", self.handler) + self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={})) + registry.add_worker(QueueWorker("collector", self.queue, self.handler, traces, concurrency=1)) + registry.add_health_contributor(StaticHealthContributor()) + + +class BlockingModule(ApplicationModule): + def __init__(self, started: Event, release: Event) -> None: + self.queue = InMemoryTaskQueue() + self.handler = BlockingHandler(started, release) + + @property + def name(self) -> str: + return "blocking" + + def register(self, registry: ModuleRegistry) -> None: + traces = registry.services.get("traces") + self.queue.publish(Task(name="incoming", payload={"id": 1}, metadata={})) + registry.add_worker(QueueWorker("blocking-worker", self.queue, self.handler, traces, concurrency=1)) + + +class RecordingTransport(NoOpTraceTransport): + def __init__(self) -> None: + self.contexts: list[object] = [] + self.messages: list[object] = [] + + def write_context(self, record) -> None: # type: ignore[override] + self.contexts.append(record) + + def write_message(self, record) -> None: # type: ignore[override] + self.messages.append(record) + + +def test_runtime_processes_tasks_and_exposes_status(tmp_path) -> None: + config_path = tmp_path / "config.yml" + config_path.write_text( + """ +platform: + workers: 1 +log: + version: 1 + disable_existing_loggers: false + handlers: + console: + class: logging.StreamHandler + root: + level: INFO + handlers: [console] +""".strip(), + encoding="utf-8", + ) + runtime = RuntimeManager() + runtime.add_config_file(config_path) + module = ExampleModule() + runtime.register_module(module) + + runtime.start() + sleep(0.2) + status = runtime.status() + runtime.stop() + + assert module.handler.processed == [{"id": 1}] + assert status["modules"] == ["example"] + assert status["runtime"]["state"] == "idle" + assert status["health"]["status"] == "ok" + assert status["config"] == {"platform": {"workers": 1}, "log": status["config"]["log"]} + + +def test_runtime_graceful_stop_waits_until_worker_finishes() -> None: + started = Event() + release = Event() + runtime = RuntimeManager() + runtime.register_module(BlockingModule(started, release)) + runtime.start() + assert started.wait(timeout=1.0) is True + assert runtime.status()["runtime"]["state"] == "busy" + + stop_thread = Thread(target=runtime.stop, kwargs={"timeout": 1.0}, daemon=True) + stop_thread.start() + sleep(0.1) + assert stop_thread.is_alive() is True + + release.set() + stop_thread.join(timeout=1.0) + assert stop_thread.is_alive() is False + assert runtime.status()["runtime"]["state"] == "stopped" + + +def test_trace_service_writes_contexts_and_messages() -> None: + from app_runtime.tracing.service import TraceService + + transport = RecordingTransport() + manager = TraceService(transport=transport) + + with manager.open_context(alias="worker", kind="task", attrs={"task": "incoming"}): + manager.step("parse") + manager.info("started", status="ok", attrs={"attempt": 1}) + + assert len(transport.contexts) == 1 + assert len(transport.messages) == 1 + assert transport.contexts[0].alias == "worker" + assert transport.messages[0].step == "parse" + + +def test_http_control_channel_exposes_health_and_actions() -> None: + from app_runtime.control.base import ControlActionSet + from app_runtime.control.http_channel import HttpControlChannel + + state = {"started": False} + + async def health(): + return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"} + + async def start_handler() -> str: + state["started"] = True + return "started" + + async def stop_handler() -> str: + state["started"] = False + return "stopped" + + async def status_handler() -> str: + return "idle" if state["started"] else "stopped" + + async def scenario() -> None: + channel = HttpControlChannel("127.0.0.1", 0, 2) + await channel.start( + ControlActionSet( + health=health, + start=start_handler, + stop=stop_handler, + status=status_handler, + ) + ) + + start_response = await channel._action_response("start") + assert start_response.status_code == 200 + assert start_response.body == b'{"status":"ok","detail":"started"}' + + health_payload = await channel._health_response() + assert health_payload["status"] == "ok" + + status_response = await channel._action_response("status") + assert status_response.status_code == 200 + assert status_response.body == b'{"status":"ok","detail":"idle"}' + await channel.stop() + + asyncio.run(scenario()) + + +def test_public_plba_package_exports_runtime_builder(tmp_path) -> None: + from plba import ApplicationModule as PublicApplicationModule + from plba import QueueWorker as PublicQueueWorker + from plba import create_runtime + + config_path = tmp_path / "config.yml" + config_path.write_text("platform: {}\n", encoding="utf-8") + + class PublicExampleModule(PublicApplicationModule): + @property + def name(self) -> str: + return "public-example" + + def register(self, registry: ModuleRegistry) -> None: + queue = InMemoryTaskQueue() + traces = registry.services.get("traces") + handler = CollectingHandler() + queue.publish(Task(name="incoming", payload={"id": 2}, metadata={})) + registry.add_worker(PublicQueueWorker("public-worker", queue, handler, traces)) + + runtime = create_runtime(PublicExampleModule(), config_path=str(config_path)) + runtime.start() + sleep(0.2) + assert runtime.configuration.get() == {"platform": {}} + assert runtime.status()["workers"]["registered"] == 1 + runtime.stop() diff --git a/vision.md b/vision.md new file mode 100644 index 0000000..41b88b1 --- /dev/null +++ b/vision.md @@ -0,0 +1,119 @@ +# Vision + +## Purpose + +This project provides a reusable runtime platform for business applications. + +The runtime exists to solve service and infrastructure concerns that are needed by many applications but do not belong to business logic: +- start and stop +- status and lifecycle +- configuration +- worker execution +- trace propagation +- health checks +- control and administration +- later authentication and user management + +The runtime should allow a business application to focus only on domain behavior. + +## Vision statement + +We build a platform where business applications are assembled from small domain modules, while the runtime consistently provides lifecycle, workers, tracing, configuration, and control-plane capabilities. + +## Problem + +In the previous generation, the execution model was centered around a single `execute()` entry point called by a timer loop. + +That model works for simple periodic jobs, but it becomes too narrow when we need: +- queue-driven processing +- multiple concurrent workers +- independent task sources +- richer health semantics +- trace propagation across producer and consumer boundaries +- reusable runtime patterns across different applications +- future admin and authentication capabilities + +The old model couples orchestration and execution too tightly. + +## Desired future state + +The new runtime should provide: +- a reusable lifecycle and orchestration core +- a clean contract for business applications +- support for sources, queues, workers, and handlers +- explicit separation between platform and domain +- trace and health as first-class platform services +- the ability to evolve into an admin platform + +## Design principles + +### 1. Platform vs domain separation + +The runtime owns platform concerns. +Applications own domain concerns. + +The runtime must not contain business rules such as: +- email parsing policies +- order creation logic +- invoice decisions +- client-specific rules + +### 2. Composition over inheritance + +Applications should be composed by registering modules and services in the runtime, not by inheriting a timer-driven class and overriding one method. + +### 3. Explicit task model + +Applications should model processing as: +- task source +- task queue +- worker +- handler + +This is more scalable than one monolithic execute loop. + +### 4. Parallelism as a first-class concern + +The runtime should supervise worker pools and concurrency safely instead of leaving this to ad hoc application code. + +### 5. Observability by default + +Trace, logs, metrics, and health should be available as platform services from the start. + +### 6. Evolvability + +The runtime should be able to support: +- different queue backends +- different task sources +- different control planes +- different admin capabilities +without forcing business applications to change their domain code. + +## First target use case + +The first business application is `mail_order_bot`. + +Its business domain is: +- fetching incoming mail +- processing attachments +- executing order-related pipelines + +Its platform/runtime needs are: +- lifecycle management +- polling or IMAP IDLE source +- queueing +- worker pools +- tracing +- health checks +- future admin API + +This makes it a good pilot for the new runtime. + +## Success criteria + +We consider the runtime direction successful if: +- `mail_order_bot` business logic can run on top of it without leaking infrastructure details into domain code +- the runtime can manage concurrent workers +- the runtime can support a queue-based flow +- the runtime can expose status and health +- the runtime can later host admin/auth features without redesigning the core