From 4e3435ad923ff02bd2f29bbd6ff1d4a1ec18774b Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Fri, 10 Apr 2026 10:29:17 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=D0=B5?= =?UTF-8?q?=D1=82=20=D0=B0=D0=B3=D0=B5=D0=BD=D1=82,=20=D0=BF=D0=BE=D0=BF?= =?UTF-8?q?=D1=80=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D1=8B=20=D0=BF=D1=83=D1=82?= =?UTF-8?q?=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _process/03. RAG.md | 23 +--- docs/inp.md | 0 .../doc_rules/templates/ui_page.template.md | 50 ++++++++ .../steps/retrieval/api_endpoint_collector.py | 25 ---- .../doc_rules_pipeline/changeset_generator.py | 67 ++++++++++- .../steps/build_change_plan_step.py | 110 +++++++----------- .../steps/feature_markdown_parser.py | 13 ++- .../steps/parse_feature_requirements_step.py | 6 +- .../steps/plan_hints.py | 64 ++++++++++ .../steps/plan_path_policy.py | 59 ++++++++++ .../steps/prompts/prompts.yml | 51 +++++++- .../workflow_runtime/buffered_graph.py | 2 + .../workflow_runtime/models.py | 2 + src/app/core/agent/runtime/agent_runtime.py | 45 ++++++- src/app/core/agent/runtime/publisher.py | 18 +++ .../core/api/application/request_service.py | 25 +++- .../api/controllers/rag_public_controller.py | 13 +++ .../core/rag/embedding/gigachat_embedder.py | 10 +- .../rag/indexing/docs/document_builder.py | 48 ++++---- .../rag/indexing/docs/frontmatter_metadata.py | 13 +++ src/app/core/rag/indexing/job_store.py | 26 ++++- src/app/core/rag/indexing/orchestrator.py | 101 +++++++++++++++- src/app/core/rag/indexing/service.py | 87 +++++++++++++- .../core/rag/persistence/job_repository.py | 4 +- src/app/core/shared/gigachat/client.py | 72 +++++++++--- .../core/shared/gigachat/token_provider.py | 25 +++- src/app/core/shared/network/hard_timeout.py | 31 +++++ src/app/main.py | 18 +++ .../agent/test_api_endpoint_collector.py | 4 +- .../rag/test_docs_indexing_pipeline.py | 9 +- 30 files changed, 837 insertions(+), 184 deletions(-) create mode 100644 docs/inp.md create mode 100644 src/app/core/agent/processes/v2/doc_rules/templates/ui_page.template.md create mode 100644 src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_hints.py create mode 100644 src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_path_policy.py create mode 100644 src/app/core/rag/indexing/docs/frontmatter_metadata.py create mode 100644 src/app/core/shared/network/hard_timeout.py diff --git a/_process/03. RAG.md b/_process/03. RAG.md index be00cc4..6fb5b1e 100644 --- a/_process/03. RAG.md +++ b/_process/03. RAG.md @@ -89,28 +89,17 @@ RAG сейчас используется как общее ядро индек Хранит карточку документа как точку входа в документ и его краткое описание. Формирование: -Источник данных - frontmatter, fallback title, summary и doc kind, вычисленный классификатором документации. -Данные извлекаются структурированно по атрибутам. +Источник данных - frontmatter `as is`, summary и doc kind, вычисленный классификатором документации. +В `metadata_json` копируются все `key-value` из frontmatter без нормализации и без fallback для frontmatter-атрибутов. +Дополнительно в `metadata_json` добавляются служебные поля `source_path`, `summary_text`, `doc_kind`. +Атрибут `document_id` добавляется только при наличии `frontmatter.id` (fallback до пути файла не применяется). В `content` попадает summary документа, а не склейка всех частей документа в сплошной текст. Фиксация в БД: | Атрибут в `metadata_json` | Описание | Источник | |---|---|---| -| `document_id` | идентификатор документа | `frontmatter.id`, иначе путь файла | -| `type` | тип документа из frontmatter | `frontmatter.type` | -| `name` | системное имя документа | `frontmatter.name` | -| `title` | человекочитаемый заголовок документа | `frontmatter.title`, иначе fallback title | -| `module` | модуль документа | `frontmatter.module` | -| `domain` | домен документа | `frontmatter.domain` | -| `subdomain` | поддомен документа | `frontmatter.subdomain` | -| `layer` | логический слой, указанный в frontmatter документа | `frontmatter.layer` | -| `status` | статус документа | `frontmatter.status` | -| `updated_at` | дата или отметка последнего обновления | `frontmatter.updated_at` | -| `tags` | теги документа | `frontmatter.tags` | -| `entities` | сущности, связанные с документом | `frontmatter.entities` | -| `parent` | родительский документ | `frontmatter.parent` | -| `children` | дочерние документы | `frontmatter.children` | -| `links` | ссылки на связанные материалы | `frontmatter.links` | +| `*` frontmatter fields | все поля frontmatter в исходном виде | frontmatter документа | +| `document_id` | идентификатор документа, добавляется только если в frontmatter есть `id` | `frontmatter.id` | | `source_path` | исходный путь документа | путь файла | | `summary_text` | краткое содержание документа | секция `# Summary` | | `doc_kind` | классификация документа, например `readme`, `spec`, `runbook` | `DocsClassifier.classify(path)` | diff --git a/docs/inp.md b/docs/inp.md new file mode 100644 index 0000000..e69de29 diff --git a/src/app/core/agent/processes/v2/doc_rules/templates/ui_page.template.md b/src/app/core/agent/processes/v2/doc_rules/templates/ui_page.template.md new file mode 100644 index 0000000..5bd32fe --- /dev/null +++ b/src/app/core/agent/processes/v2/doc_rules/templates/ui_page.template.md @@ -0,0 +1,50 @@ +--- +id: ui.example_page +type: ui_page +doc_type: ui_page +name: example_page +title: Пример UI-страницы +module: example_module +layer: presentation +domain: example_domain +sub_domain: example_subdomain +related_docs: [] +status: draft +updated_at: 2026-03-20 +source_of_truth: mixed +parent: null +children: [] +tags: [] +entities: [] +links: {} +--- + +# Пример UI-страницы + +## Summary + +Краткое описание страницы и её назначения. + +## Details + +### Назначение страницы + +### Пользовательский сценарий + +### Основные блоки интерфейса + +### Связанные API и сущности + +### Функциональные требования + +### Нефункциональные требования + +### Ограничения и граничные случаи + +### Ошибки и валидации + +### Связанный код + +### Связанные документы + +### История изменений diff --git a/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/steps/retrieval/api_endpoint_collector.py b/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/steps/retrieval/api_endpoint_collector.py index 4f55a6c..42189c9 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/steps/retrieval/api_endpoint_collector.py +++ b/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/steps/retrieval/api_endpoint_collector.py @@ -8,26 +8,14 @@ class ApiEndpointCollector: _ENDPOINT_VALUE_RE = re.compile( r"\b((?:GET|POST|PUT|PATCH|DELETE|HEAD|OPTIONS)(?:\s*\|\s*(?:GET|POST|PUT|PATCH|DELETE|HEAD|OPTIONS))*)\s+(/[-a-zA-Z0-9_./{}]+)" ) - _METHOD_PATH_RE = re.compile(r"\b(GET|POST|PUT|PATCH|DELETE|HEAD|OPTIONS)\s+(/[-a-zA-Z0-9_./{}]+)") - _PATH_RE = re.compile(r"(/[-a-zA-Z0-9_./{}]+)") _DOC_EXTS = (".md", ".yaml", ".yml", ".json") def collect(self, rows: list[dict]) -> list[str]: endpoints: list[str] = [] for row in rows: self._append_from_endpoint_metadata(endpoints, row) - self._append_from_title_fallback(endpoints, row) - for raw in self._row_candidates(row): - self._append_from_text(endpoints, raw) return sorted(set(endpoints)) - def _append_from_title_fallback(self, out: list[str], row: dict) -> None: - title = str(row.get("title") or "").strip() - if not title: - return - for match in self._PATH_RE.findall(title): - self._append_default(out, match) - def _append_from_endpoint_metadata(self, out: list[str], row: dict) -> None: metadata = dict(row.get("metadata") or {}) endpoint_value = str(metadata.get("endpoint") or "").strip() @@ -36,19 +24,6 @@ class ApiEndpointCollector: for methods, path in self._ENDPOINT_VALUE_RE.findall(endpoint_value): self._append_methods_with_path(out, methods, path) - def _row_candidates(self, row: dict) -> list[str]: - metadata = dict(row.get("metadata") or {}) - values = [ - metadata.get("name"), - metadata.get("summary_text"), - row.get("title"), - ] - return [str(value or "") for value in values if str(value or "").strip()] - - def _append_from_text(self, out: list[str], text: str) -> None: - for method, path in self._METHOD_PATH_RE.findall(text): - self._append_with_method(out, method, path) - def _append_methods_with_path(self, out: list[str], methods_raw: str, path_raw: str) -> None: methods = [ part.strip().upper() diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/doc_rules_pipeline/changeset_generator.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/doc_rules_pipeline/changeset_generator.py index b392835..107a6bb 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/doc_rules_pipeline/changeset_generator.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/doc_rules_pipeline/changeset_generator.py @@ -56,7 +56,17 @@ class DocRulesChangesetGenerator: ) payload = self._parse_json(raw) if payload is None: - return None, f"LLM вернул невалидный JSON changeset для {item.path}." + if trace is not None: + trace.log("changeset_json_parse_failed", {"path": item.path, "raw_chars": len(str(raw or ""))}) + repaired_raw = self._llm.generate( + "v2_docs_update.repair_doc_changeset_json", + self._build_repair_input(raw=raw, item=item), + log_context="workflow.v2.docs_update.from_feature.changeset_repair", + trace=trace, + ) + payload = self._parse_json(repaired_raw) + if payload is None: + return None, f"LLM вернул невалидный JSON changeset для {item.path} даже после repair." payload["op"] = item.op payload["path"] = item.path payload["reason"] = str(payload.get("reason") or item.reason)[:500] @@ -80,8 +90,63 @@ class DocRulesChangesetGenerator: value = json.loads(text) return value if isinstance(value, dict) else None except json.JSONDecodeError: + normalized = self._escape_control_chars_in_json_strings(text) + if normalized != text: + try: + value = json.loads(normalized) + return value if isinstance(value, dict) else None + except json.JSONDecodeError: + return None return None + def _build_repair_input(self, *, raw: str, item: PlannedChange) -> str: + payload = { + "expected_contract": { + "op": item.op, + "path": item.path, + "required_keys": ["op", "path", "reason"], + "proposed_content_required_for": ["create", "update"], + }, + "raw_llm_output": str(raw or ""), + } + return json.dumps(payload, ensure_ascii=False, indent=2) + + def _escape_control_chars_in_json_strings(self, text: str) -> str: + escaped: list[str] = [] + in_string = False + backslash = False + for char in text: + if not in_string: + escaped.append(char) + if char == '"': + in_string = True + continue + if backslash: + escaped.append(char) + backslash = False + continue + if char == "\\": + escaped.append(char) + backslash = True + continue + if char == '"': + escaped.append(char) + in_string = False + continue + codepoint = ord(char) + if codepoint < 0x20: + if char == "\n": + escaped.append("\\n") + elif char == "\r": + escaped.append("\\r") + elif char == "\t": + escaped.append("\\t") + else: + escaped.append(f"\\u{codepoint:04x}") + continue + escaped.append(char) + return "".join(escaped) + def _resolve_base_hash(self, project_root: str, rel_path: str) -> str: root = Path(project_root or "").expanduser() if not root.is_absolute(): diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/build_change_plan_step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/build_change_plan_step.py index f44220e..c91d91a 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/build_change_plan_step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/build_change_plan_step.py @@ -1,9 +1,13 @@ from __future__ import annotations import json -import re from app.core.agent.processes.v2.workflows.doc_update_from_feature.steps.docs_state_loader import DocsState +from app.core.agent.processes.v2.workflows.doc_update_from_feature.steps.plan_hints import ( + PlanUnitHint, + parse_plan_hints, +) +from app.core.agent.processes.v2.workflows.doc_update_from_feature.steps.plan_path_policy import PlanPathPolicy from app.core.agent.processes.v2.workflows.doc_update_from_feature.workflow_runtime.context import DocUpdateFromFeatureContext from app.core.agent.processes.v2.workflows.doc_update_from_feature.workflow_runtime.models import PlannedChange from app.core.agent.processes.v2.workflows.doc_update_from_feature.workflow_runtime.system_rules import ( @@ -23,15 +27,16 @@ class BuildChangePlanStep(WorkflowStep[DocUpdateFromFeatureContext]): def __init__(self, llm: AgentLlmService, query_repository: RagQueryRepository | None = None) -> None: self._llm = llm self._query_repository = query_repository or RagQueryRepository() + self._path_policy = PlanPathPolicy(DOC_TYPE_TO_FOLDER) async def run(self, context: DocUpdateFromFeatureContext) -> DocUpdateFromFeatureContext: if context.answer or not context.units: return context self._load_docs_state(context) - inferred_types = self._infer_missing_types(context) + inferred_hints = self._infer_plan_hints(context) state = DocsState.from_rows(context.docs_catalog_rows) for index, unit in enumerate(context.units): - planned = self._build_unit_plan(context, unit, state, inferred_types.get(index, "")) + planned = self._build_unit_plan(context, unit, state, inferred_hints.get(index, PlanUnitHint())) if planned is None: continue context.planned_changes.append(planned) @@ -55,18 +60,26 @@ class BuildChangePlanStep(WorkflowStep[DocUpdateFromFeatureContext]): except Exception as exc: context.issues.append(f"Не удалось загрузить состояние документации из RAG: {exc}") - def _infer_missing_types(self, context: DocUpdateFromFeatureContext) -> dict[int, str]: - missing: list[tuple[int, str, str]] = [] + def _infer_plan_hints(self, context: DocUpdateFromFeatureContext) -> dict[int, PlanUnitHint]: + items = [] for idx, unit in enumerate(context.units): - value = str(unit.metadata.get("type") or "").strip() - if not value: - missing.append((idx, unit.heading, unit.body[:400])) - if not missing: - return {} + items.append( + { + "index": idx, + "heading": unit.heading, + "snippet": unit.body[:400], + "known": { + "type": str(unit.metadata.get("type") or "").strip(), + "id": str(unit.metadata.get("id") or "").strip(), + "application": str(unit.metadata.get("application") or context.analytics_meta.application or "").strip(), + "platform": str(unit.metadata.get("platform") or context.analytics_meta.platform or "").strip(), + }, + } + ) payload = { "system_rules": SYSTEM_RULES_TEXT, "allowed_doc_types": list(ALLOWED_DOC_TYPES), - "items": [{"index": idx, "heading": h, "snippet": snippet} for idx, h, snippet in missing], + "items": items, } raw = self._llm.generate( "v2_docs_update.plan_change_units", @@ -74,43 +87,38 @@ class BuildChangePlanStep(WorkflowStep[DocUpdateFromFeatureContext]): log_context="workflow.v2.docs_update.from_feature.plan", trace=context.runtime.trace.module("workflow.v2.docs_update.from_feature.llm"), ) - return self._parse_type_inference(raw) - - def _parse_type_inference(self, raw: str) -> dict[int, str]: - try: - data = json.loads(str(raw or "").strip()) - except json.JSONDecodeError: - return {} - rows = data.get("items") if isinstance(data, dict) else [] - if not isinstance(rows, list): - return {} - result: dict[int, str] = {} - for row in rows: - if not isinstance(row, dict): - continue - index = row.get("index") - doc_type = str(row.get("doc_type") or "").strip() - if not isinstance(index, int) or doc_type not in ALLOWED_DOC_TYPES: - continue - result[index] = doc_type - return result + return parse_plan_hints(raw, ALLOWED_DOC_TYPES) def _build_unit_plan( self, context: DocUpdateFromFeatureContext, unit, state: DocsState, - inferred_doc_type: str, + hint: PlanUnitHint, ) -> PlannedChange | None: - doc_type = str(unit.metadata.get("type") or inferred_doc_type).strip() + doc_type = str(unit.metadata.get("type") or hint.doc_type).strip() if doc_type not in ALLOWED_DOC_TYPES: context.issues.append(f"Unit '{unit.heading}': неизвестный или отсутствующий type '{doc_type}'.") return None - unit_id = str(unit.metadata.get("id") or self._make_doc_id(doc_type, unit.heading)).strip() + unit_id = self._path_policy.make_doc_id( + doc_type=doc_type, + heading=unit.heading, + hinted_doc_id=str(unit.metadata.get("id") or hint.doc_id or "").strip(), + ) op_hint = str(unit.metadata.get("op") or "create_or_update").strip().lower() - target_hint = str(unit.metadata.get("target_path_hint") or "").strip() - path = self._resolve_path(doc_type, unit_id, unit.heading, target_hint, state) - op = self._resolve_op(op_hint, unit_id, path, state) + application = str(unit.metadata.get("application") or context.analytics_meta.application or hint.application).strip() + platform = str(unit.metadata.get("platform") or context.analytics_meta.platform or hint.platform).strip().lower() + page_type = str(unit.metadata.get("page_type") or hint.page_type or self._path_policy.default_page_type(doc_type)).strip() + path = self._path_policy.resolve_path( + doc_type=doc_type, + unit_id=unit_id, + application=application, + platform=platform, + page_type=page_type, + inferred_path=hint.path, + state=state, + ) + op = self._path_policy.resolve_op(op_hint=op_hint, unit_id=unit_id, path=path, state=state) source_refs = self._as_list(unit.metadata.get("source_refs")) or ["section: 5. Функциональные требования"] related_docs = self._as_list(unit.metadata.get("related_docs")) reason = f"Из unit '{unit.heading}' системной аналитики ({context.analytics_meta.analysis_id or 'analysis'})." @@ -126,34 +134,6 @@ class BuildChangePlanStep(WorkflowStep[DocUpdateFromFeatureContext]): related_docs=related_docs, ) - def _resolve_path(self, doc_type: str, unit_id: str, heading: str, hint: str, state: DocsState) -> str: - if unit_id in state.by_doc_id: - return state.by_doc_id[unit_id] - if hint: - return hint - folder = DOC_TYPE_TO_FOLDER.get(doc_type, "docs") - slug = self._slugify(unit_id or heading) - return f"{folder}/{slug}.md" - - def _resolve_op(self, op_hint: str, unit_id: str, path: str, state: DocsState) -> str: - if op_hint == "delete": - return "delete" - if op_hint == "create": - return "create" - if op_hint == "update": - return "update" - if path in state.by_path or unit_id in state.by_doc_id: - return "update" - return "create" - - def _make_doc_id(self, doc_type: str, heading: str) -> str: - slug = self._slugify(heading).replace("-", "_") - return f"{doc_type}.{slug}".strip(".") - - def _slugify(self, value: str) -> str: - cleaned = re.sub(r"[^a-zA-Z0-9а-яА-Я_-]+", "-", value.lower()).strip("-") - return re.sub(r"-+", "-", cleaned) or "doc" - def _as_list(self, value: object) -> list[str]: if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/feature_markdown_parser.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/feature_markdown_parser.py index aebadfd..597d1b3 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/feature_markdown_parser.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/feature_markdown_parser.py @@ -15,7 +15,7 @@ class ParsedFeatureSpec: class FeatureMarkdownParser: - _META_KEYS = {"analysis_id", "domains", "subdomains"} + _META_KEYS = {"analysis_id", "application", "platform", "domain", "sub_domain", "domains", "subdomains"} def parse(self, content: str) -> ParsedFeatureSpec: lines = content.splitlines() @@ -54,8 +54,10 @@ class FeatureMarkdownParser: i = j return AnalyticsMeta( analysis_id=str(values.get("analysis_id") or "").strip(), - domains=self._as_list(values.get("domains")), - subdomains=self._as_list(values.get("subdomains")), + application=str(values.get("application") or "").strip(), + platform=str(values.get("platform") or "").strip(), + domains=self._as_list(values.get("domain") or values.get("domains")), + subdomains=self._as_list(values.get("sub_domain") or values.get("subdomains")), ) def _extract_functional_section(self, lines: list[str]) -> list[str]: @@ -109,9 +111,10 @@ class FeatureMarkdownParser: body_start = i + 1 i += 1 continue - if ":" not in stripped: + line = stripped[2:].strip() if stripped.startswith("- ") else stripped + if ":" not in line: break - key, value = [part.strip() for part in stripped.split(":", 1)] + key, value = [part.strip() for part in line.split(":", 1)] if not key.isidentifier(): break if value: diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/parse_feature_requirements_step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/parse_feature_requirements_step.py index 3634c4d..6f780bb 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/parse_feature_requirements_step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/parse_feature_requirements_step.py @@ -21,9 +21,9 @@ class ParseFeatureRequirementsStep(WorkflowStep[DocUpdateFromFeatureContext]): if not context.analytics_meta.analysis_id: context.issues.append("Отсутствует analysis_id в metadata аналитики.") if not context.analytics_meta.domains: - context.issues.append("Отсутствует domains в metadata аналитики.") + context.issues.append("Отсутствует domain в metadata аналитики.") if not context.analytics_meta.subdomains: - context.issues.append("Отсутствует subdomains в metadata аналитики.") + context.issues.append("Отсутствует sub_domain в metadata аналитики.") if not context.units: context.issues.append( "Не найдены units в разделе '## 5. Функциональные требования' с заголовками уровня '###'." @@ -33,6 +33,8 @@ class ParseFeatureRequirementsStep(WorkflowStep[DocUpdateFromFeatureContext]): def trace_output(self, context: DocUpdateFromFeatureContext) -> dict[str, object]: return { "analysis_id": context.analytics_meta.analysis_id, + "application": context.analytics_meta.application, + "platform": context.analytics_meta.platform, "domains": context.analytics_meta.domains, "subdomains": context.analytics_meta.subdomains, "units": len(context.units), diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_hints.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_hints.py new file mode 100644 index 0000000..a458620 --- /dev/null +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_hints.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import json +import re +from dataclasses import dataclass + + +@dataclass(slots=True) +class PlanUnitHint: + doc_type: str = "" + doc_id: str = "" + application: str = "" + platform: str = "" + page_type: str = "" + path: str = "" + + +def parse_plan_hints(raw: str, allowed_doc_types: tuple[str, ...]) -> dict[int, PlanUnitHint]: + try: + data = json.loads(str(raw or "").strip()) + except json.JSONDecodeError: + return {} + rows = data.get("items") if isinstance(data, dict) else [] + if not isinstance(rows, list): + return {} + result: dict[int, PlanUnitHint] = {} + for row in rows: + if not isinstance(row, dict): + continue + index = row.get("index") + if not isinstance(index, int): + continue + doc_type = str(row.get("doc_type") or "").strip() + result[index] = PlanUnitHint( + doc_type=doc_type if doc_type in allowed_doc_types else "", + doc_id=str(row.get("id") or "").strip(), + application=str(row.get("application") or "").strip(), + platform=str(row.get("platform") or "").strip().lower(), + page_type=str(row.get("page_type") or "").strip(), + path=str(row.get("path") or "").strip(), + ) + return result + + +def page_type_for_doc_type(doc_type: str, doc_type_to_folder: dict[str, str]) -> str: + if doc_type == "index_page": + return "index" + folder = doc_type_to_folder.get(doc_type, "docs") + parts = folder.split("/") + return parts[-1] if parts else "docs" + + +def normalize_inferred_path(inferred_path: str, unit_id: str) -> str: + path = str(inferred_path or "").strip() + if not path or not path.startswith("docs/"): + return "" + if not path.endswith(f"/{unit_id}.md"): + return "" + return path + + +def normalize_path_segment(value: str) -> str: + cleaned = re.sub(r"[^a-zA-Z0-9._-]+", "-", str(value or "").strip().lower()).strip("-") + return re.sub(r"-+", "-", cleaned) or "unknown" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_path_policy.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_path_policy.py new file mode 100644 index 0000000..d158942 --- /dev/null +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/plan_path_policy.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import re + +from app.core.agent.processes.v2.workflows.doc_update_from_feature.steps.docs_state_loader import DocsState +from app.core.agent.processes.v2.workflows.doc_update_from_feature.steps.plan_hints import ( + normalize_inferred_path, + normalize_path_segment, + page_type_for_doc_type, +) + +_VALID_PLATFORMS = {"web", "ufs", "pprb"} + + +class PlanPathPolicy: + def __init__(self, doc_type_to_folder: dict[str, str]) -> None: + self._doc_type_to_folder = dict(doc_type_to_folder) + + def make_doc_id(self, *, doc_type: str, heading: str, hinted_doc_id: str) -> str: + return str(hinted_doc_id or f"{doc_type}.{self._slugify(heading).replace('-', '_')}").strip(".") + + def resolve_path( + self, + *, + doc_type: str, + unit_id: str, + application: str, + platform: str, + page_type: str, + inferred_path: str, + state: DocsState, + ) -> str: + if unit_id in state.by_doc_id: + return state.by_doc_id[unit_id] + normalized_inferred = normalize_inferred_path(inferred_path, unit_id) + if normalized_inferred: + return normalized_inferred + page = normalize_path_segment(page_type or page_type_for_doc_type(doc_type, self._doc_type_to_folder)) + app = normalize_path_segment(application or "unknown_app") + plat = platform if platform in _VALID_PLATFORMS else "unknown_platform" + return f"docs/{app}/{plat}/{page}/{unit_id}.md" + + def resolve_op(self, *, op_hint: str, unit_id: str, path: str, state: DocsState) -> str: + if op_hint == "delete": + return "delete" + if op_hint == "create": + return "create" + if op_hint == "update": + return "update" + if path in state.by_path or unit_id in state.by_doc_id: + return "update" + return "create" + + def default_page_type(self, doc_type: str) -> str: + return page_type_for_doc_type(doc_type, self._doc_type_to_folder) + + def _slugify(self, value: str) -> str: + cleaned = re.sub(r"[^a-zA-Z0-9а-яА-Я_-]+", "-", value.lower()).strip("-") + return re.sub(r"-+", "-", cleaned) or "doc" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/prompts/prompts.yml b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/prompts/prompts.yml index 6e170df..964881e 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/prompts/prompts.yml +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/steps/prompts/prompts.yml @@ -7,7 +7,16 @@ prompts: Верни только JSON: { "items": [ - {"index": 0, "doc_type": "api_method", "reason": "..."} + { + "index": 0, + "doc_type": "api_method", + "id": "ufs.contacts_dgr.api.create", + "application": "coverage", + "platform": "ufs", + "page_type": "api", + "path": "docs/coverage/ufs/api/ufs.contacts_dgr.api.create.md", + "reason": "..." + } ] } @@ -15,12 +24,22 @@ prompts: - Используй только doc_type из allowed_doc_types. - Не пропускай item, даже если не уверен: выбери наиболее близкий тип. - Ориентируйся на heading и snippet. + - path — это служебное поле плана изменений, не поле frontmatter. + - id: + - брать из metadata unit, если задан; + - если id нет, сгенерировать стабильный id по смыслу unit и по аналогии с существующей документацией. + - имя файла всегда формировать строго как .md. + - для существующего документа (если это видно из контекста и индекса) путь не менять. + - для нового документа путь формировать строго как docs////.md. + - platform использовать только из допустимых значений: web, ufs, pprb. + - page_type выбирать по doc_type (например ui_page -> ui, api_method -> api, logic_block -> logic). + - последний сегмент path обязан совпадать с .md. - Никакого markdown и текста вне JSON. build_doc_changeset: | Ты формируешь один item changeset для документации на основе системной аналитики и правил doc_rules. - Верни только JSON-объект формата: + Верни только один JSON-объект (RFC8259) формата: { "op": "create|update|delete", "path": "docs/...", @@ -28,9 +47,37 @@ prompts: "proposed_content": "полный markdown документа для create/update" } + Схема и ограничения: + - Обязательные поля всегда: op, path, reason. + - Для op=create/update поле proposed_content обязательно и содержит полный markdown документа: + 1) frontmatter между --- и ---, + 2) затем body согласно doc_rules. + - Для op=delete поле proposed_content запрещено. + - В JSON используй двойные кавычки, без trailing commas. + - Никаких code fences (```), комментариев и текста до/после JSON. + Правила: - Строго соблюдай структуру и ограничения из doc_rules_context. - Для create/update верни полный итоговый markdown (frontmatter + body). - Для update не используй placeholder-тексты; возвращай пригодный к сохранению документ. - reason обязателен, короткий, по сути изменения. - Никакого markdown и текста вне JSON. + + repair_doc_changeset_json: | + Ты ремонтируешь невалидный ответ модели и должен вернуть строго валидный JSON changeset. + + Вход содержит: + - expected_contract: ожидаемые поля и ограничения. + - raw_llm_output: исходный (возможно невалидный) ответ. + + Задача: + - Извлеки максимально полный смысл из raw_llm_output. + - Верни ровно один JSON-объект, соответствующий expected_contract. + - Если часть данных отсутствует, используй безопасные значения по умолчанию: + - reason: "generated by repair" + - proposed_content: только если op=create/update, иначе не добавляй. + + Ограничения вывода: + - Только JSON-объект, без markdown/code fences/комментариев. + - Двойные кавычки, без trailing commas. + - Внутри строк (особенно proposed_content) все переносы строк должны быть экранированы как \\n, не literal newline. diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py index 2f7a532..7a249d1 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py @@ -64,6 +64,8 @@ class DocUpdateFromFeatureWorkflowGraph(WorkflowGraph[TContext]): "project_root": str(getattr(context, "project_root", "") or ""), "feature_content_len": len(str(getattr(context, "feature_content", "") or "")), "analysis_id": str(getattr(analytics, "analysis_id", "") or ""), + "application": str(getattr(analytics, "application", "") or ""), + "platform": str(getattr(analytics, "platform", "") or ""), "domains": list(getattr(analytics, "domains", []) or []), "subdomains": list(getattr(analytics, "subdomains", []) or []), "units_count": len(units), diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/models.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/models.py index f81a1b6..0602a74 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/models.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/models.py @@ -6,6 +6,8 @@ from dataclasses import dataclass, field @dataclass(slots=True) class AnalyticsMeta: analysis_id: str = "" + application: str = "" + platform: str = "" domains: list[str] = field(default_factory=list) subdomains: list[str] = field(default_factory=list) diff --git a/src/app/core/agent/runtime/agent_runtime.py b/src/app/core/agent/runtime/agent_runtime.py index 00f2ba7..1142632 100644 --- a/src/app/core/agent/runtime/agent_runtime.py +++ b/src/app/core/agent/runtime/agent_runtime.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from datetime import datetime, timezone from app.core.api.application.session_service import SessionService @@ -16,6 +17,8 @@ from app.infra.observability.request_trace_logger import RequestTraceLogger from app.schemas.common import ErrorPayload, ModuleName from app.schemas.orchestration import RequestExecutionStatus +LOGGER = logging.getLogger(__name__) + class AgentRuntime: def __init__( @@ -35,6 +38,12 @@ class AgentRuntime: self._trace_logger = trace_logger async def run(self, request: AgentRequest, session: AgentSession) -> None: + LOGGER.warning( + "runtime run started: request_id=%s process_version=%s active_rag_session_id=%s", + request.request_id, + request.process_version, + session.active_rag_session_id, + ) try: process = self._resolve_process(request.process_version) self._start_request(request, session) @@ -51,7 +60,19 @@ class AgentRuntime: request.apply_changeset = bool(result.apply_changeset) await self._publish_result(request) self._complete_request(request, session) + LOGGER.warning( + "runtime run completed: request_id=%s status=%s changeset_items=%s apply_changeset=%s", + request.request_id, + request.status, + len(request.changeset), + request.apply_changeset, + ) except Exception as exc: + LOGGER.exception( + "runtime run failed: request_id=%s process_version=%s", + request.request_id, + request.process_version, + ) await self._fail_request(request, exc) def _resolve_process(self, version: str): @@ -66,8 +87,8 @@ class AgentRuntime: self._trace_logger.start_request(request, session) async def _announce_start(self, request_id: str, process_version: str) -> None: - await self._publisher.publish_status(request_id, "runtime", "Запрос принят и поставлен в обработку.") - await self._publisher.publish_status( + await self._safe_publish_status(request_id, "runtime", "Запрос принят и поставлен в обработку.") + await self._safe_publish_status( request_id, "runtime", f"Запускаю процесс {process_version}.", @@ -75,8 +96,11 @@ class AgentRuntime: ) async def _publish_result(self, request: AgentRequest) -> None: - await self._publisher.publish_user(request.request_id, "agent", request.answer or "") - await self._publisher.publish_status(request.request_id, "runtime", "Обработка запроса завершена.") + try: + await self._publisher.publish_user(request.request_id, "agent", request.answer or "") + except Exception: + LOGGER.exception("failed to publish user event: request_id=%s", request.request_id) + await self._safe_publish_status(request.request_id, "runtime", "Обработка запроса завершена.") def _complete_request(self, request: AgentRequest, session: AgentSession) -> None: session.append_turn(user_message=request.message, assistant_message=request.answer or "") @@ -92,7 +116,7 @@ class AgentRuntime: request.error = self._build_error_payload(exc) self._request_store.save(request) self._trace_logger.fail_request(request) - await self._publisher.publish_status( + await self._safe_publish_status( request.request_id, "runtime", "Во время обработки запроса произошла ошибка.", @@ -107,3 +131,14 @@ class AgentRuntime: desc="Agent request failed unexpectedly.", module=ModuleName.AGENT, ) + + async def _safe_publish_status(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None: + try: + await self._publisher.publish_status(request_id, source, text, payload) + except Exception: + LOGGER.exception( + "failed to publish status event: request_id=%s source=%s text=%s", + request_id, + source, + text, + ) diff --git a/src/app/core/agent/runtime/publisher.py b/src/app/core/agent/runtime/publisher.py index c3d86b1..98ca8fa 100644 --- a/src/app/core/agent/runtime/publisher.py +++ b/src/app/core/agent/runtime/publisher.py @@ -1,10 +1,14 @@ from __future__ import annotations +import logging + from app.core.api.domain.events.client_event import ClientEventRecord from app.core.api.infrastructure.streaming.sse_event_channel import SseEventChannel from app.infra.observability.request_trace_logger import RequestTraceLogger from app.schemas.client_events import ClientEventType +LOGGER = logging.getLogger(__name__) + class RuntimeEventPublisher: def __init__(self, channel: SseEventChannel, trace_logger: RequestTraceLogger) -> None: @@ -12,9 +16,23 @@ class RuntimeEventPublisher: self._trace_logger = trace_logger async def publish_status(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None: + LOGGER.warning( + "publish status: request_id=%s source=%s text=%s payload_keys=%s", + request_id, + source, + text, + sorted(list((payload or {}).keys())), + ) await self._publish(request_id, ClientEventType.STATUS, source, text, payload) async def publish_user(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None: + LOGGER.warning( + "publish user: request_id=%s source=%s text_len=%s payload_keys=%s", + request_id, + source, + len(text or ""), + sorted(list((payload or {}).keys())), + ) await self._publish(request_id, ClientEventType.USER, source, text, payload) async def _publish( diff --git a/src/app/core/api/application/request_service.py b/src/app/core/api/application/request_service.py index 8df37c7..c4a9fc8 100644 --- a/src/app/core/api/application/request_service.py +++ b/src/app/core/api/application/request_service.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging from app.core.api.domain.models.agent_request import AgentRequest from app.core.api.infrastructure.ids.request_id_factory import RequestIdFactory @@ -8,6 +9,8 @@ from app.core.api.infrastructure.stores.in_memory_request_store import InMemoryR from app.core.api.application.session_service import SessionService from app.core.agent.runtime import AgentRuntime +LOGGER = logging.getLogger(__name__) + class RequestService: def __init__( @@ -31,8 +34,28 @@ class RequestService: process_version=process_version, ) self._request_store.save(request) - asyncio.create_task(self._runtime.run(request, session)) + LOGGER.warning( + "plugin request accepted: request_id=%s session_id=%s process_version=%s message=%s", + request.request_id, + session_id, + process_version, + (message or "").replace("\n", "\\n")[:500], + ) + task = asyncio.create_task(self._runtime.run(request, session), name=f"agent-runtime:{request.request_id}") + task.add_done_callback(self._log_task_result) return request def get(self, request_id: str) -> AgentRequest | None: return self._request_store.get(request_id) + + def _log_task_result(self, task: asyncio.Task) -> None: + try: + exc = task.exception() + except asyncio.CancelledError: + LOGGER.warning("agent runtime task cancelled: task=%s", task.get_name()) + return + except Exception: + LOGGER.exception("failed to inspect agent runtime task result: task=%s", task.get_name()) + return + if exc is not None: + LOGGER.exception("agent runtime task crashed: task=%s", task.get_name(), exc_info=exc) diff --git a/src/app/core/api/controllers/rag_public_controller.py b/src/app/core/api/controllers/rag_public_controller.py index c1b9881..a2aa7f1 100644 --- a/src/app/core/api/controllers/rag_public_controller.py +++ b/src/app/core/api/controllers/rag_public_controller.py @@ -1,10 +1,14 @@ from __future__ import annotations +import logging + from app.core.api.infrastructure.streaming.sse_response_builder import build_sse_response from app.core.rag.module import RagModule from app.core.shared.messaging import EventBus from app.schemas.rag_sessions import RagSessionJobResponse +LOGGER = logging.getLogger(__name__) + class RagPublicController: def __init__(self, rag: RagModule) -> None: @@ -12,6 +16,14 @@ class RagPublicController: def get_job(self, rag_session_id: str, index_job_id: str) -> RagSessionJobResponse: job = self._rag.get_session_job(rag_session_id, index_job_id) + LOGGER.warning( + "rag job polled: rag_session_id=%s job_id=%s status=%s indexed=%s failed=%s", + rag_session_id, + index_job_id, + job.status.value if hasattr(job.status, "value") else str(job.status), + job.indexed_files, + job.failed_files, + ) return RagSessionJobResponse( rag_session_id=rag_session_id, index_job_id=job.index_job_id, @@ -25,6 +37,7 @@ class RagPublicController: async def stream_job_events(self, rag_session_id: str, index_job_id: str): channel_id, queue = await self._rag.subscribe_session_job_events(rag_session_id, index_job_id) + LOGGER.warning("rag job events subscribed: rag_session_id=%s job_id=%s", rag_session_id, index_job_id) return build_sse_response( queue, encoder=EventBus.as_sse, diff --git a/src/app/core/rag/embedding/gigachat_embedder.py b/src/app/core/rag/embedding/gigachat_embedder.py index 48eec36..f31c479 100644 --- a/src/app/core/rag/embedding/gigachat_embedder.py +++ b/src/app/core/rag/embedding/gigachat_embedder.py @@ -7,5 +7,11 @@ class GigaChatEmbedder: def __init__(self, client: GigaChatClient) -> None: self._client = client - def embed(self, texts: list[str]) -> list[list[float]]: - return self._client.embed(texts) + def embed( + self, + texts: list[str], + *, + timeout_sec: int | None = None, + max_retries: int | None = None, + ) -> list[list[float]]: + return self._client.embed(texts, timeout_sec=timeout_sec, max_retries=max_retries) diff --git a/src/app/core/rag/indexing/docs/document_builder.py b/src/app/core/rag/indexing/docs/document_builder.py index 3884421..79c61f8 100644 --- a/src/app/core/rag/indexing/docs/document_builder.py +++ b/src/app/core/rag/indexing/docs/document_builder.py @@ -2,46 +2,33 @@ from __future__ import annotations from app.core.rag.contracts import EvidenceLink, EvidenceType, RagDocument, RagLayer, RagSource from app.core.rag.indexing.docs.chunkers.markdown_chunker import SectionChunk +from app.core.rag.indexing.docs.frontmatter_metadata import merge_frontmatter_metadata from app.core.rag.indexing.docs.frontmatter_view import DocsFrontmatterView class DocsDocumentBuilder: def build_document_catalog(self, source: RagSource, frontmatter: dict, summary_text: str, doc_kind: str, *, fallback_title: str) -> RagDocument: view = DocsFrontmatterView(frontmatter) - document_id = view.document_id or source.path - metadata = { - "document_id": document_id, - "type": view.doc_type, - "name": view.name, - "title": view.title(fallback_title), - "module": view.module, - "domain": view.domain, - "subdomain": view.subdomain, - "layer": view.layer, - "status": view.status, - "updated_at": view.updated_at, - "tags": view.tags, - "entities": view.entities, - "parent": view.parent, - "children": view.children, - "links": view.links, - "source_path": source.path, - "summary_text": summary_text[:4000], - "doc_kind": doc_kind, - "artifact_type": "DOCS", - } + metadata = merge_frontmatter_metadata({}, frontmatter) + if view.document_id: + metadata["document_id"] = view.document_id + metadata["source_path"] = source.path + metadata["summary_text"] = summary_text[:4000] + metadata["doc_kind"] = doc_kind + row_title = str(frontmatter.get("title") or "").strip() or fallback_title or source.path return RagDocument( layer=RagLayer.DOCS_DOCUMENT_CATALOG, source=source, - title=metadata["title"] or document_id, - text=summary_text[:4000] or metadata["title"] or document_id, + title=row_title, + text=summary_text[:4000] or row_title, metadata=metadata, ) def build_doc_chunk(self, source: RagSource, chunk: SectionChunk, frontmatter: dict, doc_kind: str) -> RagDocument: view = DocsFrontmatterView(frontmatter) document_id = view.document_id or source.path - metadata = { + metadata = merge_frontmatter_metadata( + { "document_id": document_id, "type": view.doc_type, "module": view.module, @@ -54,7 +41,9 @@ class DocsDocumentBuilder: "doc_kind": doc_kind, "source_path": source.path, "artifact_type": "DOCS", - } + }, + frontmatter, + ) return RagDocument( layer=RagLayer.DOCS_DOC_CHUNKS, source=source, @@ -67,7 +56,8 @@ class DocsDocumentBuilder: def build_entity_record(self, source: RagSource, frontmatter: dict, entity: str) -> RagDocument: view = DocsFrontmatterView(frontmatter) document_id = view.document_id or source.path - metadata = { + metadata = merge_frontmatter_metadata( + { "entity_name": entity, "document_id": document_id, "document_type": view.doc_type, @@ -77,7 +67,9 @@ class DocsDocumentBuilder: "tags": view.tags, "source_path": source.path, "artifact_type": "DOCS", - } + }, + frontmatter, + ) return RagDocument( layer=RagLayer.DOCS_ENTITY_CATALOG, source=source, diff --git a/src/app/core/rag/indexing/docs/frontmatter_metadata.py b/src/app/core/rag/indexing/docs/frontmatter_metadata.py new file mode 100644 index 0000000..e62af4b --- /dev/null +++ b/src/app/core/rag/indexing/docs/frontmatter_metadata.py @@ -0,0 +1,13 @@ +from __future__ import annotations + + +def merge_frontmatter_metadata(base: dict[str, object], frontmatter: dict) -> dict[str, object]: + merged = dict(base) + if not isinstance(frontmatter, dict): + return merged + for raw_key, value in frontmatter.items(): + key = str(raw_key or "").strip() + if not key or key == "__frontmatter_parse_error__" or key in merged: + continue + merged[key] = value + return merged diff --git a/src/app/core/rag/indexing/job_store.py b/src/app/core/rag/indexing/job_store.py index 61480e4..8aeeaff 100644 --- a/src/app/core/rag/indexing/job_store.py +++ b/src/app/core/rag/indexing/job_store.py @@ -1,12 +1,17 @@ """Хранилище задач индексации RAG (in-memory + persistence).""" from dataclasses import dataclass +from datetime import UTC, datetime +import logging +import os from uuid import uuid4 from app.core.rag.persistence.repository import RagRepository from app.schemas.common import ErrorPayload, ModuleName from app.schemas.indexing import IndexJobStatus +LOGGER = logging.getLogger(__name__) + @dataclass class IndexJob: @@ -46,7 +51,7 @@ class IndexJobStore: desc=row.error_desc or "", module=module, ) - return IndexJob( + job = IndexJob( index_job_id=row.index_job_id, rag_session_id=row.rag_session_id, status=IndexJobStatus(row.status), @@ -56,6 +61,25 @@ class IndexJobStore: cache_miss_files=row.cache_miss_files, error=payload, ) + stale_timeout_sec = max(1, int(os.getenv("RAG_RUNNING_STALE_TIMEOUT_SEC", "8"))) + if job.status == IndexJobStatus.RUNNING and self._is_stale(row.updated_at, stale_timeout_sec): + payload = ErrorPayload( + code="index_stalled", + desc="Indexing stalled in running state; likely blocked network call during embedding/auth.", + module=ModuleName.RAG, + ) + job.status = IndexJobStatus.ERROR + job.error = payload + self.save(job) + LOGGER.error("rag index job marked stale->error: job_id=%s timeout_sec=%s", job.index_job_id, stale_timeout_sec) + return job + + def _is_stale(self, updated_at: datetime | None, stale_timeout_sec: int) -> bool: + if updated_at is None: + return False + ts = updated_at if updated_at.tzinfo else updated_at.replace(tzinfo=UTC) + age = (datetime.now(UTC) - ts).total_seconds() + return age >= stale_timeout_sec def save(self, job: IndexJob) -> None: error_code = job.error.code if job.error else None diff --git a/src/app/core/rag/indexing/orchestrator.py b/src/app/core/rag/indexing/orchestrator.py index 85bdf73..28f0744 100644 --- a/src/app/core/rag/indexing/orchestrator.py +++ b/src/app/core/rag/indexing/orchestrator.py @@ -1,6 +1,8 @@ """Оркестрация индексации RAG (очередь задач, события).""" import asyncio +import logging +import os from collections import defaultdict from app.schemas.common import ErrorPayload, ModuleName @@ -15,6 +17,8 @@ from app.core.rag.indexing.job_store import IndexJob, IndexJobStore from app.core.shared.messaging import EventBus from app.core.shared.resilience import RetryExecutor +LOGGER = logging.getLogger(__name__) + class IndexingOrchestrator: def __init__( @@ -32,11 +36,23 @@ class IndexingOrchestrator: async def enqueue_snapshot(self, rag_session_id: str, files: list[dict]) -> IndexJob: job = self._store.create(rag_session_id) + LOGGER.warning( + "rag index snapshot queued: job_id=%s rag_session_id=%s files=%s", + job.index_job_id, + rag_session_id, + len(files), + ) asyncio.create_task(self._process_snapshot(job.index_job_id, rag_session_id, files)) return job async def enqueue_changes(self, rag_session_id: str, changed_files: list[dict]) -> IndexJob: job = self._store.create(rag_session_id) + LOGGER.warning( + "rag index changes queued: job_id=%s rag_session_id=%s changes=%s", + job.index_job_id, + rag_session_id, + len(changed_files), + ) asyncio.create_task(self._process_changes(job.index_job_id, rag_session_id, changed_files)) return job @@ -71,9 +87,16 @@ class IndexingOrchestrator: async with lock: job = self._store.get(job_id) if not job: + LOGGER.warning("rag index job missing in store before start: job_id=%s", job_id) return job.status = IndexJobStatus.RUNNING self._store.save(job) + LOGGER.warning( + "rag index job running: job_id=%s rag_session_id=%s total_files=%s", + job_id, + rag_session_id, + total_files, + ) await self._events.publish( job_id, "index_status", @@ -94,13 +117,25 @@ class IndexingOrchestrator: }, ) - indexed, failed, cache_hits, cache_misses = await self._retry.run(lambda: operation(progress_cb)) + timeout_sec = max(1, int(os.getenv("RAG_INDEX_JOB_TIMEOUT_SEC", "15"))) + indexed, failed, cache_hits, cache_misses = await asyncio.wait_for( + operation(progress_cb), + timeout=timeout_sec, + ) job.status = IndexJobStatus.DONE job.indexed_files = indexed job.failed_files = failed job.cache_hit_files = cache_hits job.cache_miss_files = cache_misses self._store.save(job) + LOGGER.warning( + "rag index job done: job_id=%s indexed=%s failed=%s cache_hits=%s cache_misses=%s", + job_id, + indexed, + failed, + cache_hits, + cache_misses, + ) await self._events.publish( job_id, "index_status", @@ -129,12 +164,72 @@ class IndexingOrchestrator: ) except (TimeoutError, ConnectionError, OSError) as exc: job.status = IndexJobStatus.ERROR + job.failed_files = max(1, job.failed_files) job.error = ErrorPayload( - code="index_retry_exhausted", - desc=f"Temporary indexing failure after retries: {exc}", + code="index_runtime_error", + desc=f"Indexing failed: {exc}", module=ModuleName.RAG, ) self._store.save(job) + LOGGER.exception("rag index job runtime-error: job_id=%s", job_id) + await self._events.publish( + job_id, + "index_status", + {"index_job_id": job_id, "status": job.status.value, "total_files": total_files}, + ) + await self._events.publish( + job_id, + "terminal", + { + "index_job_id": job_id, + "status": "error", + "total_files": total_files, + "error": { + "code": job.error.code, + "desc": job.error.desc, + "module": job.error.module.value, + }, + }, + ) + except asyncio.TimeoutError as exc: + job.status = IndexJobStatus.ERROR + job.failed_files = max(1, job.failed_files) + job.error = ErrorPayload( + code="index_timeout", + desc=f"Indexing timed out while processing snapshot/changes: {exc}", + module=ModuleName.RAG, + ) + self._store.save(job) + LOGGER.exception("rag index job timed out: job_id=%s", job_id) + await self._events.publish( + job_id, + "index_status", + {"index_job_id": job_id, "status": job.status.value, "total_files": total_files}, + ) + await self._events.publish( + job_id, + "terminal", + { + "index_job_id": job_id, + "status": "error", + "total_files": total_files, + "error": { + "code": job.error.code, + "desc": job.error.desc, + "module": job.error.module.value, + }, + }, + ) + except Exception as exc: + job.status = IndexJobStatus.ERROR + job.failed_files = max(1, job.failed_files) + job.error = ErrorPayload( + code="index_unexpected_error", + desc=f"Unexpected indexing failure: {exc}", + module=ModuleName.RAG, + ) + self._store.save(job) + LOGGER.exception("rag index job failed unexpectedly: job_id=%s", job_id) await self._events.publish( job_id, "index_status", diff --git a/src/app/core/rag/indexing/service.py b/src/app/core/rag/indexing/service.py index e909be3..70a2513 100644 --- a/src/app/core/rag/indexing/service.py +++ b/src/app/core/rag/indexing/service.py @@ -32,6 +32,7 @@ class RagService: self._repo = repository self._docs = DocsIndexingPipeline() self._code = CodeIndexingPipeline() + self._cache_enabled = os.getenv("RAG_DOCUMENT_CACHE_ENABLED", "false").strip().lower() in {"1", "true", "yes", "on"} async def index_snapshot( self, @@ -39,8 +40,16 @@ class RagService: files: list[dict], progress_cb: Callable[[int, int, str], Awaitable[None] | None] | None = None, ) -> tuple[int, int, int, int]: + LOGGER.warning("rag index snapshot started: rag_session_id=%s files=%s", rag_session_id, len(files)) report = await self._index_files(rag_session_id, files, progress_cb=progress_cb) self._repo.replace_documents(rag_session_id, report.documents_list) + LOGGER.warning( + "rag index snapshot persisted: rag_session_id=%s indexed=%s failed=%s docs=%s", + rag_session_id, + report.indexed_files, + report.failed_files, + len(report.documents_list), + ) return report.as_tuple() async def index_changes( @@ -49,6 +58,7 @@ class RagService: changed_files: list[dict], progress_cb: Callable[[int, int, str], Awaitable[None] | None] | None = None, ) -> tuple[int, int, int, int]: + LOGGER.warning("rag index changes started: rag_session_id=%s changes=%s", rag_session_id, len(changed_files)) delete_paths: list[str] = [] upserts: list[dict] = [] for item in changed_files: @@ -58,6 +68,14 @@ class RagService: upserts.append(item) report = await self._index_files(rag_session_id, upserts, progress_cb=progress_cb) self._repo.apply_document_changes(rag_session_id, delete_paths, report.documents_list) + LOGGER.warning( + "rag index changes persisted: rag_session_id=%s indexed=%s failed=%s docs=%s delete_paths=%s", + rag_session_id, + report.indexed_files, + report.failed_files, + len(report.documents_list), + len(delete_paths), + ) return report.as_tuple() async def _index_files( @@ -80,9 +98,18 @@ class RagService: for index, file in enumerate(indexable_files, start=1): path = str(file.get("path", "")) try: + LOGGER.warning( + "rag index file started: rag_session_id=%s file=%s/%s path=%s", + rag_session_id, + index, + total_files, + path, + ) blob_sha = self._blob_sha(file) - cached = await asyncio.to_thread(self._repo.get_cached_documents, repo_id, blob_sha) pipelines = self._resolve_pipeline_names(path) + cached = [] + if self._cache_enabled: + cached = await asyncio.to_thread(self._repo.get_cached_documents, repo_id, blob_sha) if cached: self._report_missing_or_partial_docs(path, cached) report.documents_list.extend(self._with_file_metadata(cached, file, repo_id, blob_sha)) @@ -95,10 +122,33 @@ class RagService: ) else: built = self._build_documents(repo_id, path, file) + LOGGER.warning( + "rag index file built docs: rag_session_id=%s path=%s docs=%s", + rag_session_id, + path, + len(built), + ) self._report_missing_or_partial_docs(path, built) - embedded = await asyncio.to_thread(self._embed_documents, built, file, repo_id, blob_sha) + embed_timeout_sec = max(1, int(os.getenv("RAG_EMBED_FILE_TIMEOUT_SEC", "8"))) + LOGGER.warning( + "rag index file embedding started: rag_session_id=%s path=%s timeout_sec=%s", + rag_session_id, + path, + embed_timeout_sec, + ) + embedded = await asyncio.wait_for( + asyncio.to_thread(self._embed_documents, built, file, repo_id, blob_sha), + timeout=embed_timeout_sec, + ) + LOGGER.warning( + "rag index file embedded docs: rag_session_id=%s path=%s docs=%s", + rag_session_id, + path, + len(embedded), + ) report.documents_list.extend(embedded) - await asyncio.to_thread(self._repo.cache_documents, repo_id, path, blob_sha, embedded) + if self._cache_enabled: + await asyncio.to_thread(self._repo.cache_documents, repo_id, path, blob_sha, embedded) report.cache_miss_files += 1 LOGGER.warning( "rag ingest file: rag_session_id=%s path=%s processing=embed pipeline=%s", @@ -107,6 +157,13 @@ class RagService: ",".join(pipelines), ) report.indexed_files += 1 + LOGGER.warning( + "rag index file completed: rag_session_id=%s file=%s/%s path=%s", + rag_session_id, + index, + total_files, + path, + ) except Exception as exc: report.failed_files += 1 report.warnings.append(f"{path}: {exc}") @@ -116,6 +173,8 @@ class RagService: path, exc, ) + # Fail-fast: stop indexing immediately so caller can expose the exact error to plugin. + raise RuntimeError(f"RAG indexing failed for '{path}': {exc}") from exc await self._notify_progress(progress_cb, index, total_files, path) report.documents = len(report.documents_list) return report @@ -156,12 +215,32 @@ class RagService: if not docs: return [] batch_size = max(1, int(os.getenv("RAG_EMBED_BATCH_SIZE", "16"))) + request_timeout_sec = max(1, int(os.getenv("RAG_EMBED_REQUEST_TIMEOUT_SEC", "5"))) + request_retries = max(1, int(os.getenv("RAG_EMBED_REQUEST_MAX_RETRIES", "1"))) metadata = self._document_metadata(file, repo_id, blob_sha) for doc in docs: doc.metadata.update(metadata) for start in range(0, len(docs), batch_size): batch = docs[start : start + batch_size] - vectors = self._embedder.embed([doc.text for doc in batch]) + LOGGER.warning( + "rag embed batch start: path=%s batch_start=%s batch_size=%s timeout_sec=%s retries=%s", + file.get("path", ""), + start, + len(batch), + request_timeout_sec, + request_retries, + ) + vectors = self._embedder.embed( + [doc.text for doc in batch], + timeout_sec=request_timeout_sec, + max_retries=request_retries, + ) + LOGGER.warning( + "rag embed batch done: path=%s batch_start=%s vectors=%s", + file.get("path", ""), + start, + len(vectors), + ) for doc, vector in zip(batch, vectors): doc.embedding = vector return docs diff --git a/src/app/core/rag/persistence/job_repository.py b/src/app/core/rag/persistence/job_repository.py index ad20ceb..6dd59d1 100644 --- a/src/app/core/rag/persistence/job_repository.py +++ b/src/app/core/rag/persistence/job_repository.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass +from datetime import datetime from sqlalchemy import text @@ -19,6 +20,7 @@ class RagJobRow: error_code: str | None error_desc: str | None error_module: str | None + updated_at: datetime | None class RagJobRepository: @@ -85,7 +87,7 @@ class RagJobRepository: text( """ SELECT index_job_id, rag_session_id, status, indexed_files, failed_files, - cache_hit_files, cache_miss_files, error_code, error_desc, error_module + cache_hit_files, cache_miss_files, error_code, error_desc, error_module, updated_at FROM rag_index_jobs WHERE index_job_id = :jid """ diff --git a/src/app/core/shared/gigachat/client.py b/src/app/core/shared/gigachat/client.py index 0127295..f3ea41e 100644 --- a/src/app/core/shared/gigachat/client.py +++ b/src/app/core/shared/gigachat/client.py @@ -1,4 +1,5 @@ import time +import logging import requests @@ -6,6 +7,9 @@ from app.infra.constants import MAX_RETRIES from app.core.shared.gigachat.errors import GigaChatError from app.core.shared.gigachat.settings import GigaChatSettings from app.core.shared.gigachat.token_provider import GigaChatTokenProvider +from app.core.shared.network.hard_timeout import run_with_hard_timeout + +LOGGER = logging.getLogger(__name__) class GigaChatClient: @@ -30,13 +34,26 @@ class GigaChatClient: message = choices[0].get("message") or {} return str(message.get("content") or "") - def embed(self, texts: list[str]) -> list[list[float]]: + def embed( + self, + texts: list[str], + *, + timeout_sec: int | None = None, + max_retries: int | None = None, + ) -> list[list[float]]: token = self._tokens.get_access_token() payload = { "model": self._settings.embedding_model, "input": texts, } - response = self._post_with_retry("/embeddings", payload, token=token, timeout=90, operation_name="embeddings") + response = self._post_with_retry( + "/embeddings", + payload, + token=token, + timeout=timeout_sec or 90, + operation_name="embeddings", + max_retries=max_retries, + ) data = response.json() items = data.get("data") if not isinstance(items, list): @@ -51,21 +68,50 @@ class GigaChatClient: token: str, timeout: int, operation_name: str, + max_retries: int | None = None, ): last_error: Exception | None = None - for attempt in range(1, MAX_RETRIES + 1): + retries = max(1, int(max_retries or MAX_RETRIES)) + for attempt in range(1, retries + 1): try: - response = requests.post( - f"{self._settings.api_url.rstrip('/')}{path}", - json=payload, - headers={ - "Authorization": f"Bearer {token}", - "Content-Type": "application/json", - }, - timeout=timeout, - verify=self._settings.ssl_verify, + LOGGER.warning( + "gigachat request start: operation=%s path=%s attempt=%s/%s timeout_sec=%s", + operation_name, + path, + attempt, + retries, + timeout, + ) + response = run_with_hard_timeout( + lambda: requests.post( + f"{self._settings.api_url.rstrip('/')}{path}", + json=payload, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + }, + timeout=timeout, + verify=self._settings.ssl_verify, + ), + timeout_sec=timeout, + operation_name=f"gigachat_{operation_name}", + ) + LOGGER.warning( + "gigachat request done: operation=%s path=%s attempt=%s/%s status=%s", + operation_name, + path, + attempt, + retries, + response.status_code, ) except requests.RequestException as exc: + LOGGER.exception( + "gigachat request failed: operation=%s path=%s attempt=%s/%s", + operation_name, + path, + attempt, + retries, + ) last_error = GigaChatError(f"GigaChat {operation_name} request failed: {exc}") else: if response.status_code < 400: @@ -73,7 +119,7 @@ class GigaChatClient: last_error = GigaChatError(f"GigaChat {operation_name} error {response.status_code}: {response.text}") if not self._is_retryable_status(response.status_code): raise last_error - if attempt == MAX_RETRIES: + if attempt == retries: break time.sleep(0.1 * attempt) if last_error is None: diff --git a/src/app/core/shared/gigachat/token_provider.py b/src/app/core/shared/gigachat/token_provider.py index 0d8adb6..98df2cf 100644 --- a/src/app/core/shared/gigachat/token_provider.py +++ b/src/app/core/shared/gigachat/token_provider.py @@ -1,11 +1,16 @@ import threading import time import uuid +import logging +import os import requests from app.core.shared.gigachat.errors import GigaChatError from app.core.shared.gigachat.settings import GigaChatSettings +from app.core.shared.network.hard_timeout import run_with_hard_timeout + +LOGGER = logging.getLogger(__name__) class GigaChatTokenProvider: @@ -30,6 +35,7 @@ class GigaChatTokenProvider: def _fetch_token(self) -> tuple[str, float]: if not self._settings.credentials: raise GigaChatError("GIGACHAT_TOKEN is not set") + timeout_sec = max(1, int(os.getenv("GIGACHAT_AUTH_TIMEOUT_SEC", "5"))) headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", @@ -37,14 +43,21 @@ class GigaChatTokenProvider: "RqUID": str(uuid.uuid4()), } try: - response = requests.post( - self._settings.auth_url, - headers=headers, - data=f"scope={self._settings.scope}", - timeout=30, - verify=self._settings.ssl_verify, + LOGGER.warning("gigachat auth start: url=%s timeout_sec=%s", self._settings.auth_url, timeout_sec) + response = run_with_hard_timeout( + lambda: requests.post( + self._settings.auth_url, + headers=headers, + data=f"scope={self._settings.scope}", + timeout=timeout_sec, + verify=self._settings.ssl_verify, + ), + timeout_sec=timeout_sec, + operation_name="gigachat_auth", ) + LOGGER.warning("gigachat auth done: status=%s", response.status_code) except requests.RequestException as exc: + LOGGER.exception("gigachat auth failed") raise GigaChatError(f"GigaChat auth request failed: {exc}") from exc if response.status_code >= 400: diff --git a/src/app/core/shared/network/hard_timeout.py b/src/app/core/shared/network/hard_timeout.py new file mode 100644 index 0000000..e484a50 --- /dev/null +++ b/src/app/core/shared/network/hard_timeout.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import queue +import threading +from collections.abc import Callable +from typing import TypeVar + +T = TypeVar("T") + + +def run_with_hard_timeout(operation: Callable[[], T], *, timeout_sec: int, operation_name: str) -> T: + result_queue: queue.Queue[tuple[bool, object]] = queue.Queue(maxsize=1) + + def _runner() -> None: + try: + result_queue.put((True, operation())) + except BaseException as exc: # noqa: BLE001 + result_queue.put((False, exc)) + + thread = threading.Thread(target=_runner, name=f"hard-timeout:{operation_name}", daemon=True) + thread.start() + thread.join(timeout=max(1, int(timeout_sec))) + if thread.is_alive(): + raise TimeoutError(f"{operation_name} exceeded hard timeout ({timeout_sec}s)") + if result_queue.empty(): + raise TimeoutError(f"{operation_name} finished without a result") + ok, value = result_queue.get_nowait() + if ok: + return value # type: ignore[return-value] + raise value # type: ignore[misc] + diff --git a/src/app/main.py b/src/app/main.py index 3dcb41f..b06529a 100644 --- a/src/app/main.py +++ b/src/app/main.py @@ -2,6 +2,7 @@ import logging from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +from starlette.requests import Request from app.infra.logging_setup import configure_logging from app.infra.error_handlers import register_error_handlers @@ -19,6 +20,7 @@ def create_app() -> FastAPI: app = FastAPI(title="Agent Backend MVP", version="0.1.0") modules = ModularApplication() app.state.modules = modules + logger = logging.getLogger("app.http") app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -30,6 +32,22 @@ def create_app() -> FastAPI: app.include_router(modules.api.public_router()) register_error_handlers(app) + @app.middleware("http") + async def log_http_requests(request: Request, call_next): + logger.warning("http request: method=%s path=%s query=%s", request.method, request.url.path, request.url.query) + try: + response = await call_next(request) + logger.warning( + "http response: method=%s path=%s status=%s", + request.method, + request.url.path, + response.status_code, + ) + return response + except Exception: + logger.exception("http request failed: method=%s path=%s", request.method, request.url.path) + raise + @app.on_event("startup") async def startup() -> None: modules.startup() diff --git a/tests/unit_tests/agent/test_api_endpoint_collector.py b/tests/unit_tests/agent/test_api_endpoint_collector.py index 5bb8e44..62f3379 100644 --- a/tests/unit_tests/agent/test_api_endpoint_collector.py +++ b/tests/unit_tests/agent/test_api_endpoint_collector.py @@ -51,7 +51,7 @@ def test_collector_ignores_file_paths_from_content() -> None: assert endpoints == ["GET /health"] -def test_collector_uses_title_path_fallback_when_endpoint_metadata_missing() -> None: +def test_collector_ignores_title_when_endpoint_metadata_missing() -> None: rows = [ { "metadata": { @@ -65,4 +65,4 @@ def test_collector_uses_title_path_fallback_when_endpoint_metadata_missing() -> endpoints = ApiEndpointCollector().collect(rows) - assert endpoints == ["GET /actions/{action}"] + assert endpoints == [] diff --git a/tests/unit_tests/rag/test_docs_indexing_pipeline.py b/tests/unit_tests/rag/test_docs_indexing_pipeline.py index 288556e..a7b5b1b 100644 --- a/tests/unit_tests/rag/test_docs_indexing_pipeline.py +++ b/tests/unit_tests/rag/test_docs_indexing_pipeline.py @@ -20,6 +20,8 @@ sub_domain: invoices layer: application status: draft updated_at: 2026-03-23 +endpoint: POST /billing/invoices +source_of_truth: analytics tags: [billing, api] entities: [Invoice] parent: billing_api @@ -125,9 +127,13 @@ Create invoice catalog_doc = next(doc for doc in docs if doc.layer == RagLayer.DOCS_DOCUMENT_CATALOG) assert catalog_doc.metadata["document_id"] == "api.billing.create_invoice" + assert catalog_doc.metadata["id"] == "api.billing.create_invoice" assert catalog_doc.metadata["module"] == "billing" assert catalog_doc.metadata["domain"] == "billing" - assert catalog_doc.metadata["subdomain"] == "invoices" + assert catalog_doc.metadata["sub_domain"] == "invoices" + assert "subdomain" not in catalog_doc.metadata + assert catalog_doc.metadata["endpoint"] == "POST /billing/invoices" + assert catalog_doc.metadata["source_of_truth"] == "analytics" assert catalog_doc.metadata["summary_text"] == "Creates an invoice in billing." fact_texts = [doc.text for doc in docs if doc.layer == RagLayer.DOCS_FACT_INDEX] @@ -335,3 +341,4 @@ Control actions endpoint. catalog = next(doc for doc in docs if doc.layer == RagLayer.DOCS_DOCUMENT_CATALOG) assert catalog.metadata["type"] == "api_method" assert catalog.metadata["title"] == "HTTP API /actions/{action}" + assert catalog.metadata["endpoint"] == "GET|POST /actions/{action}"