diff --git a/src/app/core/agent/processes/v2/intent_router/modules/target_terms.py b/src/app/core/agent/processes/v2/intent_router/modules/target_terms.py index 3af9d55..151ebb1 100644 --- a/src/app/core/agent/processes/v2/intent_router/modules/target_terms.py +++ b/src/app/core/agent/processes/v2/intent_router/modules/target_terms.py @@ -53,6 +53,18 @@ class _EndpointPathExtractor: _PATH_RE = re.compile(r"`([^`]+)`|(/[A-Za-z0-9_./{}-]+)") _VALID_ENDPOINT_RE = re.compile(r"^/[a-z0-9._/-]+(?:/\{[a-z0-9_]+\})?$") _DOC_EXTENSIONS = (".md", ".yaml", ".yml", ".json") + _FILESYSTEM_PREFIXES = ( + "/users/", + "/home/", + "/tmp/", + "/var/", + "/opt/", + "/etc/", + "/private/", + "/mnt/", + "/workspace/", + "/workspaces/", + ) def extract(self, query: str) -> list[str]: values: list[str] = [] @@ -72,6 +84,8 @@ class _EndpointPathExtractor: def _is_endpoint(self, token: str) -> bool: if not token or not self._VALID_ENDPOINT_RE.fullmatch(token): return False + if token.startswith(self._FILESYSTEM_PREFIXES): + return False return not token.endswith(self._DOC_EXTENSIONS) def _append_unique(self, items: list[str], value: str) -> None: diff --git a/src/app/core/agent/processes/v2/intent_router/router.py b/src/app/core/agent/processes/v2/intent_router/router.py index f987c99..da74027 100644 --- a/src/app/core/agent/processes/v2/intent_router/router.py +++ b/src/app/core/agent/processes/v2/intent_router/router.py @@ -2,8 +2,10 @@ from __future__ import annotations +import re from collections.abc import Callable from dataclasses import replace +from typing import TYPE_CHECKING from app.core.agent.processes.v2.intent_router.modules.anchors import V2AnchorExtractor from app.core.agent.processes.v2.intent_router.modules.normalizer import V2QueryNormalizer @@ -22,7 +24,66 @@ from app.core.agent.processes.v2.intent_router.routers.route_catalog import V2Ro from app.core.agent.processes.v2.intent_router.routers.validator import V2RouteValidator from app.core.agent.utils.process_v2.models import V2RouteResult, V2ScopeType from app.core.agent.utils.llm import AgentLlmService -from app.core.rag.persistence.query_repository import RagQueryRepository + +if TYPE_CHECKING: + from app.core.rag.persistence.query_repository import RagQueryRepository + + +class _ExplicitDocsUpdateResolver: + _UPDATE_MARKERS = ( + "собери документац", + "сгенерир", + "построй документац", + "обнови документац", + "обновить документац", + "generate documentation", + "build documentation", + "update documentation", + ) + _FEATURE_MARKERS = ( + "/features/", + "\\features\\", + "feature", + "системной аналитик", + "confluence", + ) + _PATH_PATTERN = re.compile(r"(/[^\n`]+?\.md)") + _URL_PATTERN = re.compile(r"https?://[^\s)]*confluence[^\s)]*") + + def matches(self, user_query: str) -> bool: + query = str(user_query or "") + lowered = query.lower() + if not any(marker in lowered for marker in self._UPDATE_MARKERS): + return False + path = self._extract_path(query) + if path and self._is_feature_source(path): + return True + url = self._extract_confluence_url(query) + if url: + return True + return any(marker in lowered for marker in self._FEATURE_MARKERS) + + def _extract_path(self, query: str) -> str: + if "`" in query: + for chunk in query.split("`"): + value = chunk.strip().strip('"').strip("'") + if value.endswith(".md") and value.startswith("/"): + return value + match = self._PATH_PATTERN.search(query) + return match.group(1).strip().strip('"').strip("'") if match else "" + + def _extract_confluence_url(self, query: str) -> str: + match = self._URL_PATTERN.search(query) + return match.group(0).strip() if match else "" + + def _is_feature_source(self, path: str) -> bool: + lowered = str(path or "").lower() + return "/feature" in lowered + + +class _ExplicitFileLookupResolver: + def matches(self, anchor_analysis) -> bool: + return bool(getattr(anchor_analysis.anchors, "file_names", [])) def _scope_candidate_dict(candidate) -> dict[str, object]: @@ -56,6 +117,8 @@ class V2IntentRouter: self._enable_llm_disambiguation = enable_llm_disambiguation self._llm_router = V2LlmRouter(llm, catalog=self._catalog) if llm is not None else None self._scope_rows_provider = scope_rows_provider + self._explicit_docs_update_resolver = _ExplicitDocsUpdateResolver() + self._explicit_file_lookup_resolver = _ExplicitFileLookupResolver() def route(self, user_query: str, *, rag_session_id: str | None = None) -> V2RouteResult: normalized_query = self._normalizer.normalize(user_query) @@ -98,6 +161,36 @@ class V2IntentRouter: endpoint_markers=list(anchor_analysis.endpoint_markers), scope_type=resolution.scope_type, ) + if self._explicit_docs_update_resolver.matches(user_query): + return V2RouteResult( + routing_domain="DOCS", + intent="DOC_UPDATE", + subintent="FROM_FEATURE", + user_query=user_query, + normalized_query=features.normalized_query, + target_terms=features.target_terms, + anchors=anchor_analysis.anchors, + confidence=1.0, + routing_mode="deterministic", + llm_router_used=False, + reason_short="explicit docs update from feature source", + scope_type=resolution.scope_type, + ) + if self._explicit_file_lookup_resolver.matches(anchor_analysis): + return V2RouteResult( + routing_domain="DOCS", + intent="DOC_EXPLAIN", + subintent="FIND_FILES", + user_query=user_query, + normalized_query=features.normalized_query, + target_terms=features.target_terms, + anchors=anchor_analysis.anchors, + confidence=1.0, + routing_mode="deterministic", + llm_router_used=False, + reason_short="explicit file reference", + scope_type=resolution.scope_type, + ) llm_attempted = self._enable_llm_disambiguation and self._llm_router is not None llm_candidate = self._route_with_llm( features=features, @@ -121,11 +214,12 @@ class V2IntentRouter: scope_type=resolution.scope_type, ) if llm_attempted: - return self._fallback_router.route_without_deterministic_signals( + return self._fallback_router.route( user_query=user_query, features=features, anchors=anchor_analysis.anchors, scope_type=resolution.scope_type, + llm_attempted=True, ) return self._fallback_router.route( user_query=user_query, @@ -142,10 +236,15 @@ class V2IntentRouter: if self._scope_rows_provider is not None: return self._scope_rows_provider(sid) try: - return RagQueryRepository().list_docs_scope_index_rows(sid) + return self._build_query_repository().list_docs_scope_index_rows(sid) except Exception: return [] + def _build_query_repository(self) -> "RagQueryRepository": + from app.core.rag.persistence.query_repository import RagQueryRepository + + return RagQueryRepository() + def _apply_scope_to_anchors(self, anchors, resolution) -> None: anchors.candidate_domains = list(resolution.candidate_domains) anchors.candidate_subdomains = list(resolution.candidate_subdomains) diff --git a/src/app/core/agent/processes/v2/v2_process.py b/src/app/core/agent/processes/v2/v2_process.py index 3b2f53f..dc36216 100644 --- a/src/app/core/agent/processes/v2/v2_process.py +++ b/src/app/core/agent/processes/v2/v2_process.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from typing import Any from app.core.agent.processes.base import AgentProcess, ProcessResult @@ -88,7 +89,21 @@ class V2Process(AgentProcess): async def run(self, context) -> ProcessResult: rag_session_id = context.session.active_rag_session_id or "" - route = self._router.route(context.request.message, rag_session_id=rag_session_id or None) + route = await asyncio.to_thread( + self._router.route, + context.request.message, + rag_session_id=rag_session_id or None, + ) + await context.publisher.publish_status( + context.request.request_id, + "process.v2", + f"Запрос принял, переход в {self._subintent_label(route.intent, route.subintent)}.", + { + "routing_domain": route.routing_domain, + "intent": route.intent, + "subintent": route.subintent, + }, + ) context.trace.module("process.v2").log( "intent_routed", { @@ -148,6 +163,16 @@ class V2Process(AgentProcess): def _log_step(self, context, step: str, payload: dict[str, object]) -> None: context.trace.module("process.v2.pipeline").log(step, payload) + def _subintent_label(self, intent: str, subintent: str) -> str: + labels = { + (V2Intent.DOC_EXPLAIN, V2Subintent.SUMMARY): "объяснение документации", + (V2Intent.DOC_EXPLAIN, V2Subintent.FIND_FILES): "поиск файлов документации", + (V2Intent.DOC_EXPLAIN, V2Subintent.API_EXPOSED): "проверку экспонирования API", + (V2Intent.DOC_UPDATE, V2Subintent.FROM_FEATURE): "обновление документации по аналитике", + (V2Intent.GENERAL_QA, V2Subintent.SUMMARY): "общий ответ по проекту", + } + return labels.get((intent, subintent), str(subintent).lower()) + async def _run_workflow(self, runtime_context, route, rag_session_id: str): workflow = self._workflows.get((route.routing_domain, route.intent, route.subintent)) if workflow is None: diff --git a/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/workflow_runtime/buffered_graph.py index 6e17aa8..eb3b158 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/doc_explain_api_exposed/workflow_runtime/buffered_graph.py @@ -17,15 +17,16 @@ class DocExplainApiExposedWorkflowGraph(WorkflowGraph[TContext]): steps_buffer: list[dict[str, object]] = [] for step in self._steps: inp = step.trace_input(context) - request_id = context.runtime.request.request_id - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + out = step.trace_output(next_context) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, ) - context = await step.run(context) - out = step.trace_output(context) trace.log( "workflow_step_traced", { @@ -36,7 +37,7 @@ class DocExplainApiExposedWorkflowGraph(WorkflowGraph[TContext]): }, ) steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out}) + context = next_context trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer}) trace.log("workflow_completed", {"workflow_id": self._workflow_id}) return context - diff --git a/src/app/core/agent/processes/v2/workflows/doc_explain_find_files/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/doc_explain_find_files/workflow_runtime/buffered_graph.py index 3fbd3a3..7a3502f 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_explain_find_files/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/doc_explain_find_files/workflow_runtime/buffered_graph.py @@ -19,15 +19,16 @@ class DocExplainFindFilesWorkflowGraph(WorkflowGraph[TContext]): steps_buffer: list[dict[str, object]] = [] for step in self._steps: inp = step.trace_input(context) - request_id = context.runtime.request.request_id - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + out = step.trace_output(next_context) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, ) - context = await step.run(context) - out = step.trace_output(context) trace.log( "workflow_step_traced", { @@ -38,6 +39,7 @@ class DocExplainFindFilesWorkflowGraph(WorkflowGraph[TContext]): }, ) steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out}) + context = next_context trace.log( "workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer}, diff --git a/src/app/core/agent/processes/v2/workflows/doc_explain_summary/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/doc_explain_summary/workflow_runtime/buffered_graph.py index ba0c1ed..ba06517 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_explain_summary/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/doc_explain_summary/workflow_runtime/buffered_graph.py @@ -19,15 +19,16 @@ class DocExplainSummaryWorkflowGraph(WorkflowGraph[TContext]): steps_buffer: list[dict[str, object]] = [] for step in self._steps: inp = step.trace_input(context) - request_id = context.runtime.request.request_id - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + out = step.trace_output(next_context) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, ) - context = await step.run(context) - out = step.trace_output(context) trace.log( "workflow_step_traced", { @@ -38,6 +39,7 @@ class DocExplainSummaryWorkflowGraph(WorkflowGraph[TContext]): }, ) steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out}) + context = next_context trace.log( "workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer}, diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py index 7a249d1..7c324ed 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature/workflow_runtime/buffered_graph.py @@ -19,17 +19,18 @@ class DocUpdateFromFeatureWorkflowGraph(WorkflowGraph[TContext]): before = self._snapshot(context) raw_inp = step.trace_input(context) inp = self._merge_trace_payload(raw_inp, before) - request_id = context.runtime.request.request_id - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, - ) - context = await step.run(context) - after = self._snapshot(context) - raw_out = step.trace_output(context) + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + after = self._snapshot(next_context) + raw_out = step.trace_output(next_context) out = self._merge_trace_payload(raw_out, after) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, + ) trace.log( "workflow_step_traced", { @@ -40,6 +41,7 @@ class DocUpdateFromFeatureWorkflowGraph(WorkflowGraph[TContext]): }, ) steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out}) + context = next_context trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer}) trace.log("workflow_completed", {"workflow_id": self._workflow_id}) return context diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step2_load_source_content/step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step2_load_source_content/step.py index 6701c2e..1c33095 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step2_load_source_content/step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step2_load_source_content/step.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from pathlib import Path from typing import Any @@ -30,7 +31,7 @@ class LoadSourceContentStep(WorkflowStep[DocUpdateFromFeatureV2Context]): context.issues.append(f"Файл системной аналитики не найден: {context.source_ref}") return context try: - context.source_content = source_path.read_text(encoding="utf-8") + context.source_content = await asyncio.to_thread(source_path.read_text, encoding="utf-8") context.project_root = self._resolve_project_root(source_path).as_posix() except Exception as exc: context.issues.append(f"Не удалось прочитать системную аналитику: {exc}") @@ -42,6 +43,9 @@ class LoadSourceContentStep(WorkflowStep[DocUpdateFromFeatureV2Context]): idx = parts.index("_incoming") if idx > 0: return Path(*parts[:idx]) + for parent in source_path.parents: + if (parent / "docs").is_dir(): + return parent return source_path.parent def trace_input(self, context: DocUpdateFromFeatureV2Context) -> dict[str, Any]: @@ -52,3 +56,6 @@ class LoadSourceContentStep(WorkflowStep[DocUpdateFromFeatureV2Context]): "project_root": context.project_root, "source_content_len": len(context.source_content or ""), } + + def get_after_status_message(self): + return "Системная аналитика загружена" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step3_parse_requirements/step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step3_parse_requirements/step.py index cdc72b5..adc5c7c 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step3_parse_requirements/step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step3_parse_requirements/step.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from typing import Any from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step3_parse_requirements.parser import ( @@ -21,7 +22,7 @@ class ParseRequirementsStep(WorkflowStep[DocUpdateFromFeatureV2Context]): async def run(self, context: DocUpdateFromFeatureV2Context) -> DocUpdateFromFeatureV2Context: if context.answer or not context.source_content: return context - meta, units = self._parser.parse(context.source_content) + meta, units = await asyncio.to_thread(self._parser.parse, context.source_content) context.analytics_meta = meta context.requirements = units @@ -54,3 +55,6 @@ class ParseRequirementsStep(WorkflowStep[DocUpdateFromFeatureV2Context]): for item in context.requirements ], } + + def get_after_status_message(self): + return "Функциональные требования прочитаны" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_load_rules/step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_load_rules/step.py index c9a4a46..0c3c249 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_load_rules/step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_load_rules/step.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from pathlib import Path from typing import Any @@ -23,7 +24,16 @@ class LoadRulesStep(WorkflowStep[DocUpdateFromFeatureV2Context]): if not self._rules_root.exists(): context.issues.append(f"Папка rules не найдена: {self._rules_root.as_posix()}") return context + loaded, issues = await asyncio.to_thread(self._load_rules) + context.issues.extend(issues) + context.rules = loaded + if not context.rules: + context.issues.append("Rules v2 пустые: не найдено ни одного *.md файла.") + return context + + def _load_rules(self) -> tuple[list[RuleDocument], list[str]]: loaded: list[RuleDocument] = [] + issues: list[str] = [] for item in sorted(self._rules_root.rglob("*.md")): try: loaded.append( @@ -33,11 +43,8 @@ class LoadRulesStep(WorkflowStep[DocUpdateFromFeatureV2Context]): ) ) except Exception as exc: - context.issues.append(f"Не удалось прочитать rule {item.name}: {exc}") - context.rules = loaded - if not context.rules: - context.issues.append("Rules v2 пустые: не найдено ни одного *.md файла.") - return context + issues.append(f"Не удалось прочитать rule {item.name}: {exc}") + return loaded, issues def _discover_rules_root(self) -> Path: current = Path(__file__).resolve() @@ -55,3 +62,6 @@ class LoadRulesStep(WorkflowStep[DocUpdateFromFeatureV2Context]): "rules_count": len(context.rules), "rule_names": [item.name for item in context.rules], } + + def get_after_status_message(self): + return "Загружены правила документации v3" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_prepare_tasks/step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_prepare_tasks/step.py index d73bb58..e5223c7 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_prepare_tasks/step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step4_prepare_tasks/step.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING from typing import Any @@ -30,12 +31,15 @@ class PrepareRequirementTasksStep(WorkflowStep[DocUpdateFromFeatureV2Context]): async def run(self, context: DocUpdateFromFeatureV2Context) -> DocUpdateFromFeatureV2Context: if context.answer or not context.requirements: return context - self._catalog_loader.load(context) - context.requirement_tasks = self._task_orderer.order(self._task_builder.build(context)) + context.requirement_tasks = await asyncio.to_thread(self._build_tasks, context) if not context.requirement_tasks: context.issues.append("Не удалось подготовить задачи по разделу 6 аналитики.") return context + def _build_tasks(self, context: DocUpdateFromFeatureV2Context): + self._catalog_loader.load(context) + return self._task_orderer.order(self._task_builder.build(context)) + def trace_input(self, context: DocUpdateFromFeatureV2Context) -> dict[str, Any]: return { "requirements_count": len(context.requirements), @@ -64,3 +68,6 @@ class PrepareRequirementTasksStep(WorkflowStep[DocUpdateFromFeatureV2Context]): for item in context.requirement_tasks ], } + + def get_after_status_message(self): + return "Составялем план изменений" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/path_resolver.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/path_resolver.py index c7328e2..53f56e2 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/path_resolver.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/path_resolver.py @@ -2,12 +2,19 @@ from __future__ import annotations import re +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_normalizer import ( + DocTypeNormalizer, +) + class DocsPathResolver: + def __init__(self, normalizer: DocTypeNormalizer | None = None) -> None: + self._normalizer = normalizer or DocTypeNormalizer() + def resolve(self, *, application: str, platform: str, doc_type: str, doc_id: str, domain: str = "") -> str: root = self._clean(domain) or self._clean(application) or "common" plat = self._clean(platform) or "web" - dtype = self._clean(doc_type) or "misc" + dtype = self._clean(self._normalizer.normalize(doc_type)) or "misc" did = self._clean(doc_id) or "untitled" return f"docs/{root}/{plat}/{dtype}/{did}.md" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/prompts/prompts.yml b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/prompts/prompts.yml index 8431df5..5dca7b4 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/prompts/prompts.yml +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/prompts/prompts.yml @@ -3,6 +3,7 @@ namespace: v2_docs_update_v2 prompts: resolve_attributes_fallback: | Определи недостающие атрибуты страницы документации по секции аналитики и структуре docs catalog. + Используй только канонические значения `doc_type`: `api_method`, `logic_block`, `architecture_overview`, `db_table`, `ui_page`. Верни только JSON-объект с полями: doc_type, id, application, platform, domain, sub_domain. Не добавляй пояснений. diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/services.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/services.py index 819cfc4..977035f 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/services.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/services.py @@ -8,6 +8,9 @@ from pathlib import Path from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step5_execute_subprocesses.classifier import ( DeleteIntentHeuristic, ) +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_normalizer import ( + DocTypeNormalizer, +) from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.create_doc import ( CreateDocSubprocess, ) @@ -30,9 +33,10 @@ from app.schemas.changeset import ChangeItem, ChangeOp class TaskAttributeResolver: def __init__(self, llm: AgentLlmService) -> None: self._llm = llm + self._doc_type_normalizer = DocTypeNormalizer() def resolve(self, context: DocUpdateFromFeatureV2Context, task: RequirementTaskContext, rules_text: str) -> None: - task.doc_type = str(task.metadata.get("doc_type") or task.metadata.get("type") or "").strip() + task.doc_type = self._normalize_doc_type(task.metadata.get("doc_type") or task.metadata.get("type") or "") task.doc_id = str(task.metadata.get("id") or self._slug(task.heading)).strip() task.application = str(task.metadata.get("application") or context.analytics_meta.application or "").strip() task.platform = str(task.metadata.get("platform") or context.analytics_meta.platform or "").strip().lower() @@ -67,7 +71,7 @@ class TaskAttributeResolver: log_context="workflow.v2.docs_update.from_feature_v2.resolve_attributes", ) parsed = self._json_or_empty(raw) - task.doc_type = task.doc_type or str(parsed.get("doc_type") or parsed.get("type") or "").strip() + task.doc_type = task.doc_type or self._normalize_doc_type(parsed.get("doc_type") or parsed.get("type") or "") task.doc_id = task.doc_id or str(parsed.get("id") or self._slug(task.heading)).strip() task.application = task.application or str(parsed.get("application") or "").strip() task.platform = task.platform or str(parsed.get("platform") or "web").strip().lower() @@ -77,6 +81,9 @@ class TaskAttributeResolver: def _slug(self, value: str) -> str: return re.sub(r"[^a-z0-9._-]+", "-", (value or "").strip().lower()).strip(".-") or "untitled" + def _normalize_doc_type(self, value: object) -> str: + return self._doc_type_normalizer.normalize(str(value or "")) + def _json_or_empty(self, raw: str) -> dict[str, object]: value = str(raw or "").strip() if not value: diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/step.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/step.py index 25ad820..4d42a01 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/step.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/steps/step5_execute_subprocesses/step.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from typing import Any from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step5_execute_subprocesses.services import ( @@ -22,9 +23,12 @@ class ExecuteRequirementSubprocessesStep(WorkflowStep[DocUpdateFromFeatureV2Cont async def run(self, context: DocUpdateFromFeatureV2Context) -> DocUpdateFromFeatureV2Context: if context.answer or not context.requirement_tasks: return context + await asyncio.to_thread(self._execute_all, context) + return context + + def _execute_all(self, context: DocUpdateFromFeatureV2Context) -> None: for task in context.requirement_tasks: self._change_executor.execute(context, task) - return context def trace_input(self, context: DocUpdateFromFeatureV2Context) -> dict[str, Any]: return { @@ -57,3 +61,6 @@ class ExecuteRequirementSubprocessesStep(WorkflowStep[DocUpdateFromFeatureV2Cont for item in context.accumulated_pages ], } + + def get_after_status_message(self): + return "Правки подготовлены" diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/doc_type_normalizer.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/doc_type_normalizer.py new file mode 100644 index 0000000..0d9ec42 --- /dev/null +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/doc_type_normalizer.py @@ -0,0 +1,12 @@ +from __future__ import annotations + + +class DocTypeNormalizer: + _ALIASES = { + "data_entity": "db_table", + "domain_entity": "db_table", + } + + def normalize(self, doc_type: str) -> str: + value = str(doc_type or "").strip().lower() + return self._ALIASES.get(value, value) diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/template_registry.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/template_registry.py index 905a497..0a6f5ea 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/template_registry.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/subprocesses/common/template_registry.py @@ -3,6 +3,9 @@ from __future__ import annotations from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.rules_catalog import ( RulesCatalog, ) +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.doc_type_normalizer import ( + DocTypeNormalizer, +) from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.template_models import ( TemplateSpec, ) @@ -12,10 +15,16 @@ from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocess class TemplateRegistry: - def __init__(self, parser: TemplateParser | None = None) -> None: + def __init__( + self, + parser: TemplateParser | None = None, + normalizer: DocTypeNormalizer | None = None, + ) -> None: self._parser = parser or TemplateParser() + self._normalizer = normalizer or DocTypeNormalizer() def load(self, catalog: RulesCatalog, doc_type: str) -> TemplateSpec: + doc_type = self._normalizer.normalize(doc_type) template_name = f"templates/{doc_type}.template.md" template_text = catalog.template_text(doc_type) if not template_text.strip(): diff --git a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/workflow_runtime/buffered_graph.py index 9a0dfae..9774999 100644 --- a/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/doc_update_from_feature_v2/workflow_runtime/buffered_graph.py @@ -17,17 +17,18 @@ class DocUpdateFromFeatureV2WorkflowGraph(WorkflowGraph[TContext]): before = self._snapshot(context) raw_inp = step.trace_input(context) inp = self._merge_trace_payload(raw_inp, before) - request_id = context.runtime.request.request_id - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, - ) - context = await step.run(context) - after = self._snapshot(context) - raw_out = step.trace_output(context) + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + after = self._snapshot(next_context) + raw_out = step.trace_output(next_context) out = self._merge_trace_payload(raw_out, after) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, + ) trace.log( "workflow_step_traced", { @@ -38,6 +39,7 @@ class DocUpdateFromFeatureV2WorkflowGraph(WorkflowGraph[TContext]): }, ) steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out}) + context = next_context trace.log("workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer}) trace.log("workflow_completed", {"workflow_id": self._workflow_id}) return context diff --git a/src/app/core/agent/processes/v2/workflows/general_qa_summary/workflow_runtime/buffered_graph.py b/src/app/core/agent/processes/v2/workflows/general_qa_summary/workflow_runtime/buffered_graph.py index efe5b81..cc6469c 100644 --- a/src/app/core/agent/processes/v2/workflows/general_qa_summary/workflow_runtime/buffered_graph.py +++ b/src/app/core/agent/processes/v2/workflows/general_qa_summary/workflow_runtime/buffered_graph.py @@ -19,15 +19,16 @@ class GeneralQaSummaryWorkflowGraph(WorkflowGraph[TContext]): steps_buffer: list[dict[str, object]] = [] for step in self._steps: inp = step.trace_input(context) - request_id = context.runtime.request.request_id - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + out = step.trace_output(next_context) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, ) - context = await step.run(context) - out = step.trace_output(context) trace.log( "workflow_step_traced", { @@ -38,6 +39,7 @@ class GeneralQaSummaryWorkflowGraph(WorkflowGraph[TContext]): }, ) steps_buffer.append({"step_id": step.step_id, "title": step.title, "input": inp, "output": out}) + context = next_context trace.log( "workflow_trace_flushed", {"workflow_id": self._workflow_id, "steps": steps_buffer}, diff --git a/src/app/core/agent/runtime/agent_runtime.py b/src/app/core/agent/runtime/agent_runtime.py index 1142632..fc7ed46 100644 --- a/src/app/core/agent/runtime/agent_runtime.py +++ b/src/app/core/agent/runtime/agent_runtime.py @@ -96,11 +96,21 @@ class AgentRuntime: ) async def _publish_result(self, request: AgentRequest) -> None: - try: - await self._publisher.publish_user(request.request_id, "agent", request.answer or "") - except Exception: - LOGGER.exception("failed to publish user event: request_id=%s", request.request_id) await self._safe_publish_status(request.request_id, "runtime", "Обработка запроса завершена.") + try: + await self._publisher.publish_result( + request.request_id, + "agent", + request.answer or "", + { + "result_type": "changeset" if request.changeset else "answer", + "answer": request.answer or "", + "changeset": [item.model_dump(mode="json") for item in request.changeset], + "apply_changeset": request.apply_changeset, + }, + ) + except Exception: + LOGGER.exception("failed to publish result event: request_id=%s", request.request_id) def _complete_request(self, request: AgentRequest, session: AgentSession) -> None: session.append_turn(user_message=request.message, assistant_message=request.answer or "") @@ -122,6 +132,15 @@ class AgentRuntime: "Во время обработки запроса произошла ошибка.", {"code": request.error.code}, ) + try: + await self._publisher.publish_error( + request.request_id, + "runtime", + request.error.desc, + {"error": request.error.model_dump(mode="json")}, + ) + except Exception: + LOGGER.exception("failed to publish error event: request_id=%s", request.request_id) def _build_error_payload(self, exc: Exception) -> ErrorPayload: if isinstance(exc, AppError): diff --git a/src/app/core/agent/runtime/publisher.py b/src/app/core/agent/runtime/publisher.py index 98ca8fa..0093127 100644 --- a/src/app/core/agent/runtime/publisher.py +++ b/src/app/core/agent/runtime/publisher.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging from app.core.api.domain.events.client_event import ClientEventRecord @@ -24,6 +25,7 @@ class RuntimeEventPublisher: sorted(list((payload or {}).keys())), ) await self._publish(request_id, ClientEventType.STATUS, source, text, payload) + await asyncio.sleep(0) async def publish_user(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None: LOGGER.warning( @@ -35,6 +37,26 @@ class RuntimeEventPublisher: ) await self._publish(request_id, ClientEventType.USER, source, text, payload) + async def publish_result(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None: + LOGGER.warning( + "publish result: request_id=%s source=%s text_len=%s payload_keys=%s", + request_id, + source, + len(text or ""), + sorted(list((payload or {}).keys())), + ) + await self._publish(request_id, ClientEventType.RESULT, source, text, payload) + + async def publish_error(self, request_id: str, source: str, text: str, payload: dict | None = None) -> None: + LOGGER.warning( + "publish error: request_id=%s source=%s text=%s payload_keys=%s", + request_id, + source, + text, + sorted(list((payload or {}).keys())), + ) + await self._publish(request_id, ClientEventType.ERROR, source, text, payload) + async def _publish( self, request_id: str, diff --git a/src/app/core/agent/utils/workflow/graph.py b/src/app/core/agent/utils/workflow/graph.py index e0098ea..75a747a 100644 --- a/src/app/core/agent/utils/workflow/graph.py +++ b/src/app/core/agent/utils/workflow/graph.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Generic, Sequence, TypeVar +from typing import Callable, Generic, Sequence, TypeVar from app.core.agent.utils.workflow.context import WorkflowContext from app.core.agent.utils.workflow.step import WorkflowStep @@ -24,21 +24,65 @@ class WorkflowGraph(Generic[TContext]): return context async def _run_step(self, context: TContext, step: WorkflowStep[TContext]) -> TContext: - request_id = context.runtime.request.request_id trace = context.runtime.trace.module(self._source) trace.log( "step_started", {"workflow_id": self._workflow_id, "step_id": step.step_id, "input": step.trace_input(context)}, ) - await context.runtime.publisher.publish_status( - request_id, - self._source, - f"Шаг workflow: {step.title}.", - {"workflow_id": self._workflow_id, "step_id": step.step_id}, + await self._publish_step_status(context, step, phase="before", input_context=context) + next_context = await step.run(context) + await self._publish_step_status( + next_context, + step, + phase="after", + input_context=context, + output_context=next_context, ) - context = await step.run(context) trace.log( "step_completed", - {"workflow_id": self._workflow_id, "step_id": step.step_id, "output": step.trace_output(context)}, + {"workflow_id": self._workflow_id, "step_id": step.step_id, "output": step.trace_output(next_context)}, ) - return context + return next_context + + async def _publish_step_status( + self, + runtime_context: TContext, + step: WorkflowStep[TContext], + *, + phase: str, + input_context: TContext, + output_context: TContext | None = None, + ) -> None: + message = self._resolve_step_status_message(step, phase, input_context, output_context) + if not message: + return + await runtime_context.runtime.publisher.publish_status( + runtime_context.runtime.request.request_id, + self._source, + message, + {"workflow_id": self._workflow_id, "step_id": step.step_id, "phase": phase}, + ) + + def _resolve_step_status_message( + self, + step: WorkflowStep[TContext], + phase: str, + input_context: TContext, + output_context: TContext | None, + ) -> str | None: + builder: Callable[[], str | None] + active_context = input_context + if phase == "after": + builder = step.get_after_status_message + active_context = output_context or input_context + else: + builder = step.get_before_status_message + tokens = step._push_status_state( + active_context=active_context, + input_context=input_context, + output_context=output_context, + ) + try: + return builder() + finally: + step._pop_status_state(tokens) diff --git a/src/app/core/agent/utils/workflow/step.py b/src/app/core/agent/utils/workflow/step.py index db6fb6d..a8da0a3 100644 --- a/src/app/core/agent/utils/workflow/step.py +++ b/src/app/core/agent/utils/workflow/step.py @@ -1,10 +1,14 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, Generic, TypeVar +from contextvars import ContextVar, Token +from typing import Any, Generic, TypeVar, cast TContext = TypeVar("TContext") +_ACTIVE_CONTEXT: ContextVar[Any | None] = ContextVar("workflow_step_active_context", default=None) +_INPUT_CONTEXT: ContextVar[Any | None] = ContextVar("workflow_step_input_context", default=None) +_OUTPUT_CONTEXT: ContextVar[Any | None] = ContextVar("workflow_step_output_context", default=None) class WorkflowStep(ABC, Generic[TContext]): @@ -15,8 +19,45 @@ class WorkflowStep(ABC, Generic[TContext]): async def run(self, context: TContext) -> TContext: raise NotImplementedError + @property + def context(self) -> TContext | None: + return cast(TContext | None, _ACTIVE_CONTEXT.get()) + + @property + def input_context(self) -> TContext | None: + return cast(TContext | None, _INPUT_CONTEXT.get()) + + @property + def output_context(self) -> TContext | None: + return cast(TContext | None, _OUTPUT_CONTEXT.get()) + + def get_before_status_message(self) -> str | None: + return None + + def get_after_status_message(self) -> str | None: + return None + def trace_input(self, context: TContext) -> dict[str, Any]: return {} def trace_output(self, context: TContext) -> dict[str, Any]: return {} + + def _push_status_state( + self, + *, + active_context: TContext, + input_context: TContext, + output_context: TContext | None = None, + ) -> tuple[Token[Any | None], Token[Any | None], Token[Any | None]]: + return ( + _ACTIVE_CONTEXT.set(active_context), + _INPUT_CONTEXT.set(input_context), + _OUTPUT_CONTEXT.set(output_context), + ) + + def _pop_status_state(self, tokens: tuple[Token[Any | None], Token[Any | None], Token[Any | None]]) -> None: + active_token, input_token, output_token = tokens + _ACTIVE_CONTEXT.reset(active_token) + _INPUT_CONTEXT.reset(input_token) + _OUTPUT_CONTEXT.reset(output_token) diff --git a/src/app/core/api/application/request_service.py b/src/app/core/api/application/request_service.py index c4a9fc8..a99ca4e 100644 --- a/src/app/core/api/application/request_service.py +++ b/src/app/core/api/application/request_service.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import logging +from app.core.api.application.request_start_gate import RequestStartGate from app.core.api.domain.models.agent_request import AgentRequest from app.core.api.infrastructure.ids.request_id_factory import RequestIdFactory from app.core.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore @@ -19,11 +20,13 @@ class RequestService: request_ids: RequestIdFactory, sessions: SessionService, runtime: AgentRuntime, + start_gate: RequestStartGate | None = None, ) -> None: self._request_store = request_store self._request_ids = request_ids self._sessions = sessions self._runtime = runtime + self._start_gate = start_gate or RequestStartGate() async def create(self, session_id: str, message: str, process_version: str) -> AgentRequest: session = self._sessions.get(session_id) @@ -41,13 +44,21 @@ class RequestService: process_version, (message or "").replace("\n", "\\n")[:500], ) - task = asyncio.create_task(self._runtime.run(request, session), name=f"agent-runtime:{request.request_id}") + self._start_gate.register(request.request_id) + task = asyncio.create_task(self._run_request(request, session), name=f"agent-runtime:{request.request_id}") task.add_done_callback(self._log_task_result) return request def get(self, request_id: str) -> AgentRequest | None: return self._request_store.get(request_id) + async def _run_request(self, request: AgentRequest, session) -> None: + try: + await self._start_gate.wait_until_ready(request.request_id) + await self._runtime.run(request, session) + finally: + self._start_gate.forget(request.request_id) + def _log_task_result(self, task: asyncio.Task) -> None: try: exc = task.exception() diff --git a/src/app/core/api/application/request_start_gate.py b/src/app/core/api/application/request_start_gate.py new file mode 100644 index 0000000..5806c9f --- /dev/null +++ b/src/app/core/api/application/request_start_gate.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import asyncio +from threading import Lock + + +class RequestStartGate: + def __init__(self, timeout_seconds: float = 0.5) -> None: + self._timeout_seconds = timeout_seconds + self._events: dict[str, asyncio.Event] = {} + self._lock = Lock() + + def register(self, request_id: str) -> None: + with self._lock: + self._events.setdefault(request_id, asyncio.Event()) + + def mark_ready(self, request_id: str) -> None: + with self._lock: + event = self._events.setdefault(request_id, asyncio.Event()) + event.set() + + async def wait_until_ready(self, request_id: str) -> None: + with self._lock: + event = self._events.setdefault(request_id, asyncio.Event()) + try: + await asyncio.wait_for(event.wait(), timeout=self._timeout_seconds) + except asyncio.TimeoutError: + return + + def forget(self, request_id: str) -> None: + with self._lock: + self._events.pop(request_id, None) diff --git a/src/app/core/api/application/stream_service.py b/src/app/core/api/application/stream_service.py index 7a38b1f..8636438 100644 --- a/src/app/core/api/application/stream_service.py +++ b/src/app/core/api/application/stream_service.py @@ -1,5 +1,6 @@ from __future__ import annotations +from app.core.api.application.request_start_gate import RequestStartGate from app.infra.exceptions import AppError from app.core.api.infrastructure.streaming.sse_encoder import SseEncoder from app.core.api.infrastructure.streaming.sse_event_channel import SseEventChannel @@ -7,15 +8,25 @@ from app.schemas.common import ModuleName class StreamService: - def __init__(self, channel: SseEventChannel, request_exists, encoder: SseEncoder | None = None) -> None: + def __init__( + self, + channel: SseEventChannel, + request_exists, + encoder: SseEncoder | None = None, + start_gate: RequestStartGate | None = None, + ) -> None: self._channel = channel self._request_exists = request_exists self._encoder = encoder or SseEncoder() + self._start_gate = start_gate async def subscribe(self, request_id: str): if not self._request_exists(request_id): raise AppError("request_not_found", f"Agent request not found: {request_id}", ModuleName.BACKEND) - return await self._channel.subscribe(request_id, replay=True) + queue = await self._channel.subscribe(request_id, replay=True) + if self._start_gate is not None: + self._start_gate.mark_ready(request_id) + return queue async def unsubscribe(self, request_id: str, queue) -> None: await self._channel.unsubscribe(request_id, queue) diff --git a/src/app/core/api/controllers/rag_public_controller.py b/src/app/core/api/controllers/rag_public_controller.py index a2aa7f1..997e90b 100644 --- a/src/app/core/api/controllers/rag_public_controller.py +++ b/src/app/core/api/controllers/rag_public_controller.py @@ -28,8 +28,11 @@ class RagPublicController: rag_session_id=rag_session_id, index_job_id=job.index_job_id, status=job.status, + total_files=job.total_files, indexed_files=job.indexed_files, failed_files=job.failed_files, + skipped_files=job.skipped_files, + reuse_percent=job.reuse_percent, cache_hit_files=job.cache_hit_files, cache_miss_files=job.cache_miss_files, error=job.error.model_dump(mode="json") if job.error else None, diff --git a/src/app/core/application.py b/src/app/core/application.py index 35952dc..70294c0 100644 --- a/src/app/core/application.py +++ b/src/app/core/application.py @@ -13,6 +13,7 @@ from app.core.agent.utils.llm import AgentLlmService, PromptLoader from app.core.api.module import ApiModule from app.core.api.application.session_bootstrap_service import SessionBootstrapService from app.core.api.application.request_service import RequestService +from app.core.api.application.request_start_gate import RequestStartGate from app.core.api.application.session_service import SessionService from app.core.api.application.stream_service import StreamService from app.core.api.infrastructure.ids.request_id_factory import RequestIdFactory @@ -88,6 +89,7 @@ class ModularApplication: self.agent_requests = InMemoryRequestStore() self.agent_events = SseEventChannel() self.agent_trace_logger = RequestTraceLogger(Path("runtime_traces/agent_requests")) + self.agent_request_start_gate = RequestStartGate() _publisher = RuntimeEventPublisher(self.agent_events, self.agent_trace_logger) _session_service = SessionService( store=self.agent_sessions, @@ -122,11 +124,16 @@ class ModularApplication: request_ids=RequestIdFactory(), sessions=_session_service, runtime=_runtime, + start_gate=self.agent_request_start_gate, ) self.api = ApiModule( session_bootstrap=_session_bootstrap, requests=_request_service, - streams=StreamService(self.agent_events, request_exists=lambda request_id: self.agent_requests.get(request_id) is not None), + streams=StreamService( + self.agent_events, + request_exists=lambda request_id: self.agent_requests.get(request_id) is not None, + start_gate=self.agent_request_start_gate, + ), rag=self.rag, ) diff --git a/src/app/core/rag/indexing/job_store.py b/src/app/core/rag/indexing/job_store.py index 0f99269..865b07c 100644 --- a/src/app/core/rag/indexing/job_store.py +++ b/src/app/core/rag/indexing/job_store.py @@ -18,20 +18,40 @@ class IndexJob: index_job_id: str rag_session_id: str status: IndexJobStatus = IndexJobStatus.QUEUED + total_files: int = 0 indexed_files: int = 0 failed_files: int = 0 + skipped_files: int = 0 cache_hit_files: int = 0 cache_miss_files: int = 0 error: ErrorPayload | None = None + @property + def reuse_percent(self) -> int: + total = self.cache_hit_files + self.cache_miss_files + if total <= 0: + return 0 + return round((self.cache_hit_files / total) * 100) + class IndexJobStore: def __init__(self, repository: RagRepository) -> None: self._repo = repository - def create(self, rag_session_id: str) -> IndexJob: - job = IndexJob(index_job_id=str(uuid4()), rag_session_id=rag_session_id) - self._repo.create_job(job.index_job_id, rag_session_id, job.status.value) + def create(self, rag_session_id: str, *, total_files: int = 0, skipped_files: int = 0) -> IndexJob: + job = IndexJob( + index_job_id=str(uuid4()), + rag_session_id=rag_session_id, + total_files=total_files, + skipped_files=skipped_files, + ) + self._repo.create_job( + job.index_job_id, + rag_session_id, + job.status.value, + total_files=job.total_files, + skipped_files=job.skipped_files, + ) return job def get(self, index_job_id: str) -> IndexJob | None: @@ -55,8 +75,10 @@ class IndexJobStore: index_job_id=row.index_job_id, rag_session_id=row.rag_session_id, status=IndexJobStatus(row.status), + total_files=row.total_files, indexed_files=row.indexed_files, failed_files=row.failed_files, + skipped_files=row.skipped_files, cache_hit_files=row.cache_hit_files, cache_miss_files=row.cache_miss_files, error=payload, @@ -88,8 +110,10 @@ class IndexJobStore: self._repo.update_job( job.index_job_id, status=job.status.value, + total_files=job.total_files, indexed_files=job.indexed_files, failed_files=job.failed_files, + skipped_files=job.skipped_files, cache_hit_files=job.cache_hit_files, cache_miss_files=job.cache_miss_files, error_code=error_code, diff --git a/src/app/core/rag/indexing/orchestrator.py b/src/app/core/rag/indexing/orchestrator.py index 98ea332..740400e 100644 --- a/src/app/core/rag/indexing/orchestrator.py +++ b/src/app/core/rag/indexing/orchestrator.py @@ -35,33 +35,75 @@ class IndexingOrchestrator: self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) async def enqueue_snapshot(self, rag_session_id: str, files: list[dict]) -> IndexJob: - job = self._store.create(rag_session_id) + filtered_files = filter_snapshot_files(files) + total_files = len(files) + skipped_files = max(0, total_files - len(filtered_files)) + job = self._store.create( + rag_session_id, + total_files=total_files, + skipped_files=skipped_files, + ) LOGGER.warning( - "rag index snapshot queued: job_id=%s rag_session_id=%s files=%s", + "rag index snapshot queued: job_id=%s rag_session_id=%s total=%s skipped=%s", job.index_job_id, rag_session_id, - len(files), + total_files, + skipped_files, + ) + asyncio.create_task( + self._process_snapshot( + job.index_job_id, + rag_session_id, + filtered_files, + total_files=total_files, + skipped_files=skipped_files, + ) ) - asyncio.create_task(self._process_snapshot(job.index_job_id, rag_session_id, files)) return job async def enqueue_changes(self, rag_session_id: str, changed_files: list[dict]) -> IndexJob: - job = self._store.create(rag_session_id) + filtered_changes = filter_changes_for_indexing(changed_files) + total_files = len(changed_files) + indexable_total = count_indexable_change_upserts(filtered_changes) + skipped_files = max(0, total_files - indexable_total) + job = self._store.create( + rag_session_id, + total_files=total_files, + skipped_files=skipped_files, + ) LOGGER.warning( - "rag index changes queued: job_id=%s rag_session_id=%s changes=%s", + "rag index changes queued: job_id=%s rag_session_id=%s total=%s skipped=%s", job.index_job_id, rag_session_id, - len(changed_files), + total_files, + skipped_files, + ) + asyncio.create_task( + self._process_changes( + job.index_job_id, + rag_session_id, + filtered_changes, + total_files=total_files, + skipped_files=skipped_files, + ) ) - asyncio.create_task(self._process_changes(job.index_job_id, rag_session_id, changed_files)) return job - async def _process_snapshot(self, job_id: str, rag_session_id: str, files: list[dict]) -> None: - filtered_files = filter_snapshot_files(files) + async def _process_snapshot( + self, + job_id: str, + rag_session_id: str, + filtered_files: list[dict], + *, + total_files: int, + skipped_files: int, + ) -> None: await self._run_with_project_lock( job_id=job_id, rag_session_id=rag_session_id, - total_files=len(filtered_files), + total_files=total_files, + skipped_files=skipped_files, + progress_total=len(filtered_files), operation=lambda progress_cb: self._rag.index_snapshot( rag_session_id=rag_session_id, files=filtered_files, @@ -69,12 +111,21 @@ class IndexingOrchestrator: ), ) - async def _process_changes(self, job_id: str, rag_session_id: str, changed_files: list[dict]) -> None: - filtered_changes = filter_changes_for_indexing(changed_files) + async def _process_changes( + self, + job_id: str, + rag_session_id: str, + filtered_changes: list[dict], + *, + total_files: int, + skipped_files: int, + ) -> None: await self._run_with_project_lock( job_id=job_id, rag_session_id=rag_session_id, - total_files=count_indexable_change_upserts(filtered_changes), + total_files=total_files, + skipped_files=skipped_files, + progress_total=count_indexable_change_upserts(filtered_changes), operation=lambda progress_cb: self._rag.index_changes( rag_session_id=rag_session_id, changed_files=filtered_changes, @@ -82,7 +133,15 @@ class IndexingOrchestrator: ), ) - async def _run_with_project_lock(self, job_id: str, rag_session_id: str, total_files: int, operation) -> None: + async def _run_with_project_lock( + self, + job_id: str, + rag_session_id: str, + total_files: int, + skipped_files: int, + progress_total: int, + operation, + ) -> None: lock = self._locks[rag_session_id] async with lock: job = self._store.get(job_id) @@ -90,17 +149,26 @@ class IndexingOrchestrator: LOGGER.warning("rag index job missing in store before start: job_id=%s", job_id) return job.status = IndexJobStatus.RUNNING + job.total_files = total_files + job.skipped_files = skipped_files self._store.save(job) LOGGER.warning( - "rag index job running: job_id=%s rag_session_id=%s total_files=%s", + "rag index job running: job_id=%s rag_session_id=%s total_files=%s skipped_files=%s", job_id, rag_session_id, total_files, + skipped_files, ) await self._events.publish( job_id, "index_status", - {"index_job_id": job_id, "status": job.status.value, "total_files": total_files}, + { + "index_job_id": job_id, + "status": job.status.value, + "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, + }, ) try: async def progress_cb(current_file_index: int, total: int, current_file_name: str) -> None: @@ -110,8 +178,11 @@ class IndexingOrchestrator: { "index_job_id": job_id, "current_file_index": current_file_index, - "total_files": total, + "total_files": total_files, + "indexable_files": total, "processed_files": current_file_index, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, "current_file_path": current_file_name, "current_file_name": current_file_name, }, @@ -123,8 +194,10 @@ class IndexingOrchestrator: timeout=timeout_sec, ) job.status = IndexJobStatus.DONE + job.total_files = total_files job.indexed_files = indexed job.failed_files = failed + job.skipped_files = skipped_files job.cache_hit_files = cache_hits job.cache_miss_files = cache_misses self._store.save(job) @@ -142,11 +215,13 @@ class IndexingOrchestrator: { "index_job_id": job_id, "status": job.status.value, + "total_files": total_files, "indexed_files": indexed, "failed_files": failed, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, "cache_hit_files": cache_hits, "cache_miss_files": cache_misses, - "total_files": total_files, }, ) await self._events.publish( @@ -155,15 +230,19 @@ class IndexingOrchestrator: { "index_job_id": job_id, "status": "done", + "total_files": total_files, "indexed_files": indexed, "failed_files": failed, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, "cache_hit_files": cache_hits, "cache_miss_files": cache_misses, - "total_files": total_files, }, ) except (TimeoutError, ConnectionError, OSError) as exc: job.status = IndexJobStatus.ERROR + job.total_files = total_files + job.skipped_files = skipped_files job.failed_files = max(1, job.failed_files) job.error = ErrorPayload( code="index_runtime_error", @@ -175,7 +254,13 @@ class IndexingOrchestrator: await self._events.publish( job_id, "index_status", - {"index_job_id": job_id, "status": job.status.value, "total_files": total_files}, + { + "index_job_id": job_id, + "status": job.status.value, + "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, + }, ) await self._events.publish( job_id, @@ -184,6 +269,8 @@ class IndexingOrchestrator: "index_job_id": job_id, "status": "error", "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, "error": { "code": job.error.code, "desc": job.error.desc, @@ -193,6 +280,8 @@ class IndexingOrchestrator: ) except asyncio.TimeoutError as exc: job.status = IndexJobStatus.ERROR + job.total_files = total_files + job.skipped_files = skipped_files job.failed_files = max(1, job.failed_files) job.error = ErrorPayload( code="index_timeout", @@ -204,7 +293,13 @@ class IndexingOrchestrator: await self._events.publish( job_id, "index_status", - {"index_job_id": job_id, "status": job.status.value, "total_files": total_files}, + { + "index_job_id": job_id, + "status": job.status.value, + "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, + }, ) await self._events.publish( job_id, @@ -213,6 +308,8 @@ class IndexingOrchestrator: "index_job_id": job_id, "status": "error", "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, "error": { "code": job.error.code, "desc": job.error.desc, @@ -222,6 +319,8 @@ class IndexingOrchestrator: ) except Exception as exc: job.status = IndexJobStatus.ERROR + job.total_files = total_files + job.skipped_files = skipped_files job.failed_files = max(1, job.failed_files) job.error = ErrorPayload( code="index_unexpected_error", @@ -233,7 +332,13 @@ class IndexingOrchestrator: await self._events.publish( job_id, "index_status", - {"index_job_id": job_id, "status": job.status.value, "total_files": total_files}, + { + "index_job_id": job_id, + "status": job.status.value, + "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, + }, ) await self._events.publish( job_id, @@ -242,6 +347,8 @@ class IndexingOrchestrator: "index_job_id": job_id, "status": "error", "total_files": total_files, + "skipped_files": skipped_files, + "reuse_percent": job.reuse_percent, "error": { "code": job.error.code, "desc": job.error.desc, diff --git a/src/app/core/rag/persistence/job_repository.py b/src/app/core/rag/persistence/job_repository.py index 6dd59d1..5d2e79c 100644 --- a/src/app/core/rag/persistence/job_repository.py +++ b/src/app/core/rag/persistence/job_repository.py @@ -13,8 +13,10 @@ class RagJobRow: index_job_id: str rag_session_id: str status: str + total_files: int indexed_files: int failed_files: int + skipped_files: int cache_hit_files: int cache_miss_files: int error_code: str | None @@ -24,16 +26,36 @@ class RagJobRow: class RagJobRepository: - def create_job(self, index_job_id: str, rag_session_id: str, status: str) -> None: + def create_job( + self, + index_job_id: str, + rag_session_id: str, + status: str, + *, + total_files: int = 0, + skipped_files: int = 0, + ) -> None: with get_engine().connect() as conn: conn.execute( text( """ - INSERT INTO rag_index_jobs (index_job_id, rag_session_id, status) - VALUES (:jid, :sid, :status) + INSERT INTO rag_index_jobs ( + index_job_id, + rag_session_id, + status, + total_files, + skipped_files + ) + VALUES (:jid, :sid, :status, :total_files, :skipped_files) """ ), - {"jid": index_job_id, "sid": rag_session_id, "status": status}, + { + "jid": index_job_id, + "sid": rag_session_id, + "status": status, + "total_files": total_files, + "skipped_files": skipped_files, + }, ) conn.commit() @@ -42,8 +64,10 @@ class RagJobRepository: index_job_id: str, *, status: str, + total_files: int, indexed_files: int, failed_files: int, + skipped_files: int, cache_hit_files: int = 0, cache_miss_files: int = 0, error_code: str | None = None, @@ -56,8 +80,10 @@ class RagJobRepository: """ UPDATE rag_index_jobs SET status = :status, + total_files = :total_files, indexed_files = :indexed, failed_files = :failed, + skipped_files = :skipped_files, cache_hit_files = :cache_hit_files, cache_miss_files = :cache_miss_files, error_code = :ecode, @@ -70,8 +96,10 @@ class RagJobRepository: { "jid": index_job_id, "status": status, + "total_files": total_files, "indexed": indexed_files, "failed": failed_files, + "skipped_files": skipped_files, "cache_hit_files": cache_hit_files, "cache_miss_files": cache_miss_files, "ecode": error_code, @@ -86,7 +114,8 @@ class RagJobRepository: row = conn.execute( text( """ - SELECT index_job_id, rag_session_id, status, indexed_files, failed_files, + SELECT index_job_id, rag_session_id, status, total_files, indexed_files, failed_files, + skipped_files, cache_hit_files, cache_miss_files, error_code, error_desc, error_module, updated_at FROM rag_index_jobs WHERE index_job_id = :jid diff --git a/src/app/core/rag/persistence/repository.py b/src/app/core/rag/persistence/repository.py index 87d34bb..a13d6f4 100644 --- a/src/app/core/rag/persistence/repository.py +++ b/src/app/core/rag/persistence/repository.py @@ -31,8 +31,8 @@ class RagRepository: def get_session(self, rag_session_id: str) -> dict | None: return self._sessions.get_session(rag_session_id) - def create_job(self, index_job_id: str, rag_session_id: str, status: str) -> None: - self._jobs.create_job(index_job_id, rag_session_id, status) + def create_job(self, index_job_id: str, rag_session_id: str, status: str, **kwargs) -> None: + self._jobs.create_job(index_job_id, rag_session_id, status, **kwargs) def update_job(self, index_job_id: str, **kwargs) -> None: self._jobs.update_job(index_job_id, **kwargs) diff --git a/src/app/core/rag/persistence/schema_migrator.py b/src/app/core/rag/persistence/schema_migrator.py index 87e639e..c18ffe7 100644 --- a/src/app/core/rag/persistence/schema_migrator.py +++ b/src/app/core/rag/persistence/schema_migrator.py @@ -15,6 +15,8 @@ class RagSchemaMigrator: def _ensure_core_columns(self, conn) -> None: for statement in ( + "ALTER TABLE rag_index_jobs ADD COLUMN IF NOT EXISTS total_files INTEGER NOT NULL DEFAULT 0", + "ALTER TABLE rag_index_jobs ADD COLUMN IF NOT EXISTS skipped_files INTEGER NOT NULL DEFAULT 0", "ALTER TABLE rag_chunks ADD COLUMN IF NOT EXISTS layer VARCHAR(64) NULL", "ALTER TABLE rag_chunks ADD COLUMN IF NOT EXISTS lang VARCHAR(32) NULL", "ALTER TABLE rag_chunks ADD COLUMN IF NOT EXISTS repo_id VARCHAR(512) NULL", diff --git a/src/app/core/rag/persistence/schema_statements.py b/src/app/core/rag/persistence/schema_statements.py index 38d9ae7..74af9fe 100644 --- a/src/app/core/rag/persistence/schema_statements.py +++ b/src/app/core/rag/persistence/schema_statements.py @@ -16,8 +16,10 @@ def base_table_statements() -> tuple[str, ...]: index_job_id VARCHAR(64) PRIMARY KEY, rag_session_id VARCHAR(64) NOT NULL, status VARCHAR(16) NOT NULL, + total_files INTEGER NOT NULL DEFAULT 0, indexed_files INTEGER NOT NULL DEFAULT 0, failed_files INTEGER NOT NULL DEFAULT 0, + skipped_files INTEGER NOT NULL DEFAULT 0, cache_hit_files INTEGER NOT NULL DEFAULT 0, cache_miss_files INTEGER NOT NULL DEFAULT 0, error_code VARCHAR(128) NULL, diff --git a/src/app/schemas/client_events.py b/src/app/schemas/client_events.py index d78b9da..261500a 100644 --- a/src/app/schemas/client_events.py +++ b/src/app/schemas/client_events.py @@ -10,6 +10,8 @@ class ClientEventType(str, Enum): SYSTEM = "system" STATUS = "status" USER = "user" + RESULT = "result" + ERROR = "error" class ClientEvent(BaseModel): diff --git a/src/app/schemas/indexing.py b/src/app/schemas/indexing.py index cc56187..4c1ac68 100644 --- a/src/app/schemas/indexing.py +++ b/src/app/schemas/indexing.py @@ -49,8 +49,11 @@ class IndexJobStatus(str, Enum): class IndexJobResponse(BaseModel): index_job_id: str status: IndexJobStatus + total_files: int = 0 indexed_files: int = 0 failed_files: int = 0 + skipped_files: int = 0 + reuse_percent: int = 0 cache_hit_files: int = 0 cache_miss_files: int = 0 error: Optional[ErrorPayload] = None diff --git a/src/app/schemas/rag_sessions.py b/src/app/schemas/rag_sessions.py index 62b2aa6..37de101 100644 --- a/src/app/schemas/rag_sessions.py +++ b/src/app/schemas/rag_sessions.py @@ -7,8 +7,11 @@ class RagSessionJobResponse(BaseModel): rag_session_id: str index_job_id: str status: IndexJobStatus + total_files: int = 0 indexed_files: int = 0 failed_files: int = 0 + skipped_files: int = 0 + reuse_percent: int = 0 cache_hit_files: int = 0 cache_miss_files: int = 0 error: dict | None = None diff --git a/tests/unit_tests/agent/test_doc_update_from_feature_v2_happy_path.py b/tests/unit_tests/agent/test_doc_update_from_feature_v2_happy_path.py index 395aceb..2c30c09 100644 --- a/tests/unit_tests/agent/test_doc_update_from_feature_v2_happy_path.py +++ b/tests/unit_tests/agent/test_doc_update_from_feature_v2_happy_path.py @@ -1,7 +1,15 @@ from __future__ import annotations +import asyncio +from pathlib import Path from types import SimpleNamespace +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step2_load_source_content.step import ( + LoadSourceContentStep, +) +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step4_load_rules.step import ( + LoadRulesStep, +) from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step3_parse_requirements.parser import ( FunctionalRequirementsParser, ) @@ -15,9 +23,15 @@ from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.steps.step from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.source_sections import ( RequirementSourceSections, ) +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.rules_catalog import ( + RulesCatalog, +) from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.template_parser import ( TemplateParser, ) +from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.subprocesses.common.template_registry import ( + TemplateRegistry, +) from app.core.agent.processes.v2.workflows.doc_update_from_feature_v2.workflow_runtime.context import ( DocUpdateFromFeatureV2Context, ) @@ -178,6 +192,70 @@ def test_task_builder_uses_create_when_path_is_new_and_no_delete_markers() -> No assert tasks[0].path == "docs/orders/web/ui_page/orders.ui.list.md" +def test_task_builder_normalizes_data_entity_to_db_table() -> None: + context = DocUpdateFromFeatureV2Context( + runtime=SimpleNamespace(), + route=SimpleNamespace(), + rag_session_id="", + analytics_meta=AnalyticsMeta( + application="orders_pprb", + domain="orders", + subdomain="lifecycle", + ), + requirements=[ + RequirementUnit( + section_key="6.5", + heading="Создание таблицы orders в БД", + body="Описание таблицы orders.", + metadata={ + "id": "orders.db.table.orders", + "doc_type": "data_entity", + "application": "orders_pprb", + "platform": "pprb", + }, + ) + ], + ) + + tasks = RequirementTaskBuilder(_UnexpectedLlmCall()).build(context) + + assert len(tasks) == 1 + assert tasks[0].doc_type == "db_table" + assert tasks[0].path == "docs/orders/pprb/db_table/orders.db.table.orders.md" + + +def test_task_builder_normalizes_domain_entity_to_db_table() -> None: + context = DocUpdateFromFeatureV2Context( + runtime=SimpleNamespace(), + route=SimpleNamespace(), + rag_session_id="", + analytics_meta=AnalyticsMeta( + application="orders_pprb", + domain="orders", + subdomain="lifecycle", + ), + requirements=[ + RequirementUnit( + section_key="6.5", + heading="Создание таблицы orders в БД", + body="Описание таблицы orders.", + metadata={ + "id": "orders.db.table.orders", + "doc_type": "domain_entity", + "application": "orders_pprb", + "platform": "pprb", + }, + ) + ], + ) + + tasks = RequirementTaskBuilder(_UnexpectedLlmCall()).build(context) + + assert len(tasks) == 1 + assert tasks[0].doc_type == "db_table" + assert tasks[0].path == "docs/orders/pprb/db_table/orders.db.table.orders.md" + + def test_delete_heuristic_does_not_match_phrase_ne_udalos() -> None: heuristic = DeleteIntentHeuristic() @@ -185,6 +263,28 @@ def test_delete_heuristic_does_not_match_phrase_ne_udalos() -> None: assert heuristic.is_delete("Нужно удалить существующую страницу документации.") is True +def test_load_source_content_step_uses_repo_root_when_docs_dir_is_in_parent(tmp_path) -> None: + project_root = tmp_path / "test_doc" + features_dir = project_root / "features" + docs_dir = project_root / "docs" + features_dir.mkdir(parents=True) + docs_dir.mkdir(parents=True) + source_path = features_dir / "order_list.md" + source_path.write_text("# Feature", encoding="utf-8") + context = DocUpdateFromFeatureV2Context( + runtime=SimpleNamespace(), + route=SimpleNamespace(), + rag_session_id="", + source_ref=source_path.as_posix(), + source_kind="markdown_file", + ) + + result = asyncio.run(LoadSourceContentStep().run(context)) + + assert result.project_root == project_root.as_posix() + assert result.source_content == "# Feature" + + def test_template_parser_extracts_ordered_sections_from_ui_template() -> None: parser = TemplateParser() template = """ @@ -216,6 +316,43 @@ doc_type: ui_page assert spec.sections[1].has_children is True +def test_template_registry_loads_db_table_template_by_data_entity_alias() -> None: + registry = TemplateRegistry() + catalog = RulesCatalog( + by_name={ + "templates/db_table.template.md": """ +--- +doc_type: db_table +--- + +#