ввв
This commit is contained in:
@@ -4,43 +4,42 @@ import asyncio
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from app.core.agent.processes.v2.anchor_signals import route_anchor_summary
|
||||
from app.core.agent.processes.v2 import V2IntentRouter
|
||||
from app.core.agent.processes.v2.evidence.assembler import DocsEvidenceAssembler
|
||||
from app.core.agent.processes.v2.evidence.gate import DocsEvidenceGate
|
||||
from app.core.agent.processes.v2.models import RetrievedFile, RetrievedSummary, V2Intent, V2Subintent
|
||||
from app.core.agent.processes.v2.retrieval import DocsMetadataLookupIndex
|
||||
from app.core.agent.processes.v2.retrieval.policy_resolver import V2RetrievalPolicyResolver
|
||||
from app.core.agent.processes.v2.retrieval.v2_rag_adapter import V2RagRetrievalAdapter
|
||||
from app.core.agent.processes.v2.workflows.docs_explain_find_files.context import DocsExplainFindFilesContext
|
||||
from app.core.agent.processes.v2.workflows.docs_explain_find_files.graph import DocsExplainFindFilesGraph
|
||||
from app.core.agent.processes.v2.workflows.docs_explain_summary.context import DocsExplainSummaryContext
|
||||
from app.core.agent.processes.v2.workflows.docs_explain_summary.graph import DocsExplainSummaryGraph
|
||||
from app.core.agent.processes.v2.workflows.general_summary.context import GeneralSummaryContext
|
||||
from app.core.agent.processes.v2.workflows.general_summary.graph import GeneralSummaryGraph
|
||||
from app.core.agent.processes.v2 import V2IntentRouter, V2Process
|
||||
from app.core.agent.utils.llm import AgentLlmService, PromptLoader
|
||||
from app.core.rag.embedding.gigachat_embedder import GigaChatEmbedder
|
||||
from app.core.rag.persistence import RagRepository
|
||||
from app.core.rag.retrieval.session_retriever import RagSessionRetriever
|
||||
from app.core.shared.gigachat.client import GigaChatClient
|
||||
from app.core.shared.gigachat.settings import GigaChatSettings
|
||||
from app.core.shared.gigachat.token_provider import GigaChatTokenProvider
|
||||
from app.infra.observability.module_trace import RequestTraceContext
|
||||
from app.core.agent.utils.process_v2.anchor_signals import route_anchor_summary
|
||||
from app.core.agent.utils.process_v2.evidence.assembler import DocsEvidenceAssembler
|
||||
from app.core.agent.utils.process_v2.evidence.gate import DocsEvidenceGate
|
||||
from app.core.agent.utils.process_v2.models import V2Intent
|
||||
from app.core.agent.utils.process_v2.plan_resolver import V2RetrievalPolicyResolver
|
||||
from app.core.agent.utils.process_v2.rag_retrieval import DocsMetadataLookupIndex, V2RagRetrievalAdapter
|
||||
from tests.pipeline_setup_v3.core.models import ExecutionPayload, V3Case
|
||||
from tests.pipeline_setup_v3.shared.rag_indexer import DeterministicEmbedder
|
||||
from tests.pipeline_setup_v4.executors.process_v2_router_executor import _KeywordLlm
|
||||
|
||||
|
||||
class V2ProcessAdapter:
|
||||
def __init__(self, *, workflow_llm_enabled: bool = True) -> None:
|
||||
self._workflow_llm_enabled = workflow_llm_enabled
|
||||
self._router = V2IntentRouter(llm=_build_v2_llm())
|
||||
self._llm = _build_v2_llm()
|
||||
self._router = V2IntentRouter(llm=_KeywordLlm(), enable_llm_disambiguation=True)
|
||||
self._policy = V2RetrievalPolicyResolver()
|
||||
retriever = RagSessionRetriever(repository=RagRepository(), embedder=GigaChatEmbedder(_build_client()))
|
||||
retriever = RagSessionRetriever(repository=RagRepository(), embedder=DeterministicEmbedder())
|
||||
self._retrieval = V2RagRetrievalAdapter(retriever)
|
||||
self._evidence = DocsEvidenceAssembler()
|
||||
self._gate = DocsEvidenceGate()
|
||||
self._summary_graph = DocsExplainSummaryGraph(_build_v2_llm())
|
||||
self._find_files_graph = DocsExplainFindFilesGraph()
|
||||
self._general_graph = GeneralSummaryGraph(_build_v2_llm())
|
||||
self._process = V2Process(
|
||||
llm=self._llm,
|
||||
policy_resolver=self._policy,
|
||||
rag_adapter=self._retrieval,
|
||||
evidence_assembler=DocsEvidenceAssembler(),
|
||||
evidence_gate=DocsEvidenceGate(),
|
||||
router=self._router,
|
||||
workflow_llm_enabled=workflow_llm_enabled,
|
||||
)
|
||||
|
||||
def execute(self, case: V3Case, rag_session_id: str | None) -> ExecutionPayload:
|
||||
return asyncio.run(self._execute_async(case, rag_session_id))
|
||||
@@ -81,6 +80,8 @@ class V2ProcessAdapter:
|
||||
actual=_actual_from_v2(route),
|
||||
details=_details(case.query, route=route, pipeline_steps=_build_pipeline_steps(runtime.logs)),
|
||||
)
|
||||
if case.mode == "full_chain":
|
||||
return await self._execute_full_chain(case, rag_session_id, route)
|
||||
plan = self._policy.resolve(route)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
@@ -121,26 +122,7 @@ class V2ProcessAdapter:
|
||||
actual=_actual_from_v2(route, rows=rows, plan=plan, answer_mode="partial"),
|
||||
details=_details(case.query, route=route, plan=plan, rows=rows, pipeline_steps=_build_pipeline_steps(runtime.logs)),
|
||||
)
|
||||
answer, evidence, gate = await self._run_workflow(runtime, route, rag_session_id, rows)
|
||||
answer_mode = gate.answer_mode
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"answer_generated",
|
||||
{"answer_mode": answer_mode, "answer_length": len(answer)},
|
||||
)
|
||||
return ExecutionPayload(
|
||||
actual=_actual_from_v2(route, rows=rows, plan=plan, answer=answer, answer_mode=answer_mode),
|
||||
details=_details(
|
||||
case.query,
|
||||
route=route,
|
||||
plan=plan,
|
||||
rows=rows,
|
||||
evidence=evidence,
|
||||
answer=answer,
|
||||
logs=runtime.logs,
|
||||
pipeline_steps=_build_pipeline_steps(runtime.logs),
|
||||
),
|
||||
)
|
||||
raise ValueError(f"Unsupported process_v2 adapter mode: {case.mode}")
|
||||
|
||||
async def _retrieve_rows(self, route, rag_session_id: str | None, plan) -> list[dict]:
|
||||
if not rag_session_id:
|
||||
@@ -173,125 +155,54 @@ class V2ProcessAdapter:
|
||||
merged.append(row)
|
||||
return merged
|
||||
|
||||
async def _run_workflow(
|
||||
self,
|
||||
runtime: "_RuntimeStub",
|
||||
route,
|
||||
rag_session_id: str | None,
|
||||
rows: list[dict],
|
||||
) -> tuple[str, dict, object]:
|
||||
if route.intent == V2Intent.GENERAL_QA:
|
||||
documents = self._evidence.assemble_summaries(rows, route)
|
||||
gate = self._gate.check_summaries(route, documents)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"evidence_assembled",
|
||||
{"mode": "summary", "primary_doc": documents[0].path if documents else None, "document_count": len(documents)},
|
||||
)
|
||||
self._log_ranking(runtime, documents)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"evidence_gate_checked",
|
||||
{"passed": gate.passed, "reason": gate.reason, "answer_mode": gate.answer_mode},
|
||||
)
|
||||
context = GeneralSummaryContext(runtime=runtime, route=route, prompt_name="v2_general.summary_answer")
|
||||
context.workflow_llm_enabled = self._workflow_llm_enabled
|
||||
context.documents = documents
|
||||
context.gate_decision = gate
|
||||
final = await self._general_graph.run(context)
|
||||
return final.answer, {"documents": [_serialize_summary(item) for item in documents], "files": []}, gate
|
||||
if route.subintent == V2Subintent.FIND_FILES:
|
||||
files = self._evidence.assemble_files(rows, route)
|
||||
gate = self._gate.check_files(route, files)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"evidence_assembled",
|
||||
{"mode": "find_files", "primary_file": files[0].path if files else None, "file_count": len(files)},
|
||||
)
|
||||
self._log_ranking(runtime, files)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"evidence_gate_checked",
|
||||
{"passed": gate.passed, "reason": gate.reason, "answer_mode": gate.answer_mode},
|
||||
)
|
||||
context = DocsExplainFindFilesContext(
|
||||
runtime=runtime,
|
||||
route=route,
|
||||
rag_session_id=rag_session_id or "",
|
||||
files=files,
|
||||
gate_decision=gate,
|
||||
)
|
||||
final = await self._find_files_graph.run(context)
|
||||
return final.answer, {"documents": [], "files": [_serialize_file(item) for item in files]}, gate
|
||||
documents = self._evidence.assemble_summaries(rows, route)
|
||||
gate = self._gate.check_summaries(route, documents)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"evidence_assembled",
|
||||
{"mode": "summary", "primary_doc": documents[0].path if documents else None, "document_count": len(documents)},
|
||||
)
|
||||
self._log_ranking(runtime, documents)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"evidence_gate_checked",
|
||||
{"passed": gate.passed, "reason": gate.reason, "answer_mode": gate.answer_mode},
|
||||
)
|
||||
context = DocsExplainSummaryContext(
|
||||
runtime=runtime,
|
||||
route=route,
|
||||
rag_session_id=rag_session_id or "",
|
||||
prompt_name="v2_docs_explain.summary_answer",
|
||||
workflow_llm_enabled=self._workflow_llm_enabled,
|
||||
documents=documents,
|
||||
gate_decision=gate,
|
||||
)
|
||||
final = await self._summary_graph.run(context)
|
||||
return final.answer, {"documents": [_serialize_summary(item) for item in documents], "files": []}, gate
|
||||
|
||||
def _trace_row(self, row: dict) -> dict[str, object]:
|
||||
metadata = dict(row.get("metadata") or {})
|
||||
return {
|
||||
"path": str(row.get("path") or ""),
|
||||
"layer": str(row.get("layer") or ""),
|
||||
"title": str(row.get("title") or ""),
|
||||
"document_id": str(metadata.get("document_id") or metadata.get("doc_id") or ""),
|
||||
}
|
||||
|
||||
def _log_ranking(self, runtime: "_RuntimeStub", items: list) -> None:
|
||||
top_docs: list[dict[str, object]] = []
|
||||
for item in items[:4]:
|
||||
top_docs.append(
|
||||
{
|
||||
"doc": getattr(item, "path", ""),
|
||||
"score": getattr(item, "score", 0),
|
||||
"match_reason": getattr(item, "match_reason", ""),
|
||||
}
|
||||
)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"ranking_explained",
|
||||
{
|
||||
"doc": getattr(item, "path", ""),
|
||||
"score": getattr(item, "score", 0),
|
||||
"score_breakdown": getattr(item, "score_breakdown", {}),
|
||||
"match_reason": getattr(item, "match_reason", ""),
|
||||
},
|
||||
)
|
||||
_log_pipeline_step(
|
||||
runtime,
|
||||
"ranking_explained",
|
||||
{
|
||||
"top_docs_after_ranking": top_docs,
|
||||
"ranking_score_breakdown": [
|
||||
{
|
||||
"doc": getattr(item, "path", ""),
|
||||
"score_breakdown": getattr(item, "score_breakdown", {}),
|
||||
}
|
||||
for item in items[:4]
|
||||
],
|
||||
async def _execute_full_chain(self, case: V3Case, rag_session_id: str | None, route) -> ExecutionPayload:
|
||||
runtime = _RuntimeStub(query=case.query, rag_session_id=rag_session_id)
|
||||
result = await self._process.run(runtime)
|
||||
retrieval_plan = _event_payload(runtime.logs, "process.v2.retrieval_policy", "retrieval_plan_resolved")
|
||||
rows = list(_event_payload(runtime.logs, "process.v2.rag_retrieval", "rag_rows_fetched").get("rows") or [])
|
||||
answer_generated = _event_payload(runtime.logs, "process.v2.pipeline", "answer_generated")
|
||||
return ExecutionPayload(
|
||||
actual={
|
||||
"domain": route.routing_domain,
|
||||
"intent": route.intent,
|
||||
"sub_intent": route.subintent,
|
||||
"rag_count": len(rows),
|
||||
"llm_answer": result.answer,
|
||||
"answer_mode": str(answer_generated.get("answer_mode") or ""),
|
||||
"path_scope": tuple(),
|
||||
"symbol_candidates": tuple(),
|
||||
"entity_candidates": tuple(_entity_candidates(rows)),
|
||||
"doc_scope": tuple(_doc_scope(rows)),
|
||||
"layers": tuple(retrieval_plan.get("layers") or []),
|
||||
"filters": dict(retrieval_plan.get("filters") or {}),
|
||||
},
|
||||
details={
|
||||
"query": case.query,
|
||||
"router_result": asdict(route),
|
||||
"retrieval_plan": retrieval_plan,
|
||||
"rows": rows,
|
||||
"answer": result.answer,
|
||||
"logs": runtime.logs,
|
||||
"pipeline_steps": _build_pipeline_steps(runtime.logs),
|
||||
},
|
||||
)
|
||||
|
||||
def _trace_row(self, row: dict) -> dict[str, object]:
|
||||
metadata = row.get("metadata") or {}
|
||||
content = str(row.get("content") or "").strip()
|
||||
return {
|
||||
"layer": str(row.get("layer") or ""),
|
||||
"path": str(row.get("path") or ""),
|
||||
"title": str(row.get("title") or ""),
|
||||
"document_id": str(metadata.get("document_id") or metadata.get("doc_id") or row.get("document_id") or ""),
|
||||
"entity_name": str(metadata.get("entity_name") or ""),
|
||||
"summary_text": str(metadata.get("summary_text") or "")[:400],
|
||||
"section_path": str(metadata.get("section_path") or ""),
|
||||
"metadata_domain": str(metadata.get("domain") or ""),
|
||||
"metadata_subdomain": str(metadata.get("subdomain") or ""),
|
||||
"content_preview": content[:400],
|
||||
}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _RequestStub:
|
||||
@@ -320,10 +231,10 @@ class _TraceLoggerStub:
|
||||
|
||||
|
||||
class _RuntimeStub:
|
||||
def __init__(self, *, query: str) -> None:
|
||||
def __init__(self, *, query: str, rag_session_id: str | None = None) -> None:
|
||||
self.logs: list[dict] = []
|
||||
self.request = _RequestStub(request_id="pipeline_setup_v3", message=query)
|
||||
self.session = _SessionStub()
|
||||
self.session = _SessionStub(active_rag_session_id=rag_session_id)
|
||||
self.publisher = _PublisherStub()
|
||||
self.trace = RequestTraceContext(request_id=self.request.request_id, logger=_TraceLoggerStub(self.logs))
|
||||
|
||||
@@ -335,8 +246,10 @@ def _build_client() -> GigaChatClient:
|
||||
|
||||
def _build_v2_llm() -> AgentLlmService:
|
||||
prompt_paths = [
|
||||
Path(__file__).resolve().parents[3] / "src/app/core/agent/processes/v2/prompts.yml",
|
||||
Path(__file__).resolve().parents[3] / "src/app/core/agent/processes/v2/general_prompts.yml",
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "src/app/core/agent/processes/v2/workflows/doc_explain_summary/steps/prompts/prompts.yml",
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "src/app/core/agent/processes/v2/workflows/general_qa_summary/steps/prompts/prompts.yml",
|
||||
Path(__file__).resolve().parents[3] / "src/app/core/agent/processes/v2/intent_router/routers/prompts.yml",
|
||||
]
|
||||
return AgentLlmService(client=_build_client(), prompts=PromptLoader(prompt_paths))
|
||||
@@ -375,7 +288,12 @@ def _doc_scope(rows: list[dict]) -> list[str]:
|
||||
values: list[str] = []
|
||||
for row in rows:
|
||||
metadata = dict(row.get("metadata") or {})
|
||||
for candidate in (metadata.get("document_id"), metadata.get("doc_id"), row.get("path")):
|
||||
for candidate in (
|
||||
row.get("document_id"),
|
||||
metadata.get("document_id"),
|
||||
metadata.get("doc_id"),
|
||||
row.get("path"),
|
||||
):
|
||||
value = str(candidate or "").strip()
|
||||
if value and value not in values:
|
||||
values.append(value)
|
||||
@@ -386,20 +304,12 @@ def _entity_candidates(rows: list[dict]) -> list[str]:
|
||||
values: list[str] = []
|
||||
for row in rows:
|
||||
metadata = dict(row.get("metadata") or {})
|
||||
value = str(metadata.get("entity_name") or row.get("title") or "").strip()
|
||||
value = str(row.get("entity_name") or metadata.get("entity_name") or row.get("title") or "").strip()
|
||||
if value and value not in values and str(row.get("layer") or "") == "D3_ENTITY_CATALOG":
|
||||
values.append(value)
|
||||
return values
|
||||
|
||||
|
||||
def _serialize_summary(item: RetrievedSummary) -> dict:
|
||||
return asdict(item)
|
||||
|
||||
|
||||
def _serialize_file(item: RetrievedFile) -> dict:
|
||||
return asdict(item)
|
||||
|
||||
|
||||
def _build_pipeline_steps(logs: list[dict]) -> list[dict]:
|
||||
steps: list[dict] = []
|
||||
for item in logs:
|
||||
@@ -409,6 +319,16 @@ def _build_pipeline_steps(logs: list[dict]) -> list[dict]:
|
||||
return steps
|
||||
|
||||
|
||||
def _event_payload(logs: list[dict], module: str, event: str) -> dict[str, object]:
|
||||
for item in logs:
|
||||
if item.get("module") == module and item.get("event") == event:
|
||||
payload = item.get("payload") or {}
|
||||
if isinstance(payload, dict):
|
||||
return dict(payload)
|
||||
return {}
|
||||
return {}
|
||||
|
||||
|
||||
def _log_pipeline_step(runtime: _RuntimeStub, step: str, payload: dict[str, object]) -> None:
|
||||
runtime.logs.append(
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user