Роутер работает нормально в process v2

This commit is contained in:
2026-04-07 14:09:51 +03:00
parent 0a25e42ea1
commit 8b7b72967e
1746 changed files with 216414 additions and 14037 deletions
@@ -0,0 +1,420 @@
from __future__ import annotations
import asyncio
from dataclasses import asdict, dataclass
from pathlib import Path
from app.core.agent.processes.v2.anchor_signals import route_anchor_summary
from app.core.agent.processes.v2 import V2IntentRouter
from app.core.agent.processes.v2.evidence.assembler import DocsEvidenceAssembler
from app.core.agent.processes.v2.evidence.gate import DocsEvidenceGate
from app.core.agent.processes.v2.models import RetrievedFile, RetrievedSummary, V2Intent, V2Subintent
from app.core.agent.processes.v2.retrieval import DocsMetadataLookupIndex
from app.core.agent.processes.v2.retrieval.policy_resolver import V2RetrievalPolicyResolver
from app.core.agent.processes.v2.retrieval.v2_rag_adapter import V2RagRetrievalAdapter
from app.core.agent.processes.v2.workflows.docs_explain_find_files.context import DocsExplainFindFilesContext
from app.core.agent.processes.v2.workflows.docs_explain_find_files.graph import DocsExplainFindFilesGraph
from app.core.agent.processes.v2.workflows.docs_explain_summary.context import DocsExplainSummaryContext
from app.core.agent.processes.v2.workflows.docs_explain_summary.graph import DocsExplainSummaryGraph
from app.core.agent.processes.v2.workflows.general_summary.context import GeneralSummaryContext
from app.core.agent.processes.v2.workflows.general_summary.graph import GeneralSummaryGraph
from app.core.agent.utils.llm import AgentLlmService, PromptLoader
from app.core.rag.embedding.gigachat_embedder import GigaChatEmbedder
from app.core.rag.persistence import RagRepository
from app.core.rag.retrieval.session_retriever import RagSessionRetriever
from app.core.shared.gigachat.client import GigaChatClient
from app.core.shared.gigachat.settings import GigaChatSettings
from app.core.shared.gigachat.token_provider import GigaChatTokenProvider
from app.infra.observability.module_trace import RequestTraceContext
from tests.pipeline_setup_v3.core.models import ExecutionPayload, V3Case
class V2ProcessAdapter:
def __init__(self, *, workflow_llm_enabled: bool = True) -> None:
self._workflow_llm_enabled = workflow_llm_enabled
self._router = V2IntentRouter(llm=_build_v2_llm())
self._policy = V2RetrievalPolicyResolver()
retriever = RagSessionRetriever(repository=RagRepository(), embedder=GigaChatEmbedder(_build_client()))
self._retrieval = V2RagRetrievalAdapter(retriever)
self._evidence = DocsEvidenceAssembler()
self._gate = DocsEvidenceGate()
self._summary_graph = DocsExplainSummaryGraph(_build_v2_llm())
self._find_files_graph = DocsExplainFindFilesGraph()
self._general_graph = GeneralSummaryGraph(_build_v2_llm())
def execute(self, case: V3Case, rag_session_id: str | None) -> ExecutionPayload:
return asyncio.run(self._execute_async(case, rag_session_id))
async def _execute_async(self, case: V3Case, rag_session_id: str | None) -> ExecutionPayload:
runtime = _RuntimeStub(query=case.query)
route = self._router.route(case.query)
_log_pipeline_step(
runtime,
"router_resolved",
{
"domain": route.routing_domain,
"intent": route.intent,
"subintent": route.subintent,
"confidence": route.confidence,
},
)
_log_pipeline_step(
runtime,
"anchors_extracted",
{
"signal_types": route_anchor_summary(route)["signal_types"],
"endpoint_paths": route.anchors.endpoint_paths,
"target_doc_hints": route.anchors.target_doc_hints,
"matched_aliases": route.anchors.matched_aliases,
},
)
_log_pipeline_step(
runtime,
"alias_resolution",
{
"resolved_aliases": route.anchors.matched_aliases,
"target_doc_hints": route.anchors.target_doc_hints,
},
)
if case.mode == "router_only":
return ExecutionPayload(
actual=_actual_from_v2(route),
details=_details(case.query, route=route, pipeline_steps=_build_pipeline_steps(runtime.logs)),
)
plan = self._policy.resolve(route)
_log_pipeline_step(
runtime,
"retrieval_profile_selected",
{"profile": plan.profile, "layers": plan.layers, "filters": plan.filters},
)
semantic_rows = await self._retrieve_rows(route, rag_session_id, plan)
seeded_rows = await self._seed_candidates_from_target_hints(route, rag_session_id, plan)
metadata_rows = self._metadata_lookup_candidates([*seeded_rows, *semantic_rows], route)
rows = self._merge_candidate_rows(seeded_rows, metadata_rows, semantic_rows)
_log_pipeline_step(
runtime,
"candidate_generation",
{
"resolved_aliases": route.anchors.matched_aliases,
"target_doc_hints": route.anchors.target_doc_hints,
"candidate_docs_before_ranking": [self._trace_row(row) for row in rows[:8]],
"sources": {
"seeded": [self._trace_row(row) for row in seeded_rows[:5]],
"metadata_lookup": [self._trace_row(row) for row in metadata_rows[:5]],
"semantic": [self._trace_row(row) for row in semantic_rows[:5]],
},
},
)
_log_pipeline_step(
runtime,
"retrieval_executed",
{
"query": case.query,
"profile": plan.profile,
"row_count": len(rows),
"target_doc_hints": route.anchors.target_doc_hints,
"top_results": [self._trace_row(row) for row in rows[:5]],
},
)
if case.mode == "router_rag":
return ExecutionPayload(
actual=_actual_from_v2(route, rows=rows, plan=plan, answer_mode="partial"),
details=_details(case.query, route=route, plan=plan, rows=rows, pipeline_steps=_build_pipeline_steps(runtime.logs)),
)
answer, evidence, gate = await self._run_workflow(runtime, route, rag_session_id, rows)
answer_mode = gate.answer_mode
_log_pipeline_step(
runtime,
"answer_generated",
{"answer_mode": answer_mode, "answer_length": len(answer)},
)
return ExecutionPayload(
actual=_actual_from_v2(route, rows=rows, plan=plan, answer=answer, answer_mode=answer_mode),
details=_details(
case.query,
route=route,
plan=plan,
rows=rows,
evidence=evidence,
answer=answer,
logs=runtime.logs,
pipeline_steps=_build_pipeline_steps(runtime.logs),
),
)
async def _retrieve_rows(self, route, rag_session_id: str | None, plan) -> list[dict]:
if not rag_session_id:
if route.intent == V2Intent.GENERAL_QA:
return []
raise ValueError("process_v2 cases with DOCS intent require rag_session_id")
return await self._retrieval.fetch_rows(rag_session_id, route.normalized_query, plan)
async def _seed_candidates_from_target_hints(self, route, rag_session_id: str | None, plan) -> list[dict]:
if not rag_session_id or not route.anchors.target_doc_hints:
return []
return await self._retrieval.fetch_exact_paths(rag_session_id, paths=route.anchors.target_doc_hints, layers=plan.layers)
def _metadata_lookup_candidates(self, rows: list[dict], route) -> list[dict]:
return DocsMetadataLookupIndex(rows).lookup(route)
def _merge_candidate_rows(self, *groups: list[dict]) -> list[dict]:
merged: list[dict] = []
seen: set[tuple[str, str, str]] = set()
for rows in groups:
for row in rows:
key = (
str(row.get("path") or ""),
str(row.get("layer") or ""),
str(dict(row.get("metadata") or {}).get("section_path") or ""),
)
if key in seen:
continue
seen.add(key)
merged.append(row)
return merged
async def _run_workflow(
self,
runtime: "_RuntimeStub",
route,
rag_session_id: str | None,
rows: list[dict],
) -> tuple[str, dict, object]:
if route.intent == V2Intent.GENERAL_QA:
documents = self._evidence.assemble_summaries(rows, route)
gate = self._gate.check_summaries(route, documents)
_log_pipeline_step(
runtime,
"evidence_assembled",
{"mode": "summary", "primary_doc": documents[0].path if documents else None, "document_count": len(documents)},
)
self._log_ranking(runtime, documents)
_log_pipeline_step(
runtime,
"evidence_gate_checked",
{"passed": gate.passed, "reason": gate.reason, "answer_mode": gate.answer_mode},
)
context = GeneralSummaryContext(runtime=runtime, route=route, prompt_name="v2_general.summary_answer")
context.workflow_llm_enabled = self._workflow_llm_enabled
context.documents = documents
context.gate_decision = gate
final = await self._general_graph.run(context)
return final.answer, {"documents": [_serialize_summary(item) for item in documents], "files": []}, gate
if route.subintent == V2Subintent.FIND_FILES:
files = self._evidence.assemble_files(rows, route)
gate = self._gate.check_files(route, files)
_log_pipeline_step(
runtime,
"evidence_assembled",
{"mode": "find_files", "primary_file": files[0].path if files else None, "file_count": len(files)},
)
self._log_ranking(runtime, files)
_log_pipeline_step(
runtime,
"evidence_gate_checked",
{"passed": gate.passed, "reason": gate.reason, "answer_mode": gate.answer_mode},
)
context = DocsExplainFindFilesContext(
runtime=runtime,
route=route,
rag_session_id=rag_session_id or "",
files=files,
gate_decision=gate,
)
final = await self._find_files_graph.run(context)
return final.answer, {"documents": [], "files": [_serialize_file(item) for item in files]}, gate
documents = self._evidence.assemble_summaries(rows, route)
gate = self._gate.check_summaries(route, documents)
_log_pipeline_step(
runtime,
"evidence_assembled",
{"mode": "summary", "primary_doc": documents[0].path if documents else None, "document_count": len(documents)},
)
self._log_ranking(runtime, documents)
_log_pipeline_step(
runtime,
"evidence_gate_checked",
{"passed": gate.passed, "reason": gate.reason, "answer_mode": gate.answer_mode},
)
context = DocsExplainSummaryContext(
runtime=runtime,
route=route,
rag_session_id=rag_session_id or "",
prompt_name="v2_docs_explain.summary_answer",
workflow_llm_enabled=self._workflow_llm_enabled,
documents=documents,
gate_decision=gate,
)
final = await self._summary_graph.run(context)
return final.answer, {"documents": [_serialize_summary(item) for item in documents], "files": []}, gate
def _trace_row(self, row: dict) -> dict[str, object]:
metadata = dict(row.get("metadata") or {})
return {
"path": str(row.get("path") or ""),
"layer": str(row.get("layer") or ""),
"title": str(row.get("title") or ""),
"document_id": str(metadata.get("document_id") or metadata.get("doc_id") or ""),
}
def _log_ranking(self, runtime: "_RuntimeStub", items: list) -> None:
top_docs: list[dict[str, object]] = []
for item in items[:4]:
top_docs.append(
{
"doc": getattr(item, "path", ""),
"score": getattr(item, "score", 0),
"match_reason": getattr(item, "match_reason", ""),
}
)
_log_pipeline_step(
runtime,
"ranking_explained",
{
"doc": getattr(item, "path", ""),
"score": getattr(item, "score", 0),
"score_breakdown": getattr(item, "score_breakdown", {}),
"match_reason": getattr(item, "match_reason", ""),
},
)
_log_pipeline_step(
runtime,
"ranking_explained",
{
"top_docs_after_ranking": top_docs,
"ranking_score_breakdown": [
{
"doc": getattr(item, "path", ""),
"score_breakdown": getattr(item, "score_breakdown", {}),
}
for item in items[:4]
],
},
)
@dataclass(slots=True)
class _RequestStub:
request_id: str
message: str
@dataclass(slots=True)
class _SessionStub:
active_rag_session_id: str | None = None
class _PublisherStub:
async def publish_status(self, request_id: str, source: str, message: str, payload: dict | None = None) -> None:
return None
class _TraceLoggerStub:
def __init__(self, store: list[dict]) -> None:
self._store = store
def log_module(self, request_id: str, module: str, title: str, payload: dict | None = None) -> None:
self._store.append(
{"request_id": request_id, "module": module, "event": title, "payload": dict(payload or {})}
)
class _RuntimeStub:
def __init__(self, *, query: str) -> None:
self.logs: list[dict] = []
self.request = _RequestStub(request_id="pipeline_setup_v3", message=query)
self.session = _SessionStub()
self.publisher = _PublisherStub()
self.trace = RequestTraceContext(request_id=self.request.request_id, logger=_TraceLoggerStub(self.logs))
def _build_client() -> GigaChatClient:
settings = GigaChatSettings.from_env()
return GigaChatClient(settings, GigaChatTokenProvider(settings))
def _build_v2_llm() -> AgentLlmService:
prompt_paths = [
Path(__file__).resolve().parents[3] / "src/app/core/agent/processes/v2/prompts.yml",
Path(__file__).resolve().parents[3] / "src/app/core/agent/processes/v2/general_prompts.yml",
Path(__file__).resolve().parents[3] / "src/app/core/agent/processes/v2/intent_router/routers/prompts.yml",
]
return AgentLlmService(client=_build_client(), prompts=PromptLoader(prompt_paths))
def _actual_from_v2(route, *, rows: list[dict] | None = None, plan=None, answer: str = "", answer_mode: str = "partial") -> dict:
return {
"domain": route.routing_domain,
"intent": route.intent,
"sub_intent": route.subintent,
"rag_count": len(rows or []),
"llm_answer": answer,
"answer_mode": answer_mode,
"path_scope": tuple(),
"symbol_candidates": tuple(),
"entity_candidates": tuple(_entity_candidates(rows or [])),
"doc_scope": tuple(_doc_scope(rows or [])),
"layers": tuple(getattr(plan, "layers", []) or []),
"filters": dict(getattr(plan, "filters", {}) or {}),
}
def _details(query: str, **payload) -> dict:
details = {"query": query}
for key, value in payload.items():
if key == "route":
details["router_result"] = asdict(value)
elif key == "plan":
details["retrieval_plan"] = asdict(value)
else:
details[key] = value
return details
def _doc_scope(rows: list[dict]) -> list[str]:
values: list[str] = []
for row in rows:
metadata = dict(row.get("metadata") or {})
for candidate in (metadata.get("document_id"), metadata.get("doc_id"), row.get("path")):
value = str(candidate or "").strip()
if value and value not in values:
values.append(value)
return values
def _entity_candidates(rows: list[dict]) -> list[str]:
values: list[str] = []
for row in rows:
metadata = dict(row.get("metadata") or {})
value = str(metadata.get("entity_name") or row.get("title") or "").strip()
if value and value not in values and str(row.get("layer") or "") == "D3_ENTITY_CATALOG":
values.append(value)
return values
def _serialize_summary(item: RetrievedSummary) -> dict:
return asdict(item)
def _serialize_file(item: RetrievedFile) -> dict:
return asdict(item)
def _build_pipeline_steps(logs: list[dict]) -> list[dict]:
steps: list[dict] = []
for item in logs:
if item.get("module") != "process.v2.pipeline":
continue
steps.append({"step": item.get("event"), "output": item.get("payload") or {}})
return steps
def _log_pipeline_step(runtime: _RuntimeStub, step: str, payload: dict[str, object]) -> None:
runtime.logs.append(
{
"request_id": runtime.request.request_id,
"module": "process.v2.pipeline",
"event": step,
"payload": payload,
}
)