Compare commits

..

11 Commits

116 changed files with 3106 additions and 8145 deletions
+3
View File
@@ -2,6 +2,9 @@
.venv
__pycache__
# Runtime agent traces (local only; written by RequestTraceLogger)
runtime_traces/
# Pipeline harness: per-run artifacts (md/json from tests.pipeline_setup_v3/v4)
tests/**/test_runs/**/*.md
tests/**/test_runs/**/*.json
+28 -1
View File
@@ -91,6 +91,12 @@ Mermaid-диаграмма должна содержать:
- `application`
- `platform`
Дополнительные метаданные для случаев изменения существующей документации:
- `action`
- `target_doc_id`
- `target_path`
#### 6.x для `ui_page`
Обязательная структура:
@@ -117,6 +123,24 @@ Mermaid-диаграмма должна содержать:
- Поля описывать списком (не таблицей).
- Общие правила (например, read-only, поведение при пустом значении) выносить в общий блок, не дублировать для каждого поля.
Отдельно нужно различать два сценария описания:
1. Если описывается новая UI-страница или новая самостоятельная UI-форма, раздел оформляется полноценно по шаблону `ui_page`.
- Нужно дать достаточный контекст для разработки и тестирования.
- Нужно подробно описывать структуру формы, состояния отображения, поведение полей, ошибки, empty state и пользовательские действия.
2. Если описывается доработка уже существующей страницы или существующей UI-формы, не нужно повторно копировать полное описание из действующей документации.
- Нужно учитывать уже существующее описание страницы в документации и аналитике.
- В аналитике нужно явно указать, что именно меняется в существующем сценарии: что добавляется, редактируется или удаляется.
- Нужно указывать точку изменения: в какой существующей странице, форме, блоке или сценарии вносится изменение.
- Нужно ссылаться на существующий документ или раздел, где базовое поведение уже описано.
- Нужно описывать только delta изменений, достаточную для реализации доработки и актуализации документации.
- Полное описание существующей страницы в таком разделе не дублируется.
- Для такой доработки в metadata нужно явно указывать `action: update`.
- Если изменение должно попасть в уже существующий markdown-документ, нужно явно указывать `target_doc_id` и/или `target_path`.
- `target_doc_id` должен совпадать с `id` существующего документа, который требуется обновить.
- Если `target_doc_id`/`target_path` не указаны, агент может ошибочно интерпретировать раздел как создание нового документа.
Нефункциональные требования для `ui_page`:
- пользовательская аналитика оформляется таблицей с колонками:
@@ -172,6 +196,10 @@ Mermaid-диаграмма должна содержать:
- `### Функциональные требования`
- `### Нефункциональные требования`
`logic_block` удобно использовать для фиксации точечных изменений существующего сценария, если раздел не описывает новую самостоятельную страницу или новую самостоятельную форму, а только уточняет delta к уже существующей документации.
Если точечное изменение должно изменить существующий документ другого типа, `logic_block` для этого использовать нельзя. В этом случае metadata раздела должна указывать тип и идентификатор целевого существующего документа, который требуется обновить.
## Дополнительные правила по слоям
- Проверка ролевой модели пользователя обычно выполняется на уровне `ufs`.
@@ -182,4 +210,3 @@ Mermaid-диаграмма должна содержать:
- Аудит: события, которые фиксируют действия пользователя и позволяют ответить на вопрос «кто, что, когда сделал».
- Мониторинг: технические события/метрики для контроля стабильности и поиска сбоев.
@@ -1,5 +1,7 @@
# API Contract Rules
Этот rule описывает только тело секции `### Контракт`.
## Обязательные части
- request parameters (`header/query/path`)
- request body (если применимо)
@@ -9,6 +11,11 @@
- timeout
- retry/idempotency (если применимо)
## Правила заголовков внутри тела секции
- Не повторять заголовок `Контракт`.
- Запрещено выводить `## Контракт` и `### Контракт` внутри тела секции.
- Если нужны подзаголовки, использовать только уровень ниже родительской секции: `#### Запрос`, `#### Ответ`, `#### Ошибки`, `#### Auth`, `#### Timeout`, `#### Retry/Idempotency`.
## Табличный формат
Для request/response таблицы должны содержать:
- название
@@ -1,5 +1,7 @@
# Functional Requirements Rules
Этот rule описывает только тело секции `### Функциональные требования`.
## Формат
- `FR.<номер>. <Название>`
- Нумерация инкрементальная внутри документа.
@@ -9,6 +11,7 @@
- FR не копируют шаги сценария без добавления новой информации.
- Для интеграционных шагов FR обязательны.
- Если в сценарии есть вызов внешнего API / сервиса / БД, нужен отдельный FR на интеграцию.
- Запрещено повторять заголовок `### Функциональные требования` внутри тела секции.
## FR для интеграционных шагов
Для интеграционного FR обязательно раскрывать:
@@ -1,5 +1,7 @@
# Tech Use Case Rules
Этот rule описывает только тело секции `### Технический use case`.
## Обязательные части
- название
- предусловия
@@ -14,3 +16,4 @@
- Формат шага: смысловое действие + техническая реализация (endpoint/топик/операция).
- Длинные технические детали выносить в FR и ссылаться на FR из шага.
- Для интеграционных шагов описание обработки ошибок обязательно.
- Запрещено повторять заголовок `### Технический use case` внутри тела секции.
+12 -1
View File
@@ -7,6 +7,10 @@
- Структура документа определяется только template соответствующего типа.
- Правила написания конкретного раздела определяются только соответствующим `common-elements` файлом.
- Manifest типа документа хранится во frontmatter соответствующего template.
- Генератор секции всегда пишет только тело секции, а не сам заголовок секции.
- Дублирование заголовков запрещено: нельзя повторно выводить заголовок текущей секции внутри ее тела.
- Если template уже содержит `### <Заголовок секции>`, то внутри тела допустимы только подзаголовки более глубокого уровня (`####` и ниже).
- Нельзя повышать уровень заголовка внутри тела секции до `##` или повторять `###` с тем же названием секции.
## 2. Источники требований
При генерации документа учитывать:
@@ -34,7 +38,14 @@
5. Применить body template как единственный источник структуры.
5. Проверить чек-лист совместимости с аналитикой (domain/sub_domain, роли слоев, интеграции, ошибки).
## 6. Формат manifest типа документа
## 6. Специальные инварианты для `api_method`
- Во frontmatter обязательно должно присутствовать поле `endpoint`.
- Внутри `## Details` секция `### Контракт` должна присутствовать ровно один раз.
- Внутри тела секции `### Контракт` запрещено повторять заголовки `## Контракт` и `### Контракт`.
- Внутри `### Технический use case` запрещено повторять заголовок `### Технический use case`.
- Внутри `### Функциональные требования` запрещено повторять заголовок `### Функциональные требования`.
## 7. Формат manifest типа документа
Manifest типа документа хранится во frontmatter `templates/<doc_type>.template.md`.
Минимальная схема:
@@ -20,6 +20,11 @@ related_code: []
system_analytics_refs: []
```
## Дополнительные обязательные поля по типам документов
- Для `doc_type: api_method` поле `endpoint` обязательно.
- Значение `endpoint` должно содержать HTTP-метод и путь, например: `GET /orders/{orderId}`.
- Если в аналитике endpoint указан в заголовке раздела, use case, контракте или интеграционной схеме, его нужно перенести во frontmatter и не опускать.
## Body-метаданные для секции изменений
Под корнем секции изменений указывать:
- `domain`
+1
View File
@@ -11,6 +11,7 @@ requires-python = ">=3.11"
dependencies = [
"fastapi>=0.116",
"uvicorn>=0.35",
"python-dotenv>=1.0",
"pydantic>=2.11",
"langgraph>=0.6",
"langgraph-checkpoint-postgres>=2.0",
+1
View File
@@ -1,5 +1,6 @@
fastapi==0.116.1
uvicorn==0.35.0
python-dotenv==1.0.1
pydantic==2.11.7
langgraph==0.6.7
langgraph-checkpoint-postgres==2.0.23
@@ -1,327 +0,0 @@
# Runtime Trace: 20260410-130611-31bb5d20c67b
- active_rag_session_id: 0ae059fe-076a-4aa4-abd4-31bb5d20c67b
## request
```json
{
"request_id": "req_a14d483fd13b44fa98eb81dd6dd3ccdc",
"session_id": "as_90d274870b1247d19694bbef1afa389a",
"active_rag_session_id": "0ae059fe-076a-4aa4-abd4-31bb5d20c67b",
"process_version": "v2",
"created_at": "2026-04-10T13:06:11.385561+00:00",
"message": "Какие методы апи есть в проекте"
}
```
## process.v2
```json
{
"event": "intent_routed",
"routing_domain": "DOCS",
"intent": "DOC_EXPLAIN",
"subintent": "API_EXPOSED",
"normalized_query": "Какие методы апи есть в проекте",
"target_terms": [],
"anchors": {
"entity_names": [],
"file_names": [],
"endpoint_paths": [],
"target_doc_hints": [],
"matched_aliases": [],
"process_domain": null,
"process_subdomain": null,
"scope_type": "global",
"candidate_domains": [],
"candidate_subdomains": [],
"candidate_entities": [],
"candidate_apis": [],
"signal_types": []
},
"confidence": 0.8500000000000001,
"routing_mode": "llm_default",
"llm_router_used": true,
"reason_short": "Запрос явно касается перечисления доступных API-методов.",
"rag_session_id": "0ae059fe-076a-4aa4-abd4-31bb5d20c67b"
}
```
## process.v2.pipeline
```json
{
"event": "router_resolved",
"domain": "DOCS",
"intent": "DOC_EXPLAIN",
"subintent": "API_EXPOSED",
"confidence": 0.8500000000000001
}
```
## process.v2.pipeline
```json
{
"event": "anchors_extracted",
"signal_types": [],
"endpoint_paths": [],
"target_doc_hints": [],
"matched_aliases": [],
"target_terms": []
}
```
## process.v2.pipeline
```json
{
"event": "alias_resolution",
"resolved_aliases": [],
"target_doc_hints": []
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_started",
"workflow_id": "v2.docs_explain.api_exposed"
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_step_traced",
"workflow_id": "v2.docs_explain.api_exposed",
"step": {
"id": "require_rag_session",
"title": "Проверка RAG-сессии"
},
"input": {},
"output": {
"has_rag_session": true
}
}
```
## process.v2.retrieval_policy
```json
{
"event": "retrieval_plan_resolved",
"profile": "api_exposed",
"layers": [
"D1_DOCUMENT_CATALOG"
],
"limit": 400,
"filters": {
"metadata.type": "api_method",
"prefer_path_prefixes": [
"docs/api/",
"docs/endpoints/",
"docs/methods/",
"api/",
"endpoints/",
"methods/"
],
"target_doc_hints": [],
"prefer_like_patterns": [
"%api%",
"%endpoint%",
"%method%",
"%эндпоинт%",
"%метод%"
]
}
}
```
## process.v2.pipeline
```json
{
"event": "retrieval_profile_selected",
"profile": "api_exposed",
"layers": [
"D1_DOCUMENT_CATALOG"
],
"filters": {
"metadata.type": "api_method",
"prefer_path_prefixes": [
"docs/api/",
"docs/endpoints/",
"docs/methods/",
"api/",
"endpoints/",
"methods/"
],
"target_doc_hints": [],
"prefer_like_patterns": [
"%api%",
"%endpoint%",
"%method%",
"%эндпоинт%",
"%метод%"
]
}
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_step_traced",
"workflow_id": "v2.docs_explain.api_exposed",
"step": {
"id": "resolve_retrieval_plan",
"title": "Выбор retrieval-плана"
},
"input": {},
"output": {
"profile": "api_exposed"
}
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_step_traced",
"workflow_id": "v2.docs_explain.api_exposed",
"step": {
"id": "fetch_rag_rows",
"title": "Получение строк из RAG"
},
"input": {},
"output": {
"retrieved_row_count": 3
}
}
```
## process.v2.evidence
```json
{
"event": "evidence_assembled",
"mode": "api_exposed",
"endpoint_count": 3,
"endpoints": [
"GET /api/v1/clients/contacts-dgr",
"GET /api/v1/clients/contacts-dgr/{contactid}",
"POST /api/v1/clients/contacts-dgr"
]
}
```
## process.v2.pipeline
```json
{
"event": "evidence_assembled",
"mode": "api_exposed",
"endpoint_count": 3
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_step_traced",
"workflow_id": "v2.docs_explain.api_exposed",
"step": {
"id": "build_api_exposed_evidence",
"title": "Сборка списка API"
},
"input": {},
"output": {
"endpoint_count": 3
}
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_step_traced",
"workflow_id": "v2.docs_explain.api_exposed",
"step": {
"id": "finalize_api_exposed_answer",
"title": "Формирование ответа со списком API"
},
"input": {},
"output": {
"answer_length": 111
}
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_trace_flushed",
"workflow_id": "v2.docs_explain.api_exposed",
"steps": [
{
"step_id": "require_rag_session",
"title": "Проверка RAG-сессии",
"input": {},
"output": {
"has_rag_session": true
}
},
{
"step_id": "resolve_retrieval_plan",
"title": "Выбор retrieval-плана",
"input": {},
"output": {
"profile": "api_exposed"
}
},
{
"step_id": "fetch_rag_rows",
"title": "Получение строк из RAG",
"input": {},
"output": {
"retrieved_row_count": 3
}
},
{
"step_id": "build_api_exposed_evidence",
"title": "Сборка списка API",
"input": {},
"output": {
"endpoint_count": 3
}
},
{
"step_id": "finalize_api_exposed_answer",
"title": "Формирование ответа со списком API",
"input": {},
"output": {
"answer_length": 111
}
}
]
}
```
## workflow.v2.api_exposed
```json
{
"event": "workflow_completed",
"workflow_id": "v2.docs_explain.api_exposed"
}
```
## process.v2.pipeline
```json
{
"event": "answer_generated",
"answer_mode": "deterministic",
"answer_length": 111
}
```
## result
```json
{
"status": "done",
"answer": "GET /api/v1/clients/contacts-dgr\nGET /api/v1/clients/contacts-dgr/{contactid}\nPOST /api/v1/clients/contacts-dgr",
"completed_at": "2026-04-10T13:06:13.326341+00:00"
}
```
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Binary file not shown.
@@ -53,6 +53,18 @@ class _EndpointPathExtractor:
_PATH_RE = re.compile(r"`([^`]+)`|(/[A-Za-z0-9_./{}-]+)")
_VALID_ENDPOINT_RE = re.compile(r"^/[a-z0-9._/-]+(?:/\{[a-z0-9_]+\})?$")
_DOC_EXTENSIONS = (".md", ".yaml", ".yml", ".json")
_FILESYSTEM_PREFIXES = (
"/users/",
"/home/",
"/tmp/",
"/var/",
"/opt/",
"/etc/",
"/private/",
"/mnt/",
"/workspace/",
"/workspaces/",
)
def extract(self, query: str) -> list[str]:
values: list[str] = []
@@ -72,6 +84,8 @@ class _EndpointPathExtractor:
def _is_endpoint(self, token: str) -> bool:
if not token or not self._VALID_ENDPOINT_RE.fullmatch(token):
return False
if token.startswith(self._FILESYSTEM_PREFIXES):
return False
return not token.endswith(self._DOC_EXTENSIONS)
def _append_unique(self, items: list[str], value: str) -> None:
@@ -2,8 +2,10 @@
from __future__ import annotations
import re
from collections.abc import Callable
from dataclasses import replace
from typing import TYPE_CHECKING
from app.core.agent.processes.v2.intent_router.modules.anchors import V2AnchorExtractor
from app.core.agent.processes.v2.intent_router.modules.normalizer import V2QueryNormalizer
@@ -16,13 +18,73 @@ from app.core.agent.processes.v2.intent_router.modules.scope_resolver import (
from app.core.agent.processes.v2.intent_router.modules.target_terms import V2TargetTermsExtractor
from app.core.agent.processes.v2.intent_router.models import QueryFeatures
from app.core.agent.processes.v2.intent_router.routers.confidence import V2ConfidenceAdjuster
from app.core.agent.processes.v2.intent_router.routers.docs_subintent_resolver import DocsSubintentResolver
from app.core.agent.processes.v2.intent_router.routers.fallback import V2FallbackRouter
from app.core.agent.processes.v2.intent_router.routers.llm import V2LlmRouter
from app.core.agent.processes.v2.intent_router.routers.route_catalog import V2RouteCatalog
from app.core.agent.processes.v2.intent_router.routers.validator import V2RouteValidator
from app.core.agent.utils.process_v2.models import V2RouteResult, V2ScopeType
from app.core.agent.utils.process_v2.models import V2RouteResult, V2ScopeType, V2Subintent
from app.core.agent.utils.llm import AgentLlmService
from app.core.rag.persistence.query_repository import RagQueryRepository
if TYPE_CHECKING:
from app.core.rag.persistence.query_repository import RagQueryRepository
class _ExplicitDocsUpdateResolver:
_UPDATE_MARKERS = (
"собери документац",
"сгенерир",
"построй документац",
"обнови документац",
"обновить документац",
"generate documentation",
"build documentation",
"update documentation",
)
_FEATURE_MARKERS = (
"/features/",
"\\features\\",
"feature",
"системной аналитик",
"confluence",
)
_PATH_PATTERN = re.compile(r"(/[^\n`]+?\.md)")
_URL_PATTERN = re.compile(r"https?://[^\s)]*confluence[^\s)]*")
def matches(self, user_query: str) -> bool:
query = str(user_query or "")
lowered = query.lower()
if not any(marker in lowered for marker in self._UPDATE_MARKERS):
return False
path = self._extract_path(query)
if path and self._is_feature_source(path):
return True
url = self._extract_confluence_url(query)
if url:
return True
return any(marker in lowered for marker in self._FEATURE_MARKERS)
def _extract_path(self, query: str) -> str:
if "`" in query:
for chunk in query.split("`"):
value = chunk.strip().strip('"').strip("'")
if value.endswith(".md") and value.startswith("/"):
return value
match = self._PATH_PATTERN.search(query)
return match.group(1).strip().strip('"').strip("'") if match else ""
def _extract_confluence_url(self, query: str) -> str:
match = self._URL_PATTERN.search(query)
return match.group(0).strip() if match else ""
def _is_feature_source(self, path: str) -> bool:
lowered = str(path or "").lower()
return "/feature" in lowered
class _ExplicitFileLookupResolver:
def matches(self, anchor_analysis) -> bool:
return bool(getattr(anchor_analysis.anchors, "file_names", []))
def _scope_candidate_dict(candidate) -> dict[str, object]:
@@ -52,10 +114,13 @@ class V2IntentRouter:
self._catalog = route_catalog or V2RouteCatalog()
self._validator = V2RouteValidator(self._catalog)
self._fallback_router = V2FallbackRouter()
self._docs_subintent_resolver = DocsSubintentResolver()
self._confidence_adjuster = confidence_adjuster or V2ConfidenceAdjuster()
self._enable_llm_disambiguation = enable_llm_disambiguation
self._llm_router = V2LlmRouter(llm, catalog=self._catalog) if llm is not None else None
self._scope_rows_provider = scope_rows_provider
self._explicit_docs_update_resolver = _ExplicitDocsUpdateResolver()
self._explicit_file_lookup_resolver = _ExplicitFileLookupResolver()
def route(self, user_query: str, *, rag_session_id: str | None = None) -> V2RouteResult:
normalized_query = self._normalizer.normalize(user_query)
@@ -98,6 +163,51 @@ class V2IntentRouter:
endpoint_markers=list(anchor_analysis.endpoint_markers),
scope_type=resolution.scope_type,
)
if self._explicit_docs_update_resolver.matches(user_query):
return V2RouteResult(
routing_domain="DOCS",
intent="DOC_UPDATE",
subintent="FROM_FEATURE",
user_query=user_query,
normalized_query=features.normalized_query,
target_terms=features.target_terms,
anchors=anchor_analysis.anchors,
confidence=1.0,
routing_mode="deterministic",
llm_router_used=False,
reason_short="explicit docs update from feature source",
scope_type=resolution.scope_type,
)
if self._explicit_file_lookup_resolver.matches(anchor_analysis):
return V2RouteResult(
routing_domain="DOCS",
intent="DOC_EXPLAIN",
subintent="FIND_FILES",
user_query=user_query,
normalized_query=features.normalized_query,
target_terms=features.target_terms,
anchors=anchor_analysis.anchors,
confidence=1.0,
routing_mode="deterministic",
llm_router_used=False,
reason_short="explicit file reference",
scope_type=resolution.scope_type,
)
if self._docs_subintent_resolver.resolve(features) == V2Subintent.OPENAPI_GENERATE:
return V2RouteResult(
routing_domain="DOCS",
intent="DOC_EXPLAIN",
subintent=V2Subintent.OPENAPI_GENERATE,
user_query=user_query,
normalized_query=features.normalized_query,
target_terms=features.target_terms,
anchors=anchor_analysis.anchors,
confidence=1.0,
routing_mode="deterministic",
llm_router_used=False,
reason_short="explicit openapi generation request",
scope_type=resolution.scope_type,
)
llm_attempted = self._enable_llm_disambiguation and self._llm_router is not None
llm_candidate = self._route_with_llm(
features=features,
@@ -121,11 +231,12 @@ class V2IntentRouter:
scope_type=resolution.scope_type,
)
if llm_attempted:
return self._fallback_router.route_without_deterministic_signals(
return self._fallback_router.route(
user_query=user_query,
features=features,
anchors=anchor_analysis.anchors,
scope_type=resolution.scope_type,
llm_attempted=True,
)
return self._fallback_router.route(
user_query=user_query,
@@ -142,10 +253,15 @@ class V2IntentRouter:
if self._scope_rows_provider is not None:
return self._scope_rows_provider(sid)
try:
return RagQueryRepository().list_docs_scope_index_rows(sid)
return self._build_query_repository().list_docs_scope_index_rows(sid)
except Exception:
return []
def _build_query_repository(self) -> "RagQueryRepository":
from app.core.rag.persistence.query_repository import RagQueryRepository
return RagQueryRepository()
def _apply_scope_to_anchors(self, anchors, resolution) -> None:
anchors.candidate_domains = list(resolution.candidate_domains)
anchors.candidate_subdomains = list(resolution.candidate_subdomains)
@@ -5,6 +5,16 @@ from app.core.agent.utils.process_v2.models import V2Subintent
class DocsSubintentResolver:
_OPENAPI_MARKERS = (
"openapi",
"swagger",
"спецификац",
"спека",
"contract yaml",
"api yaml",
)
_GENERATE_MARKERS = ("сгенерируй", "построй", "собери", "generate", "build", "show")
_FORMAT_MARKERS = ("yaml", "json", "xml")
_API_ENUM_MARKERS = (
"какие api",
"какие эндпоинты",
@@ -24,7 +34,11 @@ class DocsSubintentResolver:
_LIST_WORD_MARKERS = ("какие", "список", "перечисли", "все", "доступные", "list", "available", "exposed")
def resolve(self, features: QueryFeatures) -> str | None:
if features.file_markers or self._has_file_like_anchor(features):
if features.file_markers:
return V2Subintent.FIND_FILES
if self._is_openapi_request(features):
return V2Subintent.OPENAPI_GENERATE
if self._has_file_like_anchor(features):
return V2Subintent.FIND_FILES
if self._is_api_exposed_request(features):
return V2Subintent.API_EXPOSED
@@ -47,6 +61,15 @@ class DocsSubintentResolver:
for hint in features.target_doc_hints
) or any(token.endswith((".md", ".yaml", ".yml", ".json")) for token in features.file_names)
def _is_openapi_request(self, features: QueryFeatures) -> bool:
query = features.normalized_query.lower()
if any(marker in query for marker in self._OPENAPI_MARKERS):
return True
has_api_words = any(marker in query for marker in self._API_WORD_MARKERS)
has_generate_words = any(marker in query for marker in self._GENERATE_MARKERS)
has_format_words = any(marker in query for marker in self._FORMAT_MARKERS)
return has_api_words and has_generate_words and has_format_words
def _is_api_exposed_request(self, features: QueryFeatures) -> bool:
query = features.normalized_query.lower()
if features.endpoint_paths:
@@ -62,6 +62,16 @@ class V2FallbackRouter:
reason_short="fallback docs update from feature",
scope_type=scope_type,
)
if self._has_openapi_signal(features):
return self._build_docs_result(
user_query=user_query,
features=features,
anchors=anchors,
subintent=V2Subintent.OPENAPI_GENERATE,
llm_attempted=llm_attempted,
reason="fallback docs openapi",
scope_type=scope_type,
)
if self._has_api_exposed_signal(features):
return self._build_docs_result(
user_query=user_query,
@@ -136,6 +146,14 @@ class V2FallbackRouter:
)
)
def _has_openapi_signal(self, features: QueryFeatures) -> bool:
query = features.normalized_query.lower()
has_spec = any(marker in query for marker in ("openapi", "swagger", "спецификац", "спека"))
has_format = any(marker in query for marker in ("yaml", "json", "xml"))
has_generate = any(marker in query for marker in ("сгенерируй", "построй", "собери", "generate", "build"))
has_api = any(marker in query for marker in ("api", "эндпоинт", "endpoint", "роут", "route", "метод"))
return has_spec or (has_api and has_generate and has_format)
def _has_api_exposed_signal(self, features: QueryFeatures) -> bool:
query = features.normalized_query.lower()
has_api = any(marker in query for marker in ("api", "эндпоинт", "endpoint", "роут", "route", "метод"))
@@ -7,6 +7,7 @@ prompts:
Основной принцип:
- DOCS / DOC_EXPLAIN / FIND_FILES: запрос просит найти файл, документ или путь.
- DOCS / DOC_EXPLAIN / API_EXPOSED: запрос просит перечислить доступные API-методы/эндпоинты.
- DOCS / DOC_EXPLAIN / OPENAPI_GENERATE: запрос просит собрать OpenAPI/Swagger спецификацию по API-методам.
- DOCS / DOC_EXPLAIN / SUMMARY: запрос просит объяснить документацию, endpoint, архитектуру, процесс или сущность.
- DOCS / DOC_UPDATE / FROM_FEATURE: запрос просит обновить документацию по системной аналитике (feature markdown/confluence).
- GENERAL / GENERAL_QA / SUMMARY: общий обзорный вопрос без явного запроса к документации.
@@ -21,7 +22,7 @@ prompts:
{
"routing_domain": "GENERAL" | "DOCS",
"intent": "GENERAL_QA" | "DOC_EXPLAIN" | "DOC_UPDATE",
"subintent": "SUMMARY" | "FIND_FILES" | "API_EXPOSED" | "FROM_FEATURE",
"subintent": "SUMMARY" | "FIND_FILES" | "API_EXPOSED" | "OPENAPI_GENERATE" | "FROM_FEATURE",
"confidence": 0.0-1.0,
"reason_short": "короткая причина"
}
@@ -7,6 +7,7 @@ class V2RouteCatalog:
_ALLOWED_ROUTES = (
(V2Domain.DOCS, V2Intent.DOC_EXPLAIN, V2Subintent.FIND_FILES),
(V2Domain.DOCS, V2Intent.DOC_EXPLAIN, V2Subintent.API_EXPOSED),
(V2Domain.DOCS, V2Intent.DOC_EXPLAIN, V2Subintent.OPENAPI_GENERATE),
(V2Domain.DOCS, V2Intent.DOC_EXPLAIN, V2Subintent.SUMMARY),
(V2Domain.DOCS, V2Intent.DOC_UPDATE, V2Subintent.FROM_FEATURE),
(V2Domain.GENERAL, V2Intent.GENERAL_QA, V2Subintent.SUMMARY),
@@ -0,0 +1,65 @@
from __future__ import annotations
from dataclasses import dataclass
from app.core.agent.utils.process_v2.models import V2Intent, V2RouteResult, V2Subintent
@dataclass(frozen=True, slots=True)
class RouterStatusEvent:
text: str
payload: dict[str, object]
class V2RouterStatusBuilder:
def build(self, route: V2RouteResult) -> RouterStatusEvent:
subintent_label = self._subintent_label(route.intent, route.subintent)
subintent_comment = self._subintent_comment(route.intent, route.subintent)
return RouterStatusEvent(
text=f"Выбран subintent {route.subintent} - {subintent_comment}",
payload={
"routing_domain": route.routing_domain,
"intent": route.intent,
"subintent": route.subintent,
"subintent_label": subintent_label,
"subintent_comment": subintent_comment,
"confidence": route.confidence,
"routing_mode": route.routing_mode,
"llm_router_used": route.llm_router_used,
"reason_short": route.reason_short,
},
)
def _subintent_label(self, intent: str, subintent: str) -> str:
labels = {
(V2Intent.DOC_EXPLAIN, V2Subintent.SUMMARY): "объяснение документации",
(V2Intent.DOC_EXPLAIN, V2Subintent.FIND_FILES): "поиск файлов документации",
(V2Intent.DOC_EXPLAIN, V2Subintent.API_EXPOSED): "поиск API-методов",
(V2Intent.DOC_EXPLAIN, V2Subintent.OPENAPI_GENERATE): "генерация OpenAPI-спецификации",
(V2Intent.DOC_UPDATE, V2Subintent.FROM_FEATURE): "обновление документации по аналитике",
(V2Intent.GENERAL_QA, V2Subintent.SUMMARY): "общий ответ по проекту",
}
return labels.get((intent, subintent), str(subintent).lower())
def _subintent_comment(self, intent: str, subintent: str) -> str:
comments = {
(V2Intent.DOC_EXPLAIN, V2Subintent.SUMMARY): (
"отвечаю на вопрос по существующей документации с опорой на найденные документы"
),
(V2Intent.DOC_EXPLAIN, V2Subintent.FIND_FILES): (
"ищу конкретные файлы документации, которые соответствуют запросу"
),
(V2Intent.DOC_EXPLAIN, V2Subintent.API_EXPOSED): (
"проверяю, опубликован ли API наружу и через какие контракты или endpoint'ы он доступен"
),
(V2Intent.DOC_EXPLAIN, V2Subintent.OPENAPI_GENERATE): (
"собираю OpenAPI-спецификацию по существующей документации API"
),
(V2Intent.DOC_UPDATE, V2Subintent.FROM_FEATURE): (
"обновляю или создаю документацию по системной аналитике"
),
(V2Intent.GENERAL_QA, V2Subintent.SUMMARY): (
"даю общий ответ по проекту, когда запрос не сводится к конкретному документу"
),
}
return comments.get((intent, subintent), "выполняю сценарий, который лучше всего соответствует запросу")
@@ -1,9 +0,0 @@
# DOC_UPDATE/FROM_FEATURE v2 Rules
Этот каталог содержит общие rules для всех шагов и подпроцессов workflow `doc_update_from_feature_v2`.
- `attribute_resolution.md` — правила определения type/id/application/platform.
- `path_resolution.md` — правила резолва путей документации.
- `section_frontmatter.md` — инструкции для frontmatter.
- `section_summary.md` — инструкции для summary.
- `section_details.md` — инструкции для details.
@@ -1,5 +0,0 @@
# Attribute Resolution
1. Приоритет: теги из requirement > metadata документа > LLM fallback.
2. Обязательные атрибуты: `type`, `id`, `application`, `platform`.
3. Если атрибут отсутствует, разрешен fallback через LLM.
@@ -1,7 +0,0 @@
# Path Resolution
Путь строится как:
`docs/<application>/<platform>/<type>/<id>.md`
Нормализация сегментов: lowercase + замена недопустимых символов на `-`.
@@ -1,3 +0,0 @@
# Details Rules
Details содержит детализированное описание поведения, ограничений и сценариев.
@@ -1,4 +0,0 @@
# Frontmatter Rules
1. Frontmatter всегда в блоке `---`.
2. Должны быть поля id/title/type/application/platform.
@@ -1,3 +0,0 @@
# Summary Rules
Summary содержит краткую цель страницы и основные изменения.
+40 -1
View File
@@ -2,16 +2,22 @@
from __future__ import annotations
import asyncio
from typing import Any
from app.core.agent.processes.base import AgentProcess, ProcessResult
from app.core.agent.processes.v2.intent_router import V2IntentRouter
from app.core.agent.processes.v2.router_status import V2RouterStatusBuilder
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.workflow_runtime.context import (
DocExplainApiExposedContext,
)
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.graph import DocExplainApiExposedGraph
from app.core.agent.processes.v2.workflows.doc_explain_find_files.workflow_runtime.context import DocExplainFindFilesContext
from app.core.agent.processes.v2.workflows.doc_explain_find_files.graph import DocExplainFindFilesGraph
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context import (
DocGenerateOpenApiContext,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.graph import DocGenerateOpenApiGraph
from app.core.agent.processes.v2.workflows.doc_explain_summary.workflow_runtime.context import DocExplainSummaryContext
from app.core.agent.processes.v2.workflows.doc_explain_summary.graph import DocExplainSummaryGraph
from app.core.agent.processes.v2.workflows.doc_update_from_feature.graph import DocUpdateFromFeatureGraph
@@ -51,6 +57,7 @@ class V2Process(AgentProcess):
doc_update_workflow_version: str = "v2",
) -> None:
self._router = router or V2IntentRouter()
self._router_status_builder = V2RouterStatusBuilder()
gate = evidence_gate or DocsEvidenceGate()
self._docs_summary_prompt_name = docs_summary_prompt_name
self._general_summary_prompt_name = general_summary_prompt_name
@@ -76,6 +83,11 @@ class V2Process(AgentProcess):
policy_resolver=policy_resolver,
rag_adapter=rag_adapter,
),
(V2Domain.DOCS, V2Intent.DOC_EXPLAIN, V2Subintent.OPENAPI_GENERATE): DocGenerateOpenApiGraph(
llm,
policy_resolver=policy_resolver,
rag_adapter=rag_adapter,
),
(V2Domain.DOCS, V2Intent.DOC_UPDATE, V2Subintent.FROM_FEATURE): doc_update_graph,
(V2Domain.GENERAL, V2Intent.GENERAL_QA, V2Subintent.SUMMARY): GeneralQaSummaryGraph(
llm,
@@ -88,7 +100,25 @@ class V2Process(AgentProcess):
async def run(self, context) -> ProcessResult:
rag_session_id = context.session.active_rag_session_id or ""
route = self._router.route(context.request.message, rag_session_id=rag_session_id or None)
route = await asyncio.to_thread(
self._router.route,
context.request.message,
rag_session_id=rag_session_id or None,
)
router_status = self._router_status_builder.build(route)
context.request.set_route(
routing_domain=route.routing_domain,
intent=route.intent,
subintent=route.subintent,
subintent_label=str(router_status.payload.get("subintent_label") or route.subintent),
subintent_comment=str(router_status.payload.get("subintent_comment") or ""),
)
await context.publisher.publish_status(
context.request.request_id,
"process.v2",
router_status.text,
dict(router_status.payload),
)
context.trace.module("process.v2").log(
"intent_routed",
{
@@ -178,6 +208,15 @@ class V2Process(AgentProcess):
rag_session_id=rag_session_id,
)
)
if route.subintent == V2Subintent.OPENAPI_GENERATE:
return await workflow.run(
DocGenerateOpenApiContext(
runtime=runtime_context,
route=route,
rag_session_id=rag_session_id,
workflow_llm_enabled=self._workflow_llm_enabled,
)
)
if route.intent == V2Intent.DOC_UPDATE and route.subintent == V2Subintent.FROM_FEATURE:
if self._doc_update_workflow_version == "legacy":
return await workflow.run(
@@ -17,15 +17,16 @@ class DocExplainApiExposedWorkflowGraph(WorkflowGraph[TContext]):
steps_buffer: list[dict[str, object]] = []
for step in self._steps:
inp = step.trace_input(context)
request_id = context.runtime.request.request_id
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
out = step.trace_output(next_context)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
context = await step.run(context)
out = step.trace_output(context)
trace.log(
"workflow_step_traced",
{
@@ -36,7 +37,7 @@ class DocExplainApiExposedWorkflowGraph(WorkflowGraph[TContext]):
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer})
trace.log("workflow_completed", {"workflow_id": self._workflow_id})
return context
@@ -19,15 +19,16 @@ class DocExplainFindFilesWorkflowGraph(WorkflowGraph[TContext]):
steps_buffer: list[dict[str, object]] = []
for step in self._steps:
inp = step.trace_input(context)
request_id = context.runtime.request.request_id
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
out = step.trace_output(next_context)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
context = await step.run(context)
out = step.trace_output(context)
trace.log(
"workflow_step_traced",
{
@@ -38,6 +39,7 @@ class DocExplainFindFilesWorkflowGraph(WorkflowGraph[TContext]):
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log(
"workflow_trace_flushed",
{"workflow_id": self._workflow_id, "steps": steps_buffer},
@@ -19,15 +19,16 @@ class DocExplainSummaryWorkflowGraph(WorkflowGraph[TContext]):
steps_buffer: list[dict[str, object]] = []
for step in self._steps:
inp = step.trace_input(context)
request_id = context.runtime.request.request_id
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
out = step.trace_output(next_context)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
context = await step.run(context)
out = step.trace_output(context)
trace.log(
"workflow_step_traced",
{
@@ -38,6 +39,7 @@ class DocExplainSummaryWorkflowGraph(WorkflowGraph[TContext]):
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log(
"workflow_trace_flushed",
{"workflow_id": self._workflow_id, "steps": steps_buffer},
@@ -0,0 +1,11 @@
# DOC_EXPLAIN / OPENAPI_GENERATE Workflow
## LLM Instructions
Если в workflow будет подключён LLM-шаг для дообогащения OpenAPI-спецификации, придерживайся правил:
- Поля OpenAPI `summary` и `description` должны быть короткими.
- Каждое поле должно содержать ровно одно предложение.
- В каждом поле должно быть не более 10 слов.
- Не добавляй списки, поясняющие абзацы и несколько предложений в `summary` или `description`.
- Если исходная документация длинная, сожми смысл до короткой формулировки.
@@ -0,0 +1 @@
"""Workflow for DOC_EXPLAIN / OPENAPI_GENERATE."""
@@ -0,0 +1,73 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.steps.fetch_rag_rows_step import FetchRagRowsStep
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.steps.require_rag_session_step import (
RequireRagSessionStep,
)
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.steps.resolve_retrieval_plan_step import (
ResolveRetrievalPlanStep,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.build_openapi_operations_step import (
BuildOpenApiOperationsStep,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.enrich_openapi_from_contract_step import (
EnrichOpenApiFromContractStep,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.fetch_contract_sections_step import (
FetchContractSectionsStep,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.finalize_openapi_answer_step import (
FinalizeOpenApiAnswerStep,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.generate_openapi_yaml_step import (
GenerateOpenApiYamlStep,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.openapi_contract_llm_enricher import (
OpenApiContractLlmEnricher,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.openapi_contract_parser import (
OpenApiContractParser,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.retrieval.openapi_operation_collector import (
OpenApiOperationCollector,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.openapi_yaml_renderer import (
OpenApiYamlRenderer,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.buffered_graph import (
DocGenerateOpenApiWorkflowGraph,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context import (
DocGenerateOpenApiContext,
)
from app.core.agent.utils.llm import AgentLlmService
from app.core.agent.utils.process_v2.plan_resolver import RetrievalPlanResolver
from app.core.agent.utils.process_v2.rag_retrieval import V2RagRetrievalAdapter
class DocGenerateOpenApiGraph(DocGenerateOpenApiWorkflowGraph[DocGenerateOpenApiContext]):
def __init__(
self,
llm: AgentLlmService,
policy_resolver: RetrievalPlanResolver,
rag_adapter: V2RagRetrievalAdapter,
) -> None:
super().__init__(
workflow_id="v2.docs_explain.openapi_generate",
source="workflow.v2.openapi_generate",
steps=[
RequireRagSessionStep(
missing_message="Для генерации OpenAPI нужна активная RAG-сессия проекта с проиндексированной документацией."
),
ResolveRetrievalPlanStep(policy_resolver),
FetchRagRowsStep(rag_adapter),
BuildOpenApiOperationsStep(OpenApiOperationCollector()),
FetchContractSectionsStep(rag_adapter),
EnrichOpenApiFromContractStep(
parser=OpenApiContractParser(),
llm_enricher=OpenApiContractLlmEnricher(llm),
),
GenerateOpenApiYamlStep(OpenApiYamlRenderer()),
FinalizeOpenApiAnswerStep(),
],
)
@@ -0,0 +1,35 @@
namespace: v2_docs_openapi_generate
prompts:
generate_spec: |
Ты собираешь OpenAPI-спецификацию только по найденной документации проекта.
Правила для полей OpenAPI:
- `summary` и `description` должны быть короткими.
- Каждое поле должно содержать одно предложение.
- Каждое поле должно содержать не более 10 слов.
- Не пиши в этих полях списки, markdown и длинные пояснения.
Если исходный текст длинный, сожми его до короткой точной формулировки.
enrich_operation: |
Ты дообогащаешь одну OpenAPI-операцию только по контракту из документа типа `api_method`.
Важно:
- Используй только факты из переданного контракта.
- Если поля нет в контракте, оставь его пустым или не заполняй.
- Поля `summary` и `description` должны содержать одно предложение и не более 10 слов.
- Верни только JSON без markdown.
Верни объект формата:
{
"summary": "string",
"description": "string",
"parameters": [],
"request_schema": {},
"response_schema": {},
"responses": {
"200": {"description": "string"}
},
"security": []
}
@@ -0,0 +1 @@
"""Steps for DOC_EXPLAIN/OPENAPI_GENERATE workflow."""
@@ -0,0 +1,34 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.workflow_runtime.pipeline_logging import log_pipeline_step
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.retrieval.openapi_operation_collector import (
OpenApiOperationCollector,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context_protocols import (
OpenApiWorkflowContext,
)
from app.core.agent.utils.workflow import WorkflowStep
class BuildOpenApiOperationsStep(WorkflowStep[OpenApiWorkflowContext]):
step_id = "build_openapi_operations"
title = "Сборка API-операций"
def __init__(self, collector: OpenApiOperationCollector) -> None:
self._collector = collector
async def run(self, context: OpenApiWorkflowContext) -> OpenApiWorkflowContext:
if context.answer:
return context
context.operations = self._collector.collect(context.retrieved_rows)
payload = {
"mode": "openapi_generate",
"operation_count": len(context.operations),
"path_count": len({item.path for item in context.operations}),
}
context.runtime.trace.module("process.v2.evidence").log("evidence_assembled", payload)
log_pipeline_step(context.runtime, "evidence_assembled", payload)
return context
def trace_output(self, context: OpenApiWorkflowContext) -> dict[str, object]:
return {"operation_count": len(context.operations)}
@@ -0,0 +1,74 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.workflow_runtime.pipeline_logging import log_pipeline_step
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.openapi_contract_llm_enricher import (
OpenApiContractLlmEnricher,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.openapi_contract_parser import (
OpenApiContractParser,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context import DocGenerateOpenApiContext
from app.core.agent.utils.workflow import WorkflowStep
class EnrichOpenApiFromContractStep(WorkflowStep[DocGenerateOpenApiContext]):
step_id = "enrich_openapi_from_contract"
title = "Обогащение OpenAPI по контракту"
def __init__(self, parser: OpenApiContractParser, llm_enricher: OpenApiContractLlmEnricher) -> None:
self._parser = parser
self._llm_enricher = llm_enricher
async def run(self, context: DocGenerateOpenApiContext) -> DocGenerateOpenApiContext:
if context.answer or not context.operations:
return context
grouped = _group_contract_rows(context.contract_rows)
enriched: list = []
request_id = context.runtime.request.request_id
trace = context.runtime.trace.module("workflow.v2.openapi_generate.llm")
for operation in context.operations:
rows = grouped.get(operation.source_path, [])
operation.contract_markdown = _contract_markdown(rows)
operation = self._parser.apply(operation, rows)
if context.workflow_llm_enabled:
operation = await self._llm_enricher.enrich(
operation,
request_id=request_id,
user_query=context.route.user_query,
trace=trace,
)
enriched.append(operation)
context.operations = enriched
payload = {
"operation_count": len(context.operations),
"contract_row_count": len(context.contract_rows),
"workflow_llm_enabled": context.workflow_llm_enabled,
}
context.runtime.trace.module("process.v2.answer").log("openapi_contract_enriched", payload)
log_pipeline_step(context.runtime, "openapi_contract_enriched", payload)
return context
def trace_output(self, context: DocGenerateOpenApiContext) -> dict[str, object]:
return {"operation_count": len(context.operations)}
def _group_contract_rows(rows: list[dict]) -> dict[str, list[dict]]:
grouped: dict[str, list[dict]] = {}
for row in rows:
path = str(row.get("path") or "").strip()
if not path:
continue
grouped.setdefault(path, []).append(row)
return grouped
def _contract_markdown(rows: list[dict]) -> str:
blocks: list[str] = []
for row in rows:
metadata = dict(row.get("metadata") or {})
title = str(metadata.get("section_path") or metadata.get("section_title") or "").strip()
content = str(row.get("content") or "").strip()
if not content:
continue
blocks.append(f"## {title}\n{content}")
return "\n\n".join(blocks)
@@ -0,0 +1,43 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.workflow_runtime.pipeline_logging import log_pipeline_step
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context_protocols import (
OpenApiWorkflowContext,
)
from app.core.agent.utils.process_v2.rag_retrieval import V2RagRetrievalAdapter
from app.core.agent.utils.workflow import WorkflowStep
from app.core.rag.contracts.enums import RagLayer
class FetchContractSectionsStep(WorkflowStep[OpenApiWorkflowContext]):
step_id = "fetch_contract_sections"
title = "Получение contract-секций"
def __init__(self, rag_adapter: V2RagRetrievalAdapter) -> None:
self._rag_adapter = rag_adapter
async def run(self, context: OpenApiWorkflowContext) -> OpenApiWorkflowContext:
if context.answer or not context.operations:
return context
paths = list(dict.fromkeys(item.source_path for item in context.operations if item.source_path))
if not paths:
return context
rows = await self._rag_adapter.fetch_exact_paths(
context.rag_session_id,
paths=paths,
layers=[RagLayer.DOCS_DOC_CHUNKS],
)
context.contract_rows = [row for row in rows if _is_contract_row(row)]
payload = {"contract_row_count": len(context.contract_rows), "path_count": len(paths)}
context.runtime.trace.module("process.v2.evidence").log("contract_rows_loaded", payload)
log_pipeline_step(context.runtime, "contract_rows_loaded", payload)
return context
def trace_output(self, context: OpenApiWorkflowContext) -> dict[str, object]:
return {"contract_row_count": len(context.contract_rows)}
def _is_contract_row(row: dict) -> bool:
metadata = dict(row.get("metadata") or {})
section_path = str(metadata.get("section_path") or "").lower()
return "контракт" in section_path or "contract" in section_path
@@ -0,0 +1,37 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context import DocGenerateOpenApiContext
from app.core.agent.utils.workflow import WorkflowStep
class FinalizeOpenApiAnswerStep(WorkflowStep[DocGenerateOpenApiContext]):
step_id = "finalize_openapi_answer"
title = "Формирование OpenAPI-ответа"
async def run(self, context: DocGenerateOpenApiContext) -> DocGenerateOpenApiContext:
if context.answer:
return context
if not context.operations:
context.answer = "Не нашёл API-методов в выбранном scope, из которых можно собрать OpenAPI-спецификацию."
context.answer_generated_payload = {
"answer_mode": "insufficient_evidence",
"answer_length": len(context.answer),
}
return context
if not context.openapi_yaml.strip():
context.answer = "Не удалось собрать YAML OpenAPI по найденным данным."
context.answer_generated_payload = {
"answer_mode": "insufficient_evidence",
"answer_length": len(context.answer),
}
return context
context.answer = f"```yaml\n{context.openapi_yaml.rstrip()}\n```"
context.answer_generated_payload = {
"answer_mode": "openapi_yaml",
"answer_length": len(context.answer),
"operation_count": len(context.operations),
}
return context
def trace_output(self, context: DocGenerateOpenApiContext) -> dict[str, object]:
return {"answer_length": len(context.answer)}
@@ -0,0 +1,32 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.workflow_runtime.pipeline_logging import log_pipeline_step
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.openapi_yaml_renderer import OpenApiYamlRenderer
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.context_protocols import (
OpenApiWorkflowContext,
)
from app.core.agent.utils.workflow import WorkflowStep
class GenerateOpenApiYamlStep(WorkflowStep[OpenApiWorkflowContext]):
step_id = "generate_openapi_yaml"
title = "Сборка YAML-спецификации"
def __init__(self, renderer: OpenApiYamlRenderer) -> None:
self._renderer = renderer
async def run(self, context: OpenApiWorkflowContext) -> OpenApiWorkflowContext:
if context.answer or not context.operations:
return context
context.openapi_yaml = self._renderer.render(context.route, context.operations)
payload = {
"mode": "openapi_generate",
"yaml_chars": len(context.openapi_yaml),
"operation_count": len(context.operations),
}
context.runtime.trace.module("process.v2.answer").log("openapi_yaml_generated", payload)
log_pipeline_step(context.runtime, "answer_generated", payload)
return context
def trace_output(self, context: OpenApiWorkflowContext) -> dict[str, object]:
return {"yaml_chars": len(context.openapi_yaml)}
@@ -0,0 +1,75 @@
from __future__ import annotations
import asyncio
import json
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.models import OpenApiOperation
from app.core.agent.utils.llm import AgentLlmService
class OpenApiContractLlmEnricher:
def __init__(self, llm: AgentLlmService, prompt_name: str = "v2_docs_openapi_generate.enrich_operation") -> None:
self._llm = llm
self._prompt_name = prompt_name
async def enrich(self, operation: OpenApiOperation, *, request_id: str, user_query: str, trace) -> OpenApiOperation:
if not operation.contract_markdown.strip():
return operation
prompt_input = self._build_prompt_input(operation, user_query)
raw = await asyncio.to_thread(
self._llm.generate,
self._prompt_name,
prompt_input,
log_context=f"agent:{request_id}",
trace=trace,
)
payload = self._parse_json(raw)
if payload is None:
return operation
return self._merge(operation, payload)
def _build_prompt_input(self, operation: OpenApiOperation, user_query: str) -> str:
base_payload = {
"method": operation.method,
"path": operation.path,
"summary": operation.summary,
"description": operation.description,
"parameters": operation.parameters,
"request_schema": operation.request_schema,
"response_schema": operation.response_schema,
"responses": operation.responses,
}
return (
f"Запрос пользователя:\n{user_query}\n\n"
f"Текущая заготовка операции:\n{json.dumps(base_payload, ensure_ascii=False, indent=2)}\n\n"
f"Контракт из документа api_method:\n{operation.contract_markdown}"
)
def _parse_json(self, raw: str) -> dict[str, object] | None:
text = str(raw or "").strip()
if text.startswith("```"):
text = text.split("\n", 1)[1].rsplit("```", 1)[0].strip()
try:
value = json.loads(text)
except json.JSONDecodeError:
return None
return value if isinstance(value, dict) else None
def _merge(self, operation: OpenApiOperation, payload: dict[str, object]) -> OpenApiOperation:
for field in ("summary", "description"):
value = str(payload.get(field) or "").strip()
if value:
setattr(operation, field, value)
if isinstance(payload.get("parameters"), list):
operation.parameters = [dict(item) for item in payload["parameters"] if isinstance(item, dict)]
if isinstance(payload.get("request_schema"), dict):
operation.request_schema = dict(payload["request_schema"])
if isinstance(payload.get("response_schema"), dict):
operation.response_schema = dict(payload["response_schema"])
if isinstance(payload.get("responses"), dict):
operation.responses = {str(k): dict(v) for k, v in payload["responses"].items() if isinstance(v, dict)}
if isinstance(payload.get("security"), list):
operation.security = [dict(item) for item in payload["security"] if isinstance(item, dict)]
if str(payload.get("response_status") or "").strip():
operation.response_status = str(payload["response_status"]).strip()
return operation
@@ -0,0 +1,175 @@
from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.models import OpenApiOperation
class OpenApiContractParser:
_BODY_MARKERS = ("body", "json", "request body", "тело", "body json")
_QUERY_MARKERS = ("query", "querystring", "строка запроса")
_HEADER_MARKERS = ("header", "заголов", "headers")
_COOKIE_MARKERS = ("cookie", "cookies")
def apply(self, operation: OpenApiOperation, rows: list[dict]) -> OpenApiOperation:
request_properties: dict[str, object] = {}
request_required: list[str] = []
response_properties: dict[str, object] = {}
response_required: list[str] = []
responses = dict(operation.responses)
for row in rows:
metadata = dict(row.get("metadata") or {})
title = str(metadata.get("section_title") or "").strip().lower()
table = _parse_markdown_table(str(row.get("content") or ""))
if "входные параметры" in title:
for item in table:
self._apply_input_row(operation, item, request_properties, request_required)
elif "выходные параметры" in title:
for item in table:
self._apply_output_row(item, response_properties, response_required)
elif "ошиб" in title:
self._apply_error_rows(responses, table)
elif "метаданные вызова" in title:
self._apply_call_metadata(operation, str(row.get("content") or ""))
if request_properties:
operation.request_schema = _object_schema(request_properties, request_required)
if response_properties:
operation.response_schema = _object_schema(response_properties, response_required)
if responses:
operation.responses = responses
return operation
def _apply_input_row(
self,
operation: OpenApiOperation,
item: dict[str, str],
request_properties: dict[str, object],
request_required: list[str],
) -> None:
name = item.get("параметр") or item.get("field") or item.get("name") or item.get("поле") or ""
place = item.get("где передается") or item.get("where") or item.get("in") or ""
if not name:
return
schema = _schema_from_row(item)
if _is_body_place(place):
request_properties[name] = schema
if _is_required(item):
request_required.append(name)
return
parameter = {
"name": name,
"in": _parameter_place(place, path=operation.path),
"required": _is_required(item) or ("{" + name + "}") in operation.path,
"schema": schema,
}
description = item.get("описание") or item.get("description") or ""
if description:
parameter["description"] = description
operation.parameters.append(parameter)
def _apply_output_row(
self,
item: dict[str, str],
response_properties: dict[str, object],
response_required: list[str],
) -> None:
name = item.get("поле") or item.get("field") or item.get("name") or ""
if not name:
return
response_properties[name] = _schema_from_row(item)
if _is_required(item):
response_required.append(name)
def _apply_error_rows(self, responses: dict[str, dict[str, object]], table: list[dict[str, str]]) -> None:
for item in table:
status = item.get("status") or item.get("http") or item.get("код") or ""
error = item.get("error") or item.get("code") or item.get("ошибка") or ""
if not status:
continue
responses[str(status)] = {"description": error or f"HTTP {status}"}
def _apply_call_metadata(self, operation: OpenApiOperation, content: str) -> None:
lowered = content.lower()
if "auth:" not in lowered:
return
auth_value = lowered.split("auth:", 1)[1].splitlines()[0].strip()
if auth_value and auth_value not in {"false", "none", "public", "no"}:
operation.security = [{"bearerAuth": []}]
def _parse_markdown_table(content: str) -> list[dict[str, str]]:
lines = [line.strip() for line in str(content or "").splitlines() if line.strip()]
if len(lines) < 3 or "|" not in lines[0]:
return []
headers = [_normalize_header(part) for part in lines[0].strip("|").split("|")]
items: list[dict[str, str]] = []
for row in lines[2:]:
if "|" not in row:
continue
values = [part.strip() for part in row.strip("|").split("|")]
if len(values) != len(headers):
continue
items.append(dict(zip(headers, values)))
return items
def _normalize_header(value: str) -> str:
return str(value or "").strip().lower()
def _schema_from_row(item: dict[str, str]) -> dict[str, object]:
schema_type = _map_type(item.get("тип") or item.get("type") or "")
schema: dict[str, object] = {"type": schema_type}
example = item.get("пример") or item.get("example") or ""
description = item.get("описание") or item.get("description") or ""
if description:
schema["description"] = description
if example:
schema["example"] = example
return schema
def _map_type(raw: str) -> str:
value = str(raw or "").strip().lower()
if value in {"int", "integer", "number", "long"}:
return "integer"
if value in {"decimal", "float", "double"}:
return "number"
if value in {"bool", "boolean"}:
return "boolean"
if value in {"array", "list"}:
return "array"
if value in {"object", "json"}:
return "object"
return "string"
def _object_schema(properties: dict[str, object], required: list[str]) -> dict[str, object]:
payload: dict[str, object] = {"type": "object", "properties": properties}
required_values = [item for item in dict.fromkeys(required) if item in properties]
if required_values:
payload["required"] = required_values
return payload
def _is_required(item: dict[str, str]) -> bool:
value = str(item.get("обязательность") or item.get("required") or "").strip().lower()
return value in {"yes", "true", "required", "да", "mandatory", "обязательно"}
def _is_body_place(place: str) -> bool:
lowered = str(place or "").strip().lower()
return any(marker in lowered for marker in OpenApiContractParser._BODY_MARKERS)
def _parameter_place(place: str, *, path: str) -> str:
lowered = str(place or "").strip().lower()
if any(marker in lowered for marker in OpenApiContractParser._QUERY_MARKERS):
return "query"
if any(marker in lowered for marker in OpenApiContractParser._HEADER_MARKERS):
return "header"
if any(marker in lowered for marker in OpenApiContractParser._COOKIE_MARKERS):
return "cookie"
if any(part.startswith("{") and part.endswith("}") for part in path.split("/")):
return "path"
return "query"
@@ -0,0 +1,129 @@
from __future__ import annotations
import yaml
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.models import OpenApiOperation
from app.core.agent.utils.process_v2.models import V2RouteResult
class OpenApiYamlRenderer:
_METHOD_ORDER = ("get", "post", "put", "patch", "delete", "head", "options")
def render(self, route: V2RouteResult, operations: list[OpenApiOperation]) -> str:
document = {
"openapi": "3.0.3",
"info": {
"title": self._title(route),
"version": "1.0.0",
},
"tags": self._tags(operations),
"paths": self._paths(operations),
}
return yaml.safe_dump(document, sort_keys=False, allow_unicode=True)
def _title(self, route: V2RouteResult) -> str:
if route.anchors.process_subdomain and route.anchors.process_domain:
return f"OpenAPI for {route.anchors.process_domain}/{route.anchors.process_subdomain}"
if route.anchors.process_domain:
return f"OpenAPI for {route.anchors.process_domain}"
return "Project API"
def _tags(self, operations: list[OpenApiOperation]) -> list[dict[str, str]]:
values = sorted({tag for item in operations for tag in item.tags if tag and tag != "default"})
return [{"name": tag} for tag in values]
def _paths(self, operations: list[OpenApiOperation]) -> dict[str, dict[str, object]]:
grouped: dict[str, dict[str, object]] = {}
for operation in sorted(operations, key=self._sort_key):
methods = grouped.setdefault(operation.path, {})
methods[operation.method.lower()] = self._operation_payload(operation)
return grouped
def _sort_key(self, operation: OpenApiOperation) -> tuple[str, int]:
try:
method_index = self._METHOD_ORDER.index(operation.method.lower())
except ValueError:
method_index = len(self._METHOD_ORDER)
return operation.path, method_index
def _operation_payload(self, operation: OpenApiOperation) -> dict[str, object]:
summary = self._short_sentence(operation.summary, fallback=f"{operation.method} {operation.path}")
description = self._short_sentence(operation.description, fallback=summary)
responses = operation.responses or {
operation.response_status: self._response_payload(summary, operation),
}
responses = self._merge_primary_response_schema(responses, operation)
payload: dict[str, object] = {
"summary": summary,
"description": description,
"operationId": self._operation_id(operation),
"responses": responses,
}
if operation.tags and operation.tags != ["default"]:
payload["tags"] = list(operation.tags)
if operation.parameters:
payload["parameters"] = list(operation.parameters)
if operation.security:
payload["security"] = list(operation.security)
if operation.request_schema is not None:
payload["requestBody"] = {
"required": True,
"content": {
"application/json": {
"schema": operation.request_schema,
}
},
}
return payload
def _operation_id(self, operation: OpenApiOperation) -> str:
slug = operation.path.strip("/").replace("/", "_").replace("{", "").replace("}", "") or "root"
return f"{operation.method.lower()}_{slug}"
def _response_payload(self, summary: str, operation: OpenApiOperation) -> dict[str, object]:
payload: dict[str, object] = {"description": summary or "Successful response"}
if operation.response_schema is not None:
payload["content"] = {
"application/json": {
"schema": operation.response_schema,
}
}
return payload
def _merge_primary_response_schema(
self,
responses: dict[str, dict[str, object]],
operation: OpenApiOperation,
) -> dict[str, dict[str, object]]:
if operation.response_schema is None:
return responses
primary = dict(responses.get(operation.response_status) or {})
if "content" not in primary:
primary["content"] = {
"application/json": {
"schema": operation.response_schema,
}
}
if "description" not in primary:
primary["description"] = operation.summary or f"HTTP {operation.response_status}"
merged = dict(responses)
merged[operation.response_status] = primary
return merged
def _short_sentence(self, text: str, *, fallback: str) -> str:
raw = str(text or "").strip() or fallback
first_sentence = self._first_sentence(raw)
words = [word for word in first_sentence.split() if word]
shortened = " ".join(words[:10]).strip()
return shortened or fallback
def _first_sentence(self, text: str) -> str:
value = str(text or "").replace("\n", " ").strip()
if not value:
return ""
for marker in (".", "!", "?", ";"):
idx = value.find(marker)
if idx > 0:
value = value[:idx]
break
return value.strip(" -:,")
@@ -0,0 +1 @@
"""Retrieval helpers for DOC_EXPLAIN/OPENAPI_GENERATE workflow."""
@@ -0,0 +1,123 @@
from __future__ import annotations
import re
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.models import OpenApiOperation
class OpenApiOperationCollector:
_METHODS = ("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS")
_METHOD_PATH_RE = re.compile(r"\b(GET|POST|PUT|PATCH|DELETE|HEAD|OPTIONS)\s+(/[-a-zA-Z0-9_./{}]+)")
def collect(self, rows: list[dict]) -> list[OpenApiOperation]:
operations: dict[tuple[str, str], OpenApiOperation] = {}
for row in rows:
operation = self._operation_from_row(row)
if operation is None:
continue
key = (operation.method, operation.path)
current = operations.get(key)
operations[key] = operation if current is None else self._merge(current, operation)
return [operations[key] for key in sorted(operations)]
def _operation_from_row(self, row: dict) -> OpenApiOperation | None:
metadata = dict(row.get("metadata") or {})
method, path = self._resolve_method_path(row, metadata)
if not method or not path:
return None
summary = self._summary(row, metadata, method, path)
description = self._description(row, metadata, summary)
tags = self._tags(metadata)
return OpenApiOperation(
method=method,
path=path,
summary=summary,
description=description,
tags=tags,
request_schema=self._schema_dict(metadata.get("request_schema")),
response_schema=self._schema_dict(metadata.get("response_schema")),
parameters=self._parameters(path, metadata),
response_status=str(metadata.get("response_status") or metadata.get("status_code") or "200"),
source_path=str(row.get("path") or ""),
)
def _resolve_method_path(self, row: dict, metadata: dict[str, object]) -> tuple[str, str]:
method = str(metadata.get("http_method") or metadata.get("method") or "").strip().upper()
endpoint_texts = [metadata.get("endpoint"), row.get("title"), row.get("content")]
path = ""
for value in endpoint_texts:
parsed_method, parsed_path = self._extract_method_path(value)
if parsed_method and not method:
method = parsed_method
if parsed_path:
path = parsed_path
break
if not method and path:
method = "GET"
return method, path
def _extract_method_path(self, value: object) -> tuple[str, str]:
text = str(value or "").strip()
if not text:
return "", ""
match = self._METHOD_PATH_RE.search(text)
if match:
return match.group(1).upper(), match.group(2).strip().lower()
return "", text.lower() if text.startswith("/") else ""
def _summary(self, row: dict, metadata: dict[str, object], method: str, path: str) -> str:
raw = str(metadata.get("summary_text") or metadata.get("summary") or row.get("title") or "").strip()
cleaned = self._METHOD_PATH_RE.sub("", raw).strip(" -:\n\t")
return cleaned or f"{method} {path}"
def _description(self, row: dict, metadata: dict[str, object], summary: str) -> str:
raw = str(metadata.get("description") or row.get("content") or "").strip()
return raw or summary
def _tags(self, metadata: dict[str, object]) -> list[str]:
domain = str(metadata.get("domain") or "").strip()
subdomain = str(metadata.get("subdomain") or "").strip()
if domain and subdomain:
return [f"{domain}/{subdomain}"]
if domain:
return [domain]
return ["default"]
def _schema_dict(self, value: object) -> dict[str, object] | None:
return dict(value) if isinstance(value, dict) and value else None
def _parameters(self, path: str, metadata: dict[str, object]) -> list[dict[str, object]]:
explicit = self._explicit_parameters(metadata.get("parameters"))
if explicit:
return explicit
return [self._path_parameter(name) for name in self._path_param_names(path)]
def _explicit_parameters(self, value: object) -> list[dict[str, object]]:
if not isinstance(value, list):
return []
return [dict(item) for item in value if isinstance(item, dict)]
def _path_param_names(self, path: str) -> list[str]:
return re.findall(r"{([^{}]+)}", path)
def _path_parameter(self, name: str) -> dict[str, object]:
return {
"name": name,
"in": "path",
"required": True,
"schema": {"type": "string"},
}
def _merge(self, current: OpenApiOperation, candidate: OpenApiOperation) -> OpenApiOperation:
current.summary = current.summary if len(current.summary) >= len(candidate.summary) else candidate.summary
current.description = (
current.description if len(current.description) >= len(candidate.description) else candidate.description
)
current.tags = list(dict.fromkeys([*current.tags, *candidate.tags]))
current.parameters = current.parameters or candidate.parameters
current.request_schema = current.request_schema or candidate.request_schema
current.response_schema = current.response_schema or candidate.response_schema
if current.response_status == "200" and candidate.response_status != "200":
current.response_status = candidate.response_status
current.source_path = current.source_path or candidate.source_path
return current
@@ -0,0 +1,75 @@
from __future__ import annotations
from app.core.agent.utils.process_v2.models import V2Intent, V2RouteResult, V2Subintent
from app.core.rag.contracts.enums import RagLayer
from app.core.rag.retrieval.session_retriever import RetrievalPlan
class DocGenerateOpenApiRetrievalPolicy:
_LAYERS = [RagLayer.DOCS_DOCUMENT_CATALOG]
_API_PREFIXES = ["docs/api/", "docs/endpoints/", "docs/methods/", "api/", "endpoints/", "methods/"]
def supports(self, route: V2RouteResult) -> bool:
return route.intent == V2Intent.DOC_EXPLAIN and route.subintent == V2Subintent.OPENAPI_GENERATE
def resolve(self, route: V2RouteResult) -> RetrievalPlan:
return RetrievalPlan(
profile="openapi_generate",
layers=list(self._LAYERS),
limit=1000,
filters=self._filters(route),
)
def _filters(self, route: V2RouteResult) -> dict[str, object]:
filters: dict[str, object] = {
"metadata.type": "api_method",
"prefer_path_prefixes": list(self._API_PREFIXES),
"target_doc_hints": list(route.anchors.target_doc_hints),
"prefer_like_patterns": self._like_patterns(route),
}
query_signals = self._query_signals(route)
if query_signals:
filters["query_signals"] = query_signals
if route.anchors.process_domain:
filters["metadata.domain"] = route.anchors.process_domain
if route.anchors.process_subdomain:
filters["metadata.subdomain"] = route.anchors.process_subdomain
return filters
def _like_patterns(self, route: V2RouteResult) -> list[str]:
raw = list(route.target_terms)
raw.extend(route.anchors.endpoint_paths)
raw.extend(route.anchors.target_doc_hints)
raw.extend(candidate.value for candidate in route.anchors.candidate_apis)
raw.extend(candidate.value for candidate in route.anchors.candidate_domains)
raw.extend(candidate.value for candidate in route.anchors.candidate_subdomains)
return [f"%{item.lower()}%" for item in _unique(raw)]
def _query_signals(self, route: V2RouteResult) -> list[str]:
raw = list(route.target_terms)
raw.extend(route.anchors.endpoint_paths)
blocked = {
"openapi",
"swagger",
"yaml",
"json",
"xml",
"api",
"endpoint",
"method",
"эндпоинт",
"метод",
}
return [item for item in _unique(raw) if item.lower() not in blocked]
def _unique(items: list[str]) -> list[str]:
out: list[str] = []
seen: set[str] = set()
for item in items:
value = str(item or "").strip()
if not value or value in seen:
continue
seen.add(value)
out.append(value)
return out
@@ -0,0 +1 @@
"""Runtime helpers for the DOC_EXPLAIN/OPENAPI_GENERATE workflow."""
@@ -0,0 +1,43 @@
"""Buffered graph for DOC_EXPLAIN/OPENAPI_GENERATE workflow."""
from __future__ import annotations
from typing import TypeVar
from app.core.agent.utils.workflow.context import WorkflowContext
from app.core.agent.utils.workflow.graph import WorkflowGraph
TContext = TypeVar("TContext", bound=WorkflowContext)
class DocGenerateOpenApiWorkflowGraph(WorkflowGraph[TContext]):
async def run(self, context: TContext) -> TContext:
trace = context.runtime.trace.module(self._source)
trace.log("workflow_started", {"workflow_id": self._workflow_id})
steps_buffer: list[dict[str, object]] = []
for step in self._steps:
inp = step.trace_input(context)
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
out = step.trace_output(next_context)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
trace.log(
"workflow_step_traced",
{
"workflow_id": self._workflow_id,
"step": {"id": step.step_id, "title": step.title},
"input": inp,
"output": out,
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer})
trace.log("workflow_completed", {"workflow_id": self._workflow_id})
return context
@@ -0,0 +1,23 @@
from __future__ import annotations
from dataclasses import dataclass, field
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.models import OpenApiOperation
from app.core.agent.runtime.execution_context import RuntimeExecutionContext
from app.core.agent.utils.process_v2.models import V2RouteResult
from app.core.rag.retrieval.session_retriever import RetrievalPlan
@dataclass(slots=True)
class DocGenerateOpenApiContext:
runtime: RuntimeExecutionContext
route: V2RouteResult
rag_session_id: str
workflow_llm_enabled: bool = True
retrieval_plan: RetrievalPlan | None = None
retrieved_rows: list[dict] = field(default_factory=list)
contract_rows: list[dict] = field(default_factory=list)
operations: list[OpenApiOperation] = field(default_factory=list)
openapi_yaml: str = ""
answer: str = ""
answer_generated_payload: dict[str, object] | None = None
@@ -0,0 +1,26 @@
"""Context protocols for the DOC_EXPLAIN/OPENAPI_GENERATE workflow."""
from __future__ import annotations
from typing import Protocol
from app.core.agent.processes.v2.workflows.doc_generate_openapi.workflow_runtime.models import OpenApiOperation
from app.core.agent.runtime.execution_context import RuntimeExecutionContext
from app.core.agent.utils.process_v2.models import V2RouteResult
from app.core.rag.retrieval.session_retriever import RetrievalPlan
class RetrievalWorkflowContext(Protocol):
runtime: RuntimeExecutionContext
route: V2RouteResult
rag_session_id: str
retrieval_plan: RetrievalPlan | None
retrieved_rows: list[dict]
answer: str
answer_generated_payload: dict[str, object] | None
class OpenApiWorkflowContext(RetrievalWorkflowContext, Protocol):
contract_rows: list[dict]
operations: list[OpenApiOperation]
openapi_yaml: str
@@ -0,0 +1,20 @@
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass(slots=True)
class OpenApiOperation:
method: str
path: str
summary: str
description: str
tags: list[str] = field(default_factory=list)
request_schema: dict[str, object] | None = None
response_schema: dict[str, object] | None = None
parameters: list[dict[str, object]] = field(default_factory=list)
response_status: str = "200"
responses: dict[str, dict[str, object]] = field(default_factory=dict)
security: list[dict[str, list[str]]] = field(default_factory=list)
contract_markdown: str = ""
source_path: str = ""
@@ -19,17 +19,18 @@ class DocUpdateFromFeatureWorkflowGraph(WorkflowGraph[TContext]):
before = self._snapshot(context)
raw_inp = step.trace_input(context)
inp = self._merge_trace_payload(raw_inp, before)
request_id = context.runtime.request.request_id
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
)
context = await step.run(context)
after = self._snapshot(context)
raw_out = step.trace_output(context)
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
after = self._snapshot(next_context)
raw_out = step.trace_output(next_context)
out = self._merge_trace_payload(raw_out, after)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
trace.log(
"workflow_step_traced",
{
@@ -40,6 +41,7 @@ class DocUpdateFromFeatureWorkflowGraph(WorkflowGraph[TContext]):
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer})
trace.log("workflow_completed", {"workflow_id": self._workflow_id})
return context
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
@@ -30,7 +31,7 @@ class LoadSourceContentStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
context.issues.append(f"Файл системной аналитики не найден: {context.source_ref}")
return context
try:
context.source_content = source_path.read_text(encoding="utf-8")
context.source_content = await asyncio.to_thread(source_path.read_text, encoding="utf-8")
context.project_root = self._resolve_project_root(source_path).as_posix()
except Exception as exc:
context.issues.append(f"Не удалось прочитать системную аналитику: {exc}")
@@ -42,6 +43,9 @@ class LoadSourceContentStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
idx = parts.index("_incoming")
if idx > 0:
return Path(*parts[:idx])
for parent in source_path.parents:
if (parent / "docs").is_dir():
return parent
return source_path.parent
def trace_input(self, context: DocUpdateFromFeatureV2Context) -> dict[str, Any]:
@@ -52,3 +56,6 @@ class LoadSourceContentStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
"project_root": context.project_root,
"source_content_len": len(context.source_content or ""),
}
def get_after_status_message(self):
return "Системная аналитика загружена"
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from typing import Any
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step3_parse_requirements.parser import (
@@ -21,7 +22,7 @@ class ParseRequirementsStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
async def run(self, context: DocUpdateFromFeatureV2Context) -> DocUpdateFromFeatureV2Context:
if context.answer or not context.source_content:
return context
meta, units = self._parser.parse(context.source_content)
meta, units = await asyncio.to_thread(self._parser.parse, context.source_content)
context.analytics_meta = meta
context.requirements = units
@@ -54,3 +55,6 @@ class ParseRequirementsStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
for item in context.requirements
],
}
def get_after_status_message(self):
return "Функциональные требования прочитаны"
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
@@ -23,7 +24,16 @@ class LoadRulesStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
if not self._rules_root.exists():
context.issues.append(f"Папка rules не найдена: {self._rules_root.as_posix()}")
return context
loaded, issues = await asyncio.to_thread(self._load_rules)
context.issues.extend(issues)
context.rules = loaded
if not context.rules:
context.issues.append("Rules v2 пустые: не найдено ни одного *.md файла.")
return context
def _load_rules(self) -> tuple[list[RuleDocument], list[str]]:
loaded: list[RuleDocument] = []
issues: list[str] = []
for item in sorted(self._rules_root.rglob("*.md")):
try:
loaded.append(
@@ -33,11 +43,8 @@ class LoadRulesStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
)
)
except Exception as exc:
context.issues.append(f"Не удалось прочитать rule {item.name}: {exc}")
context.rules = loaded
if not context.rules:
context.issues.append("Rules v2 пустые: не найдено ни одного *.md файла.")
return context
issues.append(f"Не удалось прочитать rule {item.name}: {exc}")
return loaded, issues
def _discover_rules_root(self) -> Path:
current = Path(__file__).resolve()
@@ -55,3 +62,6 @@ class LoadRulesStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
"rules_count": len(context.rules),
"rule_names": [item.name for item in context.rules],
}
def get_after_status_message(self):
return "Загружены правила документации v3"
@@ -2,6 +2,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from app.core.agent.processes.v2.workflows.doc_update_from_feature.steps.docs_state_loader import DocsState
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step5_execute_subprocesses.path_resolver import (
DocsPathResolver,
)
@@ -50,6 +51,7 @@ class RequirementTaskBuilder:
def build(self, context: DocUpdateFromFeatureV2Context) -> list[RequirementTaskContext]:
known_paths = {str(row.get("path") or "") for row in context.docs_catalog_rows}
docs_state = DocsState.from_rows(context.docs_catalog_rows)
tasks: list[RequirementTaskContext] = []
for index, requirement in enumerate(context.requirements):
task = RequirementTaskContext(
@@ -60,7 +62,8 @@ class RequirementTaskBuilder:
metadata=dict(requirement.metadata),
)
self._attribute_resolver.resolve(context, task, rules_text="")
task.path = self._path_resolver.resolve(
self._resolve_target(task, docs_state)
task.path = task.target_path or self._path_resolver.resolve(
application=task.application,
platform=task.platform,
doc_type=task.doc_type,
@@ -72,6 +75,18 @@ class RequirementTaskBuilder:
tasks.append(task)
return tasks
def _resolve_target(self, task: RequirementTaskContext, docs_state: DocsState) -> None:
target_path = str(task.metadata.get("target_path") or task.metadata.get("path") or "").strip()
if target_path:
task.target_path = target_path
target_doc_id = str(task.metadata.get("target_doc_id") or task.metadata.get("target_id") or "").strip()
if not target_doc_id and str(task.metadata.get("action") or "").strip().lower() == "update":
target_doc_id = task.doc_id
if target_doc_id:
task.target_doc_id = target_doc_id
if not task.target_path:
task.target_path = docs_state.by_doc_id.get(target_doc_id, "")
class RequirementTaskOrderer:
_PLATFORM_PRIORITY = {"pprb": 0, "ufs": 1, "web": 2}
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from typing import Any
@@ -30,12 +31,15 @@ class PrepareRequirementTasksStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
async def run(self, context: DocUpdateFromFeatureV2Context) -> DocUpdateFromFeatureV2Context:
if context.answer or not context.requirements:
return context
self._catalog_loader.load(context)
context.requirement_tasks = self._task_orderer.order(self._task_builder.build(context))
context.requirement_tasks = await asyncio.to_thread(self._build_tasks, context)
if not context.requirement_tasks:
context.issues.append("Не удалось подготовить задачи по разделу 6 аналитики.")
return context
def _build_tasks(self, context: DocUpdateFromFeatureV2Context):
self._catalog_loader.load(context)
return self._task_orderer.order(self._task_builder.build(context))
def trace_input(self, context: DocUpdateFromFeatureV2Context) -> dict[str, Any]:
return {
"requirements_count": len(context.requirements),
@@ -64,3 +68,6 @@ class PrepareRequirementTasksStep(WorkflowStep[DocUpdateFromFeatureV2Context]):
for item in context.requirement_tasks
],
}
def get_after_status_message(self):
return "Составялем план изменений"
@@ -0,0 +1,103 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_policy import (
DocTypePolicyRegistry,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.rules_catalog import (
RulesCatalog,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.workflow_runtime.models import (
RequirementTaskContext,
RuleDocument,
)
from app.core.agent.utils.llm import AgentLlmService
@dataclass(slots=True)
class ChangeAssessment:
decision: str
reason: str = ""
target_sections: list[str] = field(default_factory=list)
missing_points: list[str] = field(default_factory=list)
def needs_update(self) -> bool:
return self.decision == "needs_update"
class DocumentChangeNeedEvaluator:
def __init__(self, llm: AgentLlmService, policies: DocTypePolicyRegistry | None = None) -> None:
self._llm = llm
self._policies = policies or DocTypePolicyRegistry()
def assess(
self,
task: RequirementTaskContext,
*,
current_content: str,
rule_documents: list[RuleDocument],
shared_context: dict[str, object],
) -> ChangeAssessment:
catalog = RulesCatalog.from_documents(rule_documents)
policy = self._policies.load(catalog, task.doc_type)
template = policy.template
payload = {
"task": {
"section_key": task.section_key,
"heading": task.heading,
"doc_type": task.doc_type,
"doc_id": task.doc_id,
"path": task.path,
"domain": task.domain,
"sub_domain": task.subdomain,
"application": task.application,
"platform": task.platform,
"requirement_text": task.body,
},
"current_document": current_content,
"template": {
"doc_type": template.doc_type,
"source_name": template.source_name,
"template_text": template.template_text,
"sections": [
{"title": item.title, "level": item.level, "rule_path": item.rule_path}
for item in template.sections
],
},
"doc_type_policy": policy.prompt_payload(),
"global_rules": catalog.global_text(),
"shared_context": shared_context,
}
raw = self._llm.generate(
"v2_docs_update_v2.assess_existing_document_change",
json.dumps(payload, ensure_ascii=False, indent=2),
log_context="workflow.v2.docs_update.from_feature_v2.assess_change",
)
return self._parse(raw)
def _parse(self, raw: str) -> ChangeAssessment:
try:
parsed = json.loads(str(raw or "").strip())
except Exception:
return ChangeAssessment(decision="needs_update", reason="LLM assessment parse failed.")
if not isinstance(parsed, dict):
return ChangeAssessment(decision="needs_update", reason="LLM assessment returned non-object payload.")
decision = str(parsed.get("decision") or "").strip().lower()
if decision not in {"up_to_date", "needs_update"}:
return ChangeAssessment(decision="needs_update", reason="LLM assessment returned unknown decision.")
reason = str(parsed.get("reason") or "").strip()
target_sections = self._string_list(parsed.get("target_sections"))
missing_points = self._string_list(parsed.get("missing_points"))
return ChangeAssessment(
decision=decision,
reason=reason,
target_sections=target_sections,
missing_points=missing_points,
)
def _string_list(self, value: object) -> list[str]:
if not isinstance(value, list):
return []
return [str(item).strip() for item in value if str(item).strip()]
@@ -11,7 +11,25 @@ class DeleteIntentHeuristic:
re.compile(r"\bудал(?:ить|ение|яем|яется|ен[аоы]?|ить страницу|ить документ)\b"),
re.compile(r"\bдекомисс"),
)
_NEGATIVE_MARKERS = (
re.compile(r"\bотсутств"),
re.compile(r"\bнедоступ"),
re.compile(r"\bdisabled\b"),
re.compile(r"\bне\s+(?:нужно|требуется|должен|должна|должны|предусмотрен[аоы]?|отображается)\b"),
re.compile(r"\bбез возможности\b"),
)
_INLINE_CODE_RE = re.compile(r"`[^`]*`")
_FRAGMENT_SPLIT_RE = re.compile(r"[\n\r]+|(?<=[.!?])\s+")
def is_delete(self, text: str) -> bool:
lowered = (text or "").lower()
return any(pattern.search(lowered) for pattern in self._PATTERNS)
normalized = self._INLINE_CODE_RE.sub(" ", text or "").lower()
for fragment in self._iter_fragments(normalized):
if not any(pattern.search(fragment) for pattern in self._PATTERNS):
continue
if any(marker.search(fragment) for marker in self._NEGATIVE_MARKERS):
continue
return True
return False
def _iter_fragments(self, text: str) -> list[str]:
return [fragment.strip() for fragment in self._FRAGMENT_SPLIT_RE.split(text) if fragment.strip()]
@@ -2,12 +2,19 @@ from __future__ import annotations
import re
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_normalizer import (
DocTypeNormalizer,
)
class DocsPathResolver:
def __init__(self, normalizer: DocTypeNormalizer | None = None) -> None:
self._normalizer = normalizer or DocTypeNormalizer()
def resolve(self, *, application: str, platform: str, doc_type: str, doc_id: str, domain: str = "") -> str:
root = self._clean(domain) or self._clean(application) or "common"
plat = self._clean(platform) or "web"
dtype = self._clean(doc_type) or "misc"
dtype = self._clean(self._normalizer.normalize(doc_type)) or "misc"
did = self._clean(doc_id) or "untitled"
return f"docs/{root}/{plat}/{dtype}/{did}.md"
@@ -3,6 +3,7 @@ namespace: v2_docs_update_v2
prompts:
resolve_attributes_fallback: |
Определи недостающие атрибуты страницы документации по секции аналитики и структуре docs catalog.
Используй только канонические значения `doc_type`: `api_method`, `logic_block`, `architecture_overview`, `db_table`, `ui_page`.
Верни только JSON-объект с полями: doc_type, id, application, platform, domain, sub_domain.
Не добавляй пояснений.
@@ -8,6 +8,9 @@ from pathlib import Path
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step5_execute_subprocesses.classifier import (
DeleteIntentHeuristic,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_normalizer import (
DocTypeNormalizer,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.create_doc import (
CreateDocSubprocess,
)
@@ -30,9 +33,10 @@ from app.schemas.changeset import ChangeItem, ChangeOp
class TaskAttributeResolver:
def __init__(self, llm: AgentLlmService) -> None:
self._llm = llm
self._doc_type_normalizer = DocTypeNormalizer()
def resolve(self, context: DocUpdateFromFeatureV2Context, task: RequirementTaskContext, rules_text: str) -> None:
task.doc_type = str(task.metadata.get("doc_type") or task.metadata.get("type") or "").strip()
task.doc_type = self._normalize_doc_type(task.metadata.get("doc_type") or task.metadata.get("type") or "")
task.doc_id = str(task.metadata.get("id") or self._slug(task.heading)).strip()
task.application = str(task.metadata.get("application") or context.analytics_meta.application or "").strip()
task.platform = str(task.metadata.get("platform") or context.analytics_meta.platform or "").strip().lower()
@@ -67,7 +71,7 @@ class TaskAttributeResolver:
log_context="workflow.v2.docs_update.from_feature_v2.resolve_attributes",
)
parsed = self._json_or_empty(raw)
task.doc_type = task.doc_type or str(parsed.get("doc_type") or parsed.get("type") or "").strip()
task.doc_type = task.doc_type or self._normalize_doc_type(parsed.get("doc_type") or parsed.get("type") or "")
task.doc_id = task.doc_id or str(parsed.get("id") or self._slug(task.heading)).strip()
task.application = task.application or str(parsed.get("application") or "").strip()
task.platform = task.platform or str(parsed.get("platform") or "web").strip().lower()
@@ -77,6 +81,9 @@ class TaskAttributeResolver:
def _slug(self, value: str) -> str:
return re.sub(r"[^a-z0-9._-]+", "-", (value or "").strip().lower()).strip(".-") or "untitled"
def _normalize_doc_type(self, value: object) -> str:
return self._doc_type_normalizer.normalize(str(value or ""))
def _json_or_empty(self, raw: str) -> dict[str, object]:
value = str(raw or "").strip()
if not value:
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from typing import Any
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step5_execute_subprocesses.services import (
@@ -22,10 +23,20 @@ class ExecuteRequirementSubprocessesStep(WorkflowStep[DocUpdateFromFeatureV2Cont
async def run(self, context: DocUpdateFromFeatureV2Context) -> DocUpdateFromFeatureV2Context:
if context.answer or not context.requirement_tasks:
return context
for task in context.requirement_tasks:
self._change_executor.execute(context, task)
await asyncio.to_thread(self._execute_all, context)
return context
def _execute_all(self, context: DocUpdateFromFeatureV2Context) -> None:
for task in context.requirement_tasks:
try:
self._change_executor.execute(context, task)
except Exception as exc:
task.issues.append(str(exc))
context.issues.append(
f"Task failed for {task.section_key or task.doc_id or task.heading}: "
f"{task.path or task.target_path or '<path unresolved>'}: {exc}"
)
def trace_input(self, context: DocUpdateFromFeatureV2Context) -> dict[str, Any]:
return {
"tasks": [
@@ -57,3 +68,6 @@ class ExecuteRequirementSubprocessesStep(WorkflowStep[DocUpdateFromFeatureV2Cont
for item in context.accumulated_pages
],
}
def get_after_status_message(self):
return "Правки подготовлены"
@@ -0,0 +1,12 @@
from __future__ import annotations
class DocTypeNormalizer:
_ALIASES = {
"data_entity": "db_table",
"domain_entity": "db_table",
}
def normalize(self, doc_type: str) -> str:
value = str(doc_type or "").strip().lower()
return self._ALIASES.get(value, value)
@@ -0,0 +1,60 @@
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.rules_catalog import (
RulesCatalog,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.template_models import (
TemplateSpec,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.template_registry import (
TemplateRegistry,
)
@dataclass(slots=True)
class RuleSnippet:
path: str
text: str
@dataclass(slots=True)
class DocTypePolicy:
template: TemplateSpec
required_common_elements: list[RuleSnippet] = field(default_factory=list)
def prompt_payload(self) -> dict[str, object]:
return {
"doc_type": self.template.doc_type,
"template": {
"doc_type": self.template.doc_type,
"source_name": self.template.source_name,
"template_text": self.template.template_text,
"title_level": self.template.title_level,
"required_common_elements": list(self.template.required_common_elements),
"special_rules": list(self.template.special_rules),
"sections": [
{"title": item.title, "level": item.level, "rule_path": item.rule_path}
for item in self.template.sections
],
},
"required_common_element_rules": [
{"path": item.path, "text": item.text}
for item in self.required_common_elements
],
}
class DocTypePolicyRegistry:
def __init__(self, templates: TemplateRegistry | None = None) -> None:
self._templates = templates or TemplateRegistry()
def load(self, catalog: RulesCatalog, doc_type: str) -> DocTypePolicy:
template = self._templates.load(catalog, doc_type)
required_common_elements = [
RuleSnippet(path=path, text=catalog.rule_text(path))
for path in template.required_common_elements
]
return DocTypePolicy(template=template, required_common_elements=required_common_elements)
@@ -3,6 +3,9 @@ from __future__ import annotations
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.rules_catalog import (
RulesCatalog,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_normalizer import (
DocTypeNormalizer,
)
from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.template_models import (
TemplateSpec,
)
@@ -12,10 +15,16 @@ from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocess
class TemplateRegistry:
def __init__(self, parser: TemplateParser | None = None) -> None:
def __init__(
self,
parser: TemplateParser | None = None,
normalizer: DocTypeNormalizer | None = None,
) -> None:
self._parser = parser or TemplateParser()
self._normalizer = normalizer or DocTypeNormalizer()
def load(self, catalog: RulesCatalog, doc_type: str) -> TemplateSpec:
doc_type = self._normalizer.normalize(doc_type)
template_name = f"templates/{doc_type}.template.md"
template_text = catalog.template_text(doc_type)
if not template_text.strip():
@@ -6,3 +6,5 @@ prompts:
Не добавляй другие секции и не добавляй пояснений.
Используй requirement_text, template, global_rules, frontmatter_rules и shared_context.
Обязательно заполни id, title, doc_type, status, domain, sub_domain, related_docs.
Для `doc_type: api_method` обязательно заполни поле `endpoint`.
Если endpoint встречается в аналитике, use case, интеграционной схеме, контракте или shared_context, перенеси его во frontmatter.
@@ -9,3 +9,6 @@ prompts:
Если для секции есть структура в rule_text, заполни ее полностью, без заглушек и без фраз вида "описано отдельно".
Если source_fragment содержит "Не выявлены", это не запрещает детализировать интеграционный FR из use case, contract и shared_context.
Не ссылайся на rule-файлы в тексте документа.
Никогда не повторяй заголовок target_section внутри ответа.
Не выводи заголовки того же уровня или выше, чем уровень target_section.
Для `### Контракт` запрещено выводить `## Контракт` и `### Контракт`; допускаются только подзаголовки глубже, например `#### Запрос` и `#### Ответ`.
@@ -7,3 +7,6 @@ prompts:
Строго следуй template.template_text, target_section.rule_text и global_rules.
Используй source_fragment как приоритетный источник фактов и сохраняй согласованность с current_content.
Не оставляй заглушки, само-ссылки и фразы вида "описано отдельно".
Никогда не повторяй заголовок target_section внутри ответа.
Не выводи заголовки того же уровня или выше, чем уровень target_section.
Для `### Контракт` запрещено выводить `## Контракт` и `### Контракт`; допускаются только подзаголовки глубже, например `#### Запрос` и `#### Ответ`.
@@ -17,17 +17,18 @@ class DocUpdateFromFeatureV2WorkflowGraph(WorkflowGraph[TContext]):
before = self._snapshot(context)
raw_inp = step.trace_input(context)
inp = self._merge_trace_payload(raw_inp, before)
request_id = context.runtime.request.request_id
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
)
context = await step.run(context)
after = self._snapshot(context)
raw_out = step.trace_output(context)
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
after = self._snapshot(next_context)
raw_out = step.trace_output(next_context)
out = self._merge_trace_payload(raw_out, after)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
trace.log(
"workflow_step_traced",
{
@@ -38,6 +39,7 @@ class DocUpdateFromFeatureV2WorkflowGraph(WorkflowGraph[TContext]):
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer})
trace.log("workflow_completed", {"workflow_id": self._workflow_id})
return context
@@ -42,6 +42,8 @@ class RequirementTaskContext:
subdomain: str = ""
action: DocAction = DocAction.CREATE
path: str = ""
target_doc_id: str = ""
target_path: str = ""
issues: list[str] = field(default_factory=list)
@@ -19,15 +19,16 @@ class GeneralQaSummaryWorkflowGraph(WorkflowGraph[TContext]):
steps_buffer: list[dict[str, object]] = []
for step in self._steps:
inp = step.trace_input(context)
request_id = context.runtime.request.request_id
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
out = step.trace_output(next_context)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
context = await step.run(context)
out = step.trace_output(context)
trace.log(
"workflow_step_traced",
{
@@ -38,6 +39,7 @@ class GeneralQaSummaryWorkflowGraph(WorkflowGraph[TContext]):
},
)
steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out})
context = next_context
trace.log(
"workflow_trace_flushed",
{"workflow_id": self._workflow_id, "steps": steps_buffer},
+54 -13
View File
@@ -11,6 +11,7 @@ from app.core.agent.runtime.execution_context import RuntimeExecutionContext
from app.core.agent.runtime.process_registry import ProcessRegistry
from app.core.agent.runtime.process_runner import ProcessRunner
from app.core.agent.runtime.publisher import RuntimeEventPublisher
from app.core.shared.gigachat.errors import GigaChatError
from app.infra.exceptions import AppError
from app.infra.observability.module_trace import RequestTraceContext
from app.infra.observability.request_trace_logger import RequestTraceLogger
@@ -53,7 +54,6 @@ class AgentRuntime:
publisher=self._publisher,
trace=RequestTraceContext(request_id=request.request_id, logger=self._trace_logger),
)
await self._announce_start(request.request_id, process.version)
result = await self._process_runner.run(context, process)
request.answer = result.answer
request.changeset = list(result.changeset)
@@ -86,21 +86,23 @@ class AgentRuntime:
self._request_store.save(request)
self._trace_logger.start_request(request, session)
async def _announce_start(self, request_id: str, process_version: str) -> None:
await self._safe_publish_status(request_id, "runtime", "Запрос принят и поставлен в обработку.")
await self._safe_publish_status(
request_id,
"runtime",
f"Запускаю процесс {process_version}.",
{"process_version": process_version},
)
async def _publish_result(self, request: AgentRequest) -> None:
route_payload = request.route.as_payload() if request.route is not None else None
try:
await self._publisher.publish_user(request.request_id, "agent", request.answer or "")
await self._publisher.publish_result(
request.request_id,
"agent",
request.answer or "",
{
"result_type": "changeset" if request.changeset else "answer",
"answer": request.answer or "",
"changeset": [item.model_dump(mode="json") for item in request.changeset],
"apply_changeset": request.apply_changeset,
"route": route_payload,
},
)
except Exception:
LOGGER.exception("failed to publish user event: request_id=%s", request.request_id)
await self._safe_publish_status(request.request_id, "runtime", "Обработка запроса завершена.")
LOGGER.exception("failed to publish result event: request_id=%s", request.request_id)
def _complete_request(self, request: AgentRequest, session: AgentSession) -> None:
session.append_turn(user_message=request.message, assistant_message=request.answer or "")
@@ -122,16 +124,55 @@ class AgentRuntime:
"Во время обработки запроса произошла ошибка.",
{"code": request.error.code},
)
try:
await self._publisher.publish_error(
request.request_id,
"runtime",
request.error.desc,
{
"error": request.error.model_dump(mode="json"),
"route": request.route.as_payload() if request.route is not None else None,
},
)
except Exception:
LOGGER.exception("failed to publish error event: request_id=%s", request.request_id)
def _build_error_payload(self, exc: Exception) -> ErrorPayload:
if isinstance(exc, AppError):
return ErrorPayload(code=exc.code, desc=exc.desc, module=exc.module)
if isinstance(exc, GigaChatError):
return self._build_gigachat_error_payload(exc)
return ErrorPayload(
code="api_runtime_error",
desc="Agent request failed unexpectedly.",
module=ModuleName.AGENT,
)
def _build_gigachat_error_payload(self, exc: GigaChatError) -> ErrorPayload:
if exc.status_code == 402:
return ErrorPayload(
code="llm_payment_required",
desc="GigaChat недоступен: провайдер вернул 402 Payment Required. Проверьте баланс или настройки биллинга.",
module=ModuleName.AGENT,
)
if exc.status_code == 401:
return ErrorPayload(
code="llm_auth_failed",
desc="GigaChat недоступен: ошибка авторизации провайдера. Проверьте токен и настройки доступа.",
module=ModuleName.AGENT,
)
if exc.status_code == 429:
return ErrorPayload(
code="llm_rate_limited",
desc="GigaChat временно недоступен: превышен лимит запросов. Повторите попытку позже.",
module=ModuleName.AGENT,
)
return ErrorPayload(
code="llm_provider_error",
desc=str(exc) or "GigaChat request failed.",
module=ModuleName.AGENT,
)
async def _safe_publish_status(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None:
try:
await self._publisher.publish_status(request_id, source, text, payload)
+22
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import logging
from app.core.api.domain.events.client_event import ClientEventRecord
@@ -24,6 +25,7 @@ class RuntimeEventPublisher:
sorted(list((payload or {}).keys())),
)
await self._publish(request_id, ClientEventType.STATUS, source, text, payload)
await asyncio.sleep(0)
async def publish_user(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None:
LOGGER.warning(
@@ -35,6 +37,26 @@ class RuntimeEventPublisher:
)
await self._publish(request_id, ClientEventType.USER, source, text, payload)
async def publish_result(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None:
LOGGER.warning(
"publish result: request_id=%s source=%s text_len=%s payload_keys=%s",
request_id,
source,
len(text or ""),
sorted(list((payload or {}).keys())),
)
await self._publish(request_id, ClientEventType.RESULT, source, text, payload)
async def publish_error(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None:
LOGGER.warning(
"publish error: request_id=%s source=%s text=%s payload_keys=%s",
request_id,
source,
text,
sorted(list((payload or {}).keys())),
)
await self._publish(request_id, ClientEventType.ERROR, source, text, payload)
async def _publish(
self,
request_id: str,
@@ -20,6 +20,7 @@ class V2Subintent:
SUMMARY = "SUMMARY"
FIND_FILES = "FIND_FILES"
API_EXPOSED = "API_EXPOSED"
OPENAPI_GENERATE = "OPENAPI_GENERATE"
FROM_FEATURE = "FROM_FEATURE"
@@ -10,6 +10,9 @@ from app.core.agent.processes.v2.workflows.doc_explain_api_exposed.steps.retriev
from app.core.agent.processes.v2.workflows.doc_explain_find_files.steps.retrieval.retrieval_policy import (
DocExplainFindFilesRetrievalPolicy,
)
from app.core.agent.processes.v2.workflows.doc_generate_openapi.steps.retrieval.retrieval_policy import (
DocGenerateOpenApiRetrievalPolicy,
)
from app.core.agent.processes.v2.workflows.doc_explain_summary.steps.retrieval.retrieval_policy import (
DocExplainSummaryRetrievalPolicy,
)
@@ -38,5 +41,6 @@ class V2RetrievalPolicyResolver:
GeneralQaSummaryRetrievalPolicy(),
DocExplainFindFilesRetrievalPolicy(),
DocExplainApiExposedRetrievalPolicy(),
DocGenerateOpenApiRetrievalPolicy(),
DocExplainSummaryRetrievalPolicy(),
)
+54 -10
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Generic, Sequence, TypeVar
from typing import Callable, Generic, Sequence, TypeVar
from app.core.agent.utils.workflow.context import WorkflowContext
from app.core.agent.utils.workflow.step import WorkflowStep
@@ -24,21 +24,65 @@ class WorkflowGraph(Generic[TContext]):
return context
async def _run_step(self, context: TContext, step: WorkflowStep[TContext]) -> TContext:
request_id = context.runtime.request.request_id
trace = context.runtime.trace.module(self._source)
trace.log(
"step_started",
{"workflow_id": self._workflow_id, "step_id": step.step_id, "input": step.trace_input(context)},
)
await context.runtime.publisher.publish_status(
request_id,
self._source,
f"Шаг workflow: {step.title}.",
{"workflow_id": self._workflow_id, "step_id": step.step_id},
await self._publish_step_status(context, step, phase="before", input_context=context)
next_context = await step.run(context)
await self._publish_step_status(
next_context,
step,
phase="after",
input_context=context,
output_context=next_context,
)
context = await step.run(context)
trace.log(
"step_completed",
{"workflow_id": self._workflow_id, "step_id": step.step_id, "output": step.trace_output(context)},
{"workflow_id": self._workflow_id, "step_id": step.step_id, "output": step.trace_output(next_context)},
)
return context
return next_context
async def _publish_step_status(
self,
runtime_context: TContext,
step: WorkflowStep[TContext],
*,
phase: str,
input_context: TContext,
output_context: TContext | None = None,
) -> None:
message = self._resolve_step_status_message(step, phase, input_context, output_context)
if not message:
return
await runtime_context.runtime.publisher.publish_status(
runtime_context.runtime.request.request_id,
self._source,
message,
{"workflow_id": self._workflow_id, "step_id": step.step_id, "phase": phase},
)
def _resolve_step_status_message(
self,
step: WorkflowStep[TContext],
phase: str,
input_context: TContext,
output_context: TContext | None,
) -> str | None:
builder: Callable[[], str | None]
active_context = input_context
if phase == "after":
builder = step.get_after_status_message
active_context = output_context or input_context
else:
builder = step.get_before_status_message
tokens = step._push_status_state(
active_context=active_context,
input_context=input_context,
output_context=output_context,
)
try:
return builder()
finally:
step._pop_status_state(tokens)
+42 -1
View File
@@ -1,10 +1,14 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
from contextvars import ContextVar, Token
from typing import Any, Generic, TypeVar, cast
TContext = TypeVar("TContext")
_ACTIVE_CONTEXT: ContextVar[Any | None] = ContextVar("workflow_step_active_context", default=None)
_INPUT_CONTEXT: ContextVar[Any | None] = ContextVar("workflow_step_input_context", default=None)
_OUTPUT_CONTEXT: ContextVar[Any | None] = ContextVar("workflow_step_output_context", default=None)
class WorkflowStep(ABC, Generic[TContext]):
@@ -15,8 +19,45 @@ class WorkflowStep(ABC, Generic[TContext]):
async def run(self, context: TContext) -> TContext:
raise NotImplementedError
@property
def context(self) -> TContext | None:
return cast(TContext | None, _ACTIVE_CONTEXT.get())
@property
def input_context(self) -> TContext | None:
return cast(TContext | None, _INPUT_CONTEXT.get())
@property
def output_context(self) -> TContext | None:
return cast(TContext | None, _OUTPUT_CONTEXT.get())
def get_before_status_message(self) -> str | None:
return None
def get_after_status_message(self) -> str | None:
return None
def trace_input(self, context: TContext) -> dict[str, Any]:
return {}
def trace_output(self, context: TContext) -> dict[str, Any]:
return {}
def _push_status_state(
self,
*,
active_context: TContext,
input_context: TContext,
output_context: TContext | None = None,
) -> tuple[Token[Any | None], Token[Any | None], Token[Any | None]]:
return (
_ACTIVE_CONTEXT.set(active_context),
_INPUT_CONTEXT.set(input_context),
_OUTPUT_CONTEXT.set(output_context),
)
def _pop_status_state(self, tokens: tuple[Token[Any | None], Token[Any | None], Token[Any | None]]) -> None:
active_token, input_token, output_token = tokens
_ACTIVE_CONTEXT.reset(active_token)
_INPUT_CONTEXT.reset(input_token)
_OUTPUT_CONTEXT.reset(output_token)
@@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
import logging
from app.core.api.application.request_start_gate import RequestStartGate
from app.core.api.domain.models.agent_request import AgentRequest
from app.core.api.infrastructure.ids.request_id_factory import RequestIdFactory
from app.core.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
@@ -19,11 +20,13 @@ class RequestService:
request_ids: RequestIdFactory,
sessions: SessionService,
runtime: AgentRuntime,
start_gate: RequestStartGate | None = None,
) -> None:
self._request_store = request_store
self._request_ids = request_ids
self._sessions = sessions
self._runtime = runtime
self._start_gate = start_gate or RequestStartGate()
async def create(self, session_id: str, message: str, process_version: str) -> AgentRequest:
session = self._sessions.get(session_id)
@@ -41,13 +44,21 @@ class RequestService:
process_version,
(message or "").replace("\n", "\\n")[:500],
)
task = asyncio.create_task(self._runtime.run(request, session), name=f"agent-runtime:{request.request_id}")
self._start_gate.register(request.request_id)
task = asyncio.create_task(self._run_request(request, session), name=f"agent-runtime:{request.request_id}")
task.add_done_callback(self._log_task_result)
return request
def get(self, request_id: str) -> AgentRequest | None:
return self._request_store.get(request_id)
async def _run_request(self, request: AgentRequest, session) -> None:
try:
await self._start_gate.wait_until_ready(request.request_id)
await self._runtime.run(request, session)
finally:
self._start_gate.forget(request.request_id)
def _log_task_result(self, task: asyncio.Task) -> None:
try:
exc = task.exception()
@@ -0,0 +1,32 @@
from __future__ import annotations
import asyncio
from threading import Lock
class RequestStartGate:
def __init__(self, timeout_seconds: float = 0.5) -> None:
self._timeout_seconds = timeout_seconds
self._events: dict[str, asyncio.Event] = {}
self._lock = Lock()
def register(self, request_id: str) -> None:
with self._lock:
self._events.setdefault(request_id, asyncio.Event())
def mark_ready(self, request_id: str) -> None:
with self._lock:
event = self._events.setdefault(request_id, asyncio.Event())
event.set()
async def wait_until_ready(self, request_id: str) -> None:
with self._lock:
event = self._events.setdefault(request_id, asyncio.Event())
try:
await asyncio.wait_for(event.wait(), timeout=self._timeout_seconds)
except asyncio.TimeoutError:
return
def forget(self, request_id: str) -> None:
with self._lock:
self._events.pop(request_id, None)
+13 -2
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
from app.core.api.application.request_start_gate import RequestStartGate
from app.infra.exceptions import AppError
from app.core.api.infrastructure.streaming.sse_encoder import SseEncoder
from app.core.api.infrastructure.streaming.sse_event_channel import SseEventChannel
@@ -7,15 +8,25 @@ from app.schemas.common import ModuleName
class StreamService:
def __init__(self, channel: SseEventChannel, request_exists, encoder: SseEncoder | None = None) -> None:
def __init__(
self,
channel: SseEventChannel,
request_exists,
encoder: SseEncoder | None = None,
start_gate: RequestStartGate | None = None,
) -> None:
self._channel = channel
self._request_exists = request_exists
self._encoder = encoder or SseEncoder()
self._start_gate = start_gate
async def subscribe(self, request_id: str):
if not self._request_exists(request_id):
raise AppError("request_not_found", f"Agent request not found: {request_id}", ModuleName.BACKEND)
return await self._channel.subscribe(request_id, replay=True)
queue = await self._channel.subscribe(request_id, replay=True)
if self._start_gate is not None:
self._start_gate.mark_ready(request_id)
return queue
async def unsubscribe(self, request_id: str, queue) -> None:
await self._channel.unsubscribe(request_id, queue)
@@ -28,8 +28,11 @@ class RagPublicController:
rag_session_id=rag_session_id,
index_job_id=job.index_job_id,
status=job.status,
total_files=job.total_files,
indexed_files=job.indexed_files,
failed_files=job.failed_files,
skipped_files=job.skipped_files,
reuse_percent=job.reuse_percent,
cache_hit_files=job.cache_hit_files,
cache_miss_files=job.cache_miss_files,
error=job.error.model_dump(mode="json") if job.error else None,
@@ -2,7 +2,12 @@ from __future__ import annotations
from app.infra.exceptions import AppError
from app.core.api.application.request_service import RequestService
from app.schemas.agent_api import AgentRequestCreateRequest, AgentRequestQueuedResponse, AgentRequestStateResponse
from app.schemas.agent_api import (
AgentRequestCreateRequest,
AgentRequestQueuedResponse,
AgentRequestStateResponse,
RouteSelectionResponse,
)
from app.schemas.common import ModuleName
@@ -23,6 +28,7 @@ class RequestController:
item = self._service.get(request_id)
if item is None:
raise AppError("request_not_found", f"Agent request not found: {request_id}", ModuleName.BACKEND)
route = RouteSelectionResponse(**item.route.as_payload()) if item.route is not None else None
return AgentRequestStateResponse(
request_id=item.request_id,
session_id=item.session_id,
@@ -31,6 +37,7 @@ class RequestController:
answer=item.answer,
changeset=item.changeset,
apply_changeset=item.apply_changeset,
route=route,
error=item.error,
created_at=item.created_at,
completed_at=item.completed_at,
@@ -4,6 +4,7 @@ from dataclasses import dataclass
from dataclasses import field
from datetime import datetime, timezone
from app.core.api.domain.models.request_route import RequestRoute
from app.schemas.common import ErrorPayload
from app.schemas.changeset import ChangeItem
from app.schemas.orchestration import RequestExecutionStatus
@@ -21,6 +22,7 @@ class AgentRequest:
answer: str | None = None
changeset: list[ChangeItem] = field(default_factory=list)
apply_changeset: bool = False
route: RequestRoute | None = None
error: ErrorPayload | None = None
@classmethod
@@ -39,3 +41,20 @@ class AgentRequest:
status=RequestExecutionStatus.QUEUED,
created_at=datetime.now(timezone.utc),
)
def set_route(
self,
*,
routing_domain: str,
intent: str,
subintent: str,
subintent_label: str,
subintent_comment: str,
) -> None:
self.route = RequestRoute(
routing_domain=routing_domain,
intent=intent,
subintent=subintent,
subintent_label=subintent_label,
subintent_comment=subintent_comment,
)
@@ -0,0 +1,21 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class RequestRoute:
routing_domain: str
intent: str
subintent: str
subintent_label: str
subintent_comment: str
def as_payload(self) -> dict[str, str]:
return {
"routing_domain": self.routing_domain,
"intent": self.intent,
"subintent": self.subintent,
"subintent_label": self.subintent_label,
"subintent_comment": self.subintent_comment,
}
+10 -1
View File
@@ -13,6 +13,7 @@ from app.core.agent.utils.llm import AgentLlmService, PromptLoader
from app.core.api.module import ApiModule
from app.core.api.application.session_bootstrap_service import SessionBootstrapService
from app.core.api.application.request_service import RequestService
from app.core.api.application.request_start_gate import RequestStartGate
from app.core.api.application.session_service import SessionService
from app.core.api.application.stream_service import StreamService
from app.core.api.infrastructure.ids.request_id_factory import RequestIdFactory
@@ -71,6 +72,8 @@ class ModularApplication:
/ "agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/create_doc/steps/step2_generate_sections/prompts/prompts.yml",
Path(__file__).resolve().parent
/ "agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/edit_doc/steps/step2_generate_sections/prompts/prompts.yml",
Path(__file__).resolve().parent
/ "agent/processes/v2/workflows/doc_generate_openapi/prompts/prompts.yml",
Path(__file__).resolve().parent / "agent/processes/v2/intent_router/routers/prompts.yml",
]
)
@@ -88,6 +91,7 @@ class ModularApplication:
self.agent_requests = InMemoryRequestStore()
self.agent_events = SseEventChannel()
self.agent_trace_logger = RequestTraceLogger(Path("runtime_traces/agent_requests"))
self.agent_request_start_gate = RequestStartGate()
_publisher = RuntimeEventPublisher(self.agent_events, self.agent_trace_logger)
_session_service = SessionService(
store=self.agent_sessions,
@@ -122,11 +126,16 @@ class ModularApplication:
request_ids=RequestIdFactory(),
sessions=_session_service,
runtime=_runtime,
start_gate=self.agent_request_start_gate,
)
self.api = ApiModule(
session_bootstrap=_session_bootstrap,
requests=_request_service,
streams=StreamService(self.agent_events, request_exists=lambda request_id: self.agent_requests.get(request_id) is not None),
streams=StreamService(
self.agent_events,
request_exists=lambda request_id: self.agent_requests.get(request_id) is not None,
start_gate=self.agent_request_start_gate,
),
rag=self.rag,
)
+27 -3
View File
@@ -18,20 +18,40 @@ class IndexJob:
index_job_id: str
rag_session_id: str
status: IndexJobStatus = IndexJobStatus.QUEUED
total_files: int = 0
indexed_files: int = 0
failed_files: int = 0
skipped_files: int = 0
cache_hit_files: int = 0
cache_miss_files: int = 0
error: ErrorPayload | None = None
@property
def reuse_percent(self) -> int:
total = self.cache_hit_files + self.cache_miss_files
if total <= 0:
return 0
return round((self.cache_hit_files / total) * 100)
class IndexJobStore:
def __init__(self, repository: RagRepository) -> None:
self._repo = repository
def create(self, rag_session_id: str) -> IndexJob:
job = IndexJob(index_job_id=str(uuid4()), rag_session_id=rag_session_id)
self._repo.create_job(job.index_job_id, rag_session_id, job.status.value)
def create(self, rag_session_id: str, *, total_files: int = 0, skipped_files: int = 0) -> IndexJob:
job = IndexJob(
index_job_id=str(uuid4()),
rag_session_id=rag_session_id,
total_files=total_files,
skipped_files=skipped_files,
)
self._repo.create_job(
job.index_job_id,
rag_session_id,
job.status.value,
total_files=job.total_files,
skipped_files=job.skipped_files,
)
return job
def get(self, index_job_id: str) -> IndexJob | None:
@@ -55,8 +75,10 @@ class IndexJobStore:
index_job_id=row.index_job_id,
rag_session_id=row.rag_session_id,
status=IndexJobStatus(row.status),
total_files=row.total_files,
indexed_files=row.indexed_files,
failed_files=row.failed_files,
skipped_files=row.skipped_files,
cache_hit_files=row.cache_hit_files,
cache_miss_files=row.cache_miss_files,
error=payload,
@@ -88,8 +110,10 @@ class IndexJobStore:
self._repo.update_job(
job.index_job_id,
status=job.status.value,
total_files=job.total_files,
indexed_files=job.indexed_files,
failed_files=job.failed_files,
skipped_files=job.skipped_files,
cache_hit_files=job.cache_hit_files,
cache_miss_files=job.cache_miss_files,
error_code=error_code,
+130 -23
View File
@@ -35,33 +35,75 @@ class IndexingOrchestrator:
self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
async def enqueue_snapshot(self, rag_session_id: str, files: list[dict]) -> IndexJob:
job = self._store.create(rag_session_id)
filtered_files = filter_snapshot_files(files)
total_files = len(files)
skipped_files = max(0, total_files - len(filtered_files))
job = self._store.create(
rag_session_id,
total_files=total_files,
skipped_files=skipped_files,
)
LOGGER.warning(
"rag index snapshot queued: job_id=%s rag_session_id=%s files=%s",
"rag index snapshot queued: job_id=%s rag_session_id=%s total=%s skipped=%s",
job.index_job_id,
rag_session_id,
len(files),
total_files,
skipped_files,
)
asyncio.create_task(
self._process_snapshot(
job.index_job_id,
rag_session_id,
filtered_files,
total_files=total_files,
skipped_files=skipped_files,
)
)
asyncio.create_task(self._process_snapshot(job.index_job_id, rag_session_id, files))
return job
async def enqueue_changes(self, rag_session_id: str, changed_files: list[dict]) -> IndexJob:
job = self._store.create(rag_session_id)
filtered_changes = filter_changes_for_indexing(changed_files)
total_files = len(changed_files)
indexable_total = count_indexable_change_upserts(filtered_changes)
skipped_files = max(0, total_files - indexable_total)
job = self._store.create(
rag_session_id,
total_files=total_files,
skipped_files=skipped_files,
)
LOGGER.warning(
"rag index changes queued: job_id=%s rag_session_id=%s changes=%s",
"rag index changes queued: job_id=%s rag_session_id=%s total=%s skipped=%s",
job.index_job_id,
rag_session_id,
len(changed_files),
total_files,
skipped_files,
)
asyncio.create_task(
self._process_changes(
job.index_job_id,
rag_session_id,
filtered_changes,
total_files=total_files,
skipped_files=skipped_files,
)
)
asyncio.create_task(self._process_changes(job.index_job_id, rag_session_id, changed_files))
return job
async def _process_snapshot(self, job_id: str, rag_session_id: str, files: list[dict]) -> None:
filtered_files = filter_snapshot_files(files)
async def _process_snapshot(
self,
job_id: str,
rag_session_id: str,
filtered_files: list[dict],
*,
total_files: int,
skipped_files: int,
) -> None:
await self._run_with_project_lock(
job_id=job_id,
rag_session_id=rag_session_id,
total_files=len(filtered_files),
total_files=total_files,
skipped_files=skipped_files,
progress_total=len(filtered_files),
operation=lambda progress_cb: self._rag.index_snapshot(
rag_session_id=rag_session_id,
files=filtered_files,
@@ -69,12 +111,21 @@ class IndexingOrchestrator:
),
)
async def _process_changes(self, job_id: str, rag_session_id: str, changed_files: list[dict]) -> None:
filtered_changes = filter_changes_for_indexing(changed_files)
async def _process_changes(
self,
job_id: str,
rag_session_id: str,
filtered_changes: list[dict],
*,
total_files: int,
skipped_files: int,
) -> None:
await self._run_with_project_lock(
job_id=job_id,
rag_session_id=rag_session_id,
total_files=count_indexable_change_upserts(filtered_changes),
total_files=total_files,
skipped_files=skipped_files,
progress_total=count_indexable_change_upserts(filtered_changes),
operation=lambda progress_cb: self._rag.index_changes(
rag_session_id=rag_session_id,
changed_files=filtered_changes,
@@ -82,7 +133,15 @@ class IndexingOrchestrator:
),
)
async def _run_with_project_lock(self, job_id: str, rag_session_id: str, total_files: int, operation) -> None:
async def _run_with_project_lock(
self,
job_id: str,
rag_session_id: str,
total_files: int,
skipped_files: int,
progress_total: int,
operation,
) -> None:
lock = self._locks[rag_session_id]
async with lock:
job = self._store.get(job_id)
@@ -90,17 +149,26 @@ class IndexingOrchestrator:
LOGGER.warning("rag index job missing in store before start: job_id=%s", job_id)
return
job.status = IndexJobStatus.RUNNING
job.total_files = total_files
job.skipped_files = skipped_files
self._store.save(job)
LOGGER.warning(
"rag index job running: job_id=%s rag_session_id=%s total_files=%s",
"rag index job running: job_id=%s rag_session_id=%s total_files=%s skipped_files=%s",
job_id,
rag_session_id,
total_files,
skipped_files,
)
await self._events.publish(
job_id,
"index_status",
{"index_job_id": job_id, "status": job.status.value, "total_files": total_files},
{
"index_job_id": job_id,
"status": job.status.value,
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
},
)
try:
async def progress_cb(current_file_index: int, total: int, current_file_name: str) -> None:
@@ -110,8 +178,11 @@ class IndexingOrchestrator:
{
"index_job_id": job_id,
"current_file_index": current_file_index,
"total_files": total,
"total_files": total_files,
"indexable_files": total,
"processed_files": current_file_index,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
"current_file_path": current_file_name,
"current_file_name": current_file_name,
},
@@ -123,8 +194,10 @@ class IndexingOrchestrator:
timeout=timeout_sec,
)
job.status = IndexJobStatus.DONE
job.total_files = total_files
job.indexed_files = indexed
job.failed_files = failed
job.skipped_files = skipped_files
job.cache_hit_files = cache_hits
job.cache_miss_files = cache_misses
self._store.save(job)
@@ -142,11 +215,13 @@ class IndexingOrchestrator:
{
"index_job_id": job_id,
"status": job.status.value,
"total_files": total_files,
"indexed_files": indexed,
"failed_files": failed,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
"cache_hit_files": cache_hits,
"cache_miss_files": cache_misses,
"total_files": total_files,
},
)
await self._events.publish(
@@ -155,15 +230,19 @@ class IndexingOrchestrator:
{
"index_job_id": job_id,
"status": "done",
"total_files": total_files,
"indexed_files": indexed,
"failed_files": failed,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
"cache_hit_files": cache_hits,
"cache_miss_files": cache_misses,
"total_files": total_files,
},
)
except (TimeoutError, ConnectionError, OSError) as exc:
job.status = IndexJobStatus.ERROR
job.total_files = total_files
job.skipped_files = skipped_files
job.failed_files = max(1, job.failed_files)
job.error = ErrorPayload(
code="index_runtime_error",
@@ -175,7 +254,13 @@ class IndexingOrchestrator:
await self._events.publish(
job_id,
"index_status",
{"index_job_id": job_id, "status": job.status.value, "total_files": total_files},
{
"index_job_id": job_id,
"status": job.status.value,
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
},
)
await self._events.publish(
job_id,
@@ -184,6 +269,8 @@ class IndexingOrchestrator:
"index_job_id": job_id,
"status": "error",
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
"error": {
"code": job.error.code,
"desc": job.error.desc,
@@ -193,6 +280,8 @@ class IndexingOrchestrator:
)
except asyncio.TimeoutError as exc:
job.status = IndexJobStatus.ERROR
job.total_files = total_files
job.skipped_files = skipped_files
job.failed_files = max(1, job.failed_files)
job.error = ErrorPayload(
code="index_timeout",
@@ -204,7 +293,13 @@ class IndexingOrchestrator:
await self._events.publish(
job_id,
"index_status",
{"index_job_id": job_id, "status": job.status.value, "total_files": total_files},
{
"index_job_id": job_id,
"status": job.status.value,
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
},
)
await self._events.publish(
job_id,
@@ -213,6 +308,8 @@ class IndexingOrchestrator:
"index_job_id": job_id,
"status": "error",
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
"error": {
"code": job.error.code,
"desc": job.error.desc,
@@ -222,6 +319,8 @@ class IndexingOrchestrator:
)
except Exception as exc:
job.status = IndexJobStatus.ERROR
job.total_files = total_files
job.skipped_files = skipped_files
job.failed_files = max(1, job.failed_files)
job.error = ErrorPayload(
code="index_unexpected_error",
@@ -233,7 +332,13 @@ class IndexingOrchestrator:
await self._events.publish(
job_id,
"index_status",
{"index_job_id": job_id, "status": job.status.value, "total_files": total_files},
{
"index_job_id": job_id,
"status": job.status.value,
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
},
)
await self._events.publish(
job_id,
@@ -242,6 +347,8 @@ class IndexingOrchestrator:
"index_job_id": job_id,
"status": "error",
"total_files": total_files,
"skipped_files": skipped_files,
"reuse_percent": job.reuse_percent,
"error": {
"code": job.error.code,
"desc": job.error.desc,
+34 -5
View File
@@ -13,8 +13,10 @@ class RagJobRow:
index_job_id: str
rag_session_id: str
status: str
total_files: int
indexed_files: int
failed_files: int
skipped_files: int
cache_hit_files: int
cache_miss_files: int
error_code: str | None
@@ -24,16 +26,36 @@ class RagJobRow:
class RagJobRepository:
def create_job(self, index_job_id: str, rag_session_id: str, status: str) -> None:
def create_job(
self,
index_job_id: str,
rag_session_id: str,
status: str,
*,
total_files: int = 0,
skipped_files: int = 0,
) -> None:
with get_engine().connect() as conn:
conn.execute(
text(
"""
INSERT INTO rag_index_jobs (index_job_id, rag_session_id, status)
VALUES (:jid, :sid, :status)
INSERT INTO rag_index_jobs (
index_job_id,
rag_session_id,
status,
total_files,
skipped_files
)
VALUES (:jid, :sid, :status, :total_files, :skipped_files)
"""
),
{"jid": index_job_id, "sid": rag_session_id, "status": status},
{
"jid": index_job_id,
"sid": rag_session_id,
"status": status,
"total_files": total_files,
"skipped_files": skipped_files,
},
)
conn.commit()
@@ -42,8 +64,10 @@ class RagJobRepository:
index_job_id: str,
*,
status: str,
total_files: int,
indexed_files: int,
failed_files: int,
skipped_files: int,
cache_hit_files: int = 0,
cache_miss_files: int = 0,
error_code: str | None = None,
@@ -56,8 +80,10 @@ class RagJobRepository:
"""
UPDATE rag_index_jobs
SET status = :status,
total_files = :total_files,
indexed_files = :indexed,
failed_files = :failed,
skipped_files = :skipped_files,
cache_hit_files = :cache_hit_files,
cache_miss_files = :cache_miss_files,
error_code = :ecode,
@@ -70,8 +96,10 @@ class RagJobRepository:
{
"jid": index_job_id,
"status": status,
"total_files": total_files,
"indexed": indexed_files,
"failed": failed_files,
"skipped_files": skipped_files,
"cache_hit_files": cache_hit_files,
"cache_miss_files": cache_miss_files,
"ecode": error_code,
@@ -86,7 +114,8 @@ class RagJobRepository:
row = conn.execute(
text(
"""
SELECT index_job_id, rag_session_id, status, indexed_files, failed_files,
SELECT index_job_id, rag_session_id, status, total_files, indexed_files, failed_files,
skipped_files,
cache_hit_files, cache_miss_files, error_code, error_desc, error_module, updated_at
FROM rag_index_jobs
WHERE index_job_id = :jid
+2 -2
View File
@@ -31,8 +31,8 @@ class RagRepository:
def get_session(self, rag_session_id: str) -> dict | None:
return self._sessions.get_session(rag_session_id)
def create_job(self, index_job_id: str, rag_session_id: str, status: str) -> None:
self._jobs.create_job(index_job_id, rag_session_id, status)
def create_job(self, index_job_id: str, rag_session_id: str, status: str, **kwargs) -> None:
self._jobs.create_job(index_job_id, rag_session_id, status, **kwargs)
def update_job(self, index_job_id: str, **kwargs) -> None:
self._jobs.update_job(index_job_id, **kwargs)
@@ -15,6 +15,8 @@ class RagSchemaMigrator:
def _ensure_core_columns(self, conn) -> None:
for statement in (
"ALTER TABLE rag_index_jobs ADD COLUMN IF NOT EXISTS total_files INTEGER NOT NULL DEFAULT 0",
"ALTER TABLE rag_index_jobs ADD COLUMN IF NOT EXISTS skipped_files INTEGER NOT NULL DEFAULT 0",
"ALTER TABLE rag_chunks ADD COLUMN IF NOT EXISTS layer VARCHAR(64) NULL",
"ALTER TABLE rag_chunks ADD COLUMN IF NOT EXISTS lang VARCHAR(32) NULL",
"ALTER TABLE rag_chunks ADD COLUMN IF NOT EXISTS repo_id VARCHAR(512) NULL",
@@ -16,8 +16,10 @@ def base_table_statements() -> tuple[str, ...]:
index_job_id VARCHAR(64) PRIMARY KEY,
rag_session_id VARCHAR(64) NOT NULL,
status VARCHAR(16) NOT NULL,
total_files INTEGER NOT NULL DEFAULT 0,
indexed_files INTEGER NOT NULL DEFAULT 0,
failed_files INTEGER NOT NULL DEFAULT 0,
skipped_files INTEGER NOT NULL DEFAULT 0,
cache_hit_files INTEGER NOT NULL DEFAULT 0,
cache_miss_files INTEGER NOT NULL DEFAULT 0,
error_code VARCHAR(128) NULL,
+4 -1
View File
@@ -116,7 +116,10 @@ class GigaChatClient:
else:
if response.status_code < 400:
return response
last_error = GigaChatError(f"GigaChat {operation_name} error {response.status_code}: {response.text}")
last_error = GigaChatError(
f"GigaChat {operation_name} error {response.status_code}: {response.text}",
status_code=response.status_code,
)
if not self._is_retryable_status(response.status_code):
raise last_error
if attempt == retries:
+3 -1
View File
@@ -1,2 +1,4 @@
class GigaChatError(OSError):
pass
def __init__(self, message: str, *, status_code: int | None = None) -> None:
super().__init__(message)
self.status_code = status_code
+1 -1
View File
@@ -20,6 +20,6 @@ class GigaChatSettings:
scope=os.getenv("GIGACHAT_SCOPE", "GIGACHAT_API_PERS"),
credentials=os.getenv("GIGACHAT_TOKEN", "").strip(),
ssl_verify=os.getenv("GIGACHAT_SSL_VERIFY", "true").lower() in {"1", "true", "yes"},
model=os.getenv("GIGACHAT_MODEL", "GigaChat-Pro"),
model=os.getenv("GIGACHAT_MODEL", "GigaChat-2"),
embedding_model=os.getenv("GIGACHAT_EMBEDDING_MODEL", "Embeddings"),
)
@@ -61,7 +61,10 @@ class GigaChatTokenProvider:
raise GigaChatError(f"GigaChat auth request failed: {exc}") from exc
if response.status_code >= 400:
raise GigaChatError(f"GigaChat auth error {response.status_code}: {response.text}")
raise GigaChatError(
f"GigaChat auth error {response.status_code}: {response.text}",
status_code=response.status_code,
)
payload = response.json()
token = payload.get("access_token")

Some files were not shown because too many files have changed in this diff Show More