"""Adapter that runs V3 pipeline test cases through the agent router, retrieval and runtime."""

from __future__ import annotations

import math

from app.modules.agent.runtime import (
    AgentRuntimeExecutor,
    DocsQAPipelineRunner,
    RuntimeRepoContextFactory,
    RuntimeRetrievalAdapter,
)
from app.modules.agent.llm import AgentLlmService
from app.modules.agent.llm.prompt_loader import PromptLoader
from app.modules.agent.runtime.steps.context import build_retrieval_request, build_retrieval_result
from app.modules.agent.intent_router_v2 import ConversationState, IntentRouterV2
from app.modules.agent.intent_router_v2.factory import GigaChatIntentRouterFactory
from app.modules.shared.gigachat.client import GigaChatClient
from app.modules.shared.gigachat.settings import GigaChatSettings
from app.modules.shared.gigachat.token_provider import GigaChatTokenProvider
from tests.pipeline_setup_v3.core.models import ExecutionPayload, V3Case

# Reason reported in both the payload and the trace when a code intent is stubbed out.
_CODE_STUB_REASON = "code intents are temporarily disabled during docs pipeline calibration"


class AgentRuntimeAdapter:
    """Execute a ``V3Case`` in the requested mode and package the outcome as an ``ExecutionPayload``."""

    def __init__(self, *, pipeline_mode: str = "full", router_llm_mode: str = "deterministic") -> None:
        """Create the router, repo-context factory and retrieval adapter.

        Args:
            pipeline_mode: Forwarded to the docs runner as ``mode``; the value
                ``"pre_llm_only"`` makes the docs runner receive ``llm=None``.
            router_llm_mode: ``"llm_disambiguation"`` builds the router through
                ``GigaChatIntentRouterFactory``; any other value uses a plain
                ``IntentRouterV2``.
        """
        self._pipeline_mode = pipeline_mode
        self._router_llm_mode = router_llm_mode
        self._router = self._build_router()
        self._repo_context_factory = RuntimeRepoContextFactory()
        self._retrieval = RuntimeRetrievalAdapter()
        # Built lazily by _executor_instance(); only the full-chain mode needs it.
        self._executor: AgentRuntimeExecutor | None = None

    def execute(self, case: V3Case, rag_session_id: str | None) -> ExecutionPayload:
        """Run ``case`` according to ``case.mode``.

        Docs calibration cases are handled by a dedicated path. Otherwise
        ``router_only`` needs no session, while ``router_rag`` and
        ``full_chain`` require ``rag_session_id``.

        Raises:
            ValueError: If a session is required but missing, or the mode is
                not supported. The session check runs before the mode check.
        """
        if self._is_docs_calibration_case(case):
            return self._execute_docs_case(case, rag_session_id)
        if case.mode == "router_only":
            return self._run_router_only(case)
        if not rag_session_id:
            raise ValueError(f"Case '{case.case_id}' requires rag_session_id or repo_path")
        if case.mode == "router_rag":
            return self._run_router_rag(case, rag_session_id)
        if case.mode == "full_chain":
            return self._run_full_chain(case, rag_session_id)
        raise ValueError(f"Unsupported mode: {case.mode}")

    def _execute_docs_case(self, case: V3Case, rag_session_id: str | None) -> ExecutionPayload:
        """Run a docs calibration case; ``CODE_QA`` routes are answered with a stub payload.

        Raises:
            ValueError: If the docs chain must run but ``rag_session_id`` is missing.
        """
        route = self._route(case.query)
        if route.intent == "CODE_QA":
            return self._stubbed_code_payload(case, route)
        if case.mode == "router_only":
            actual = self._actual_from_route(route)
            actual["code_intents_stubbed"] = True
            details = {
                "query": case.query,
                "router_result": route.model_dump(mode="json"),
                "rag_rows": [],
                "diagnostics": {"code_intents_stubbed": True},
                "pipeline_steps": _router_only_steps(case.query, route),
            }
            return ExecutionPayload(actual=actual, details=details)
        if not rag_session_id:
            raise ValueError(f"Case '{case.case_id}' requires rag_session_id or repo_path")
        return self._run_docs_chain(case, rag_session_id)

    def _run_router_only(self, case: V3Case) -> ExecutionPayload:
        """Route the query and report the routing decision without retrieval."""
        route = self._route(case.query)
        actual = self._actual_from_route(route)
        details = {
            "query": case.query,
            "router_result": route.model_dump(mode="json"),
            "rag_rows": [],
            "pipeline_steps": _router_only_steps(case.query, route),
        }
        return ExecutionPayload(actual=actual, details=details)

    def _run_router_rag(self, case: V3Case, rag_session_id: str) -> ExecutionPayload:
        """Route the query, retrieve rows and resolve the symbol; no LLM answer is produced."""
        route = self._route(case.query)
        request = build_retrieval_request(route, rag_session_id)
        raw_rows = self._retrieve_rows(rag_session_id, request)
        symbol_resolution = self._resolve_symbol(route.symbol_resolution.model_dump(), raw_rows)
        retrieval_result = build_retrieval_result(
            raw_rows,
            self._retrieval.consume_retrieval_report(),
            symbol_resolution,
        )
        actual = self._actual_from_route(
            route,
            rag_rows=raw_rows,
            answer_mode="partial",
            path_scope=list(request.path_scope or []),
            layers=list(request.requested_layers or []),
            symbol_candidates=list(request.symbol_candidates or []),
        )
        details = {
            "query": case.query,
            "router_result": route.model_dump(mode="json"),
            "retrieval_request": request.model_dump(mode="json"),
            "retrieval_result": retrieval_result.model_dump(mode="json"),
            "rag_rows": raw_rows,
            "symbol_resolution": symbol_resolution,
            "pipeline_steps": _router_rag_steps(
                query=case.query,
                route=route.model_dump(mode="json"),
                retrieval_request=request.model_dump(mode="json"),
                retrieval_result=retrieval_result.model_dump(mode="json"),
                rag_rows=raw_rows,
                symbol_resolution=symbol_resolution,
            ),
        }
        return ExecutionPayload(actual=actual, details=details)

    def _run_full_chain(self, case: V3Case, rag_session_id: str) -> ExecutionPayload:
        """Run the full executor and summarize its result.

        The router result, retrieval request and retrieval result may each be
        falsy on the executor result; every access to them is guarded and
        falls back to an empty value.
        """
        result = self._executor_instance().execute(user_query=case.query, rag_session_id=rag_session_id)
        route = result.router_result
        request = result.retrieval_request
        actual = self._actual_from_route(
            route,
            rag_rows=list(result.retrieval_result.raw_rows) if result.retrieval_result else [],
            llm_answer=result.final_answer,
            answer_mode=_answer_status(result.answer_mode, result.llm_used),
            path_scope=list(request.path_scope or []) if request else [],
            layers=list(request.requested_layers or []) if request else [],
            symbol_candidates=list(route.query_plan.symbol_candidates or []) if route else [],
        )
        details = {
            "query": case.query,
            "router_result": route.model_dump(mode="json") if route else {},
            "retrieval_request": request.model_dump(mode="json") if request else {},
            "retrieval_result": result.retrieval_result.model_dump(mode="json") if result.retrieval_result else {},
            "diagnostics": result.diagnostics.model_dump(mode="json"),
            "rag_rows": list(result.retrieval_result.raw_rows) if result.retrieval_result else [],
            "validation": result.validation.model_dump(mode="json"),
            "token_usage": _token_usage(result),
            "steps": list(result.runtime_trace),
            "pipeline_steps": _full_chain_steps(
                query=case.query,
                route=route.model_dump(mode="json") if route else {},
                retrieval_request=request.model_dump(mode="json") if request else {},
                runtime_trace=list(result.runtime_trace),
            ),
        }
        return ExecutionPayload(actual=actual, details=details)

    def _route(self, query: str):
        """Route ``query`` with a fresh ``ConversationState`` and a freshly built repo context."""
        return self._router.route(query, ConversationState(), self._repo_context_factory.build())

    def _build_router(self) -> IntentRouterV2:
        """Return the GigaChat-backed router for ``llm_disambiguation``, else a plain ``IntentRouterV2``."""
        if self._router_llm_mode == "llm_disambiguation":
            return GigaChatIntentRouterFactory().build()
        return IntentRouterV2()

    def _retrieve_rows(self, rag_session_id: str, request) -> list[dict]:
        """Fetch rows for ``request``.

        ``OPEN_FILE`` requests with a path scope use exact-file retrieval on
        the ``C0_SOURCE_CHUNKS`` layer; everything else is retrieved with the
        request's retrieval spec, constraints and query plan.
        """
        if request.sub_intent == "OPEN_FILE" and request.path_scope:
            return self._retrieval.retrieve_exact_files(
                rag_session_id,
                paths=list(request.path_scope),
                layers=["C0_SOURCE_CHUNKS"],
                limit=200,
                query=request.query,
                ranking_profile=str(getattr(request.retrieval_spec, "rerank_profile", "") or ""),
            )
        return self._retrieval.retrieve_with_plan(
            rag_session_id,
            request.query,
            request.retrieval_spec,
            request.retrieval_constraints,
            query_plan=request.query_plan,
        )

    def _resolve_symbol(self, initial: dict, rag_rows: list[dict]) -> dict:
        """Settle a pending symbol resolution against retrieved symbol-catalog rows.

        Args:
            initial: Resolution dict from the router; returned unchanged unless
                its ``status`` is ``"pending"``.
            rag_rows: Retrieved rows; only ``C1_SYMBOL_CATALOG`` rows with a
                non-blank title are considered.

        Returns:
            A dict with ``status`` (``resolved`` / ``ambiguous`` /
            ``not_found``), ``resolved_symbol``, up to five ``alternatives``
            and a ``confidence`` value.
        """
        if str(initial.get("status") or "") != "pending":
            return initial
        # `or []` also covers an explicit None stored under the key, which the
        # plain `.get(key, [])` default would pass through to the iteration.
        candidates = [str(item).strip() for item in (initial.get("alternatives") or []) if str(item).strip()]
        found = [
            str(row.get("title") or "").strip()
            for row in rag_rows
            if str(row.get("layer") or "") == "C1_SYMBOL_CATALOG" and str(row.get("title") or "").strip()
        ]
        exact = next((item for item in found if item in candidates), None)
        if exact:
            return {"status": "resolved", "resolved_symbol": exact, "alternatives": found[:5], "confidence": 0.99}
        if found:
            return {"status": "ambiguous", "resolved_symbol": None, "alternatives": found[:5], "confidence": 0.55}
        return {"status": "not_found", "resolved_symbol": None, "alternatives": [], "confidence": 0.0}

    def _actual_from_route(
        self,
        route,
        *,
        rag_rows: list[dict] | None = None,
        llm_answer: str = "",
        answer_mode: str = "partial",
        path_scope: list[str] | None = None,
        layers: list[str] | None = None,
        symbol_candidates: list[str] | None = None,
    ) -> dict:
        """Build the ``actual`` dict from a router result plus optional retrieval/answer data.

        A falsy ``route`` yields ``None`` for all route-derived fields. The
        scope, candidate and layer sequences are stored as tuples.
        """
        route_dump = route.model_dump(mode="json") if route else {}
        return {
            "intent": route_dump.get("intent"),
            "sub_intent": dict(route_dump.get("query_plan") or {}).get("sub_intent"),
            "graph_id": route_dump.get("graph_id"),
            "conversation_mode": route_dump.get("conversation_mode"),
            "rag_count": len(rag_rows or []),
            "llm_answer": llm_answer,
            "answer_mode": answer_mode,
            "path_scope": tuple(path_scope or []),
            "symbol_candidates": tuple(symbol_candidates or []),
            "layers": tuple(layers or []),
        }

    def _executor_instance(self) -> AgentRuntimeExecutor:
        """Return the runtime executor, creating it (and its LLM service) on first use."""
        if self._executor is None:
            self._executor = AgentRuntimeExecutor(_build_llm())
        return self._executor

    def _docs_runner_instance(self) -> DocsQAPipelineRunner:
        """Build a new docs runner; it gets no LLM when the pipeline mode is ``pre_llm_only``."""
        return DocsQAPipelineRunner(
            router=self._router,
            retrieval_adapter=self._retrieval,
            repo_context=self._repo_context_factory.build(),
            llm=None if self._pipeline_mode == "pre_llm_only" else _build_llm(),
        )

    def _run_docs_chain(self, case: V3Case, rag_session_id: str) -> ExecutionPayload:
        """Run the docs QA pipeline for ``case`` and package its result and trace."""
        result = self._docs_runner_instance().run(case.query, rag_session_id, mode=self._pipeline_mode)
        actual = self._actual_from_docs_result(result)
        # Normalize once: the request may be missing, and every later lookup
        # must tolerate that instead of calling .get() on None.
        llm_request = dict(result.llm_request or {})
        details = {
            "query": case.query,
            "router_result": result.router_result.model_dump(mode="json"),
            "retrieval_request": result.retrieval_request.model_dump(mode="json"),
            "diagnostics": result.diagnostics.model_dump(mode="json"),
            "llm_request": llm_request,
            "rag_rows": list(result.raw_rows),
            "pipeline_steps": [
                _input_step(case.query),
                {
                    "step": "router",
                    "input": {"query": case.query},
                    "output": {
                        "intent": result.router_result.intent,
                        "sub_intent": result.router_result.query_plan.sub_intent,
                        "graph_id": result.router_result.graph_id,
                        "conversation_mode": result.router_result.conversation_mode,
                    },
                },
                {
                    "step": "docs_pipeline",
                    "input": {"query": case.query},
                    "output": {
                        "answer_mode": result.answer_mode,
                        "prompt_name": result.prompt_name,
                        "llm_request": {
                            "prompt_name": llm_request.get("prompt_name"),
                            "log_context": llm_request.get("log_context"),
                            "prompt_stats": dict(llm_request.get("prompt_stats") or {}),
                        },
                        "degraded_reason": result.degraded_reason,
                    },
                },
            ],
        }
        return ExecutionPayload(actual=actual, details=details)

    def _actual_from_docs_result(self, result) -> dict:
        """Build the ``actual`` dict for a docs pipeline result.

        Entity candidates and document scope come from the diagnostics when
        present there, otherwise they are derived from the raw rows.
        """
        route = result.router_result
        request = result.retrieval_request
        diagnostics = result.diagnostics
        entity_candidates = tuple(
            diagnostics.query_entity_candidates or self._docs_entity_candidates(result.raw_rows)
        )
        doc_scope = tuple(diagnostics.doc_ids or self._docs_scope(result.raw_rows))
        return {
            "intent": route.intent,
            "sub_intent": route.query_plan.sub_intent,
            "graph_id": route.graph_id,
            "conversation_mode": route.conversation_mode,
            "rag_count": len(result.raw_rows),
            "llm_answer": result.answer,
            "answer_mode": _docs_answer_status(result.answer_mode, self._pipeline_mode),
            "path_scope": tuple(getattr(request, "path_scope", []) or []),
            "doc_scope": doc_scope,
            "entity_candidates": entity_candidates,
            "symbol_candidates": (),
            "layers": tuple(request.requested_layers or []),
            "filters": {
                "doc_type": getattr(request.retrieval_spec.filters, "doc_type", None),
            },
            "pipeline_mode": self._pipeline_mode,
            "gate_decision": diagnostics.gate_decision,
            "prompt_used": result.prompt_name,
            "llm_mode": diagnostics.llm_mode,
            "degraded_reason": result.degraded_reason or None,
            "code_intents_stubbed": True,
        }

    def _stubbed_code_payload(self, case: V3Case, route) -> ExecutionPayload:
        """Return a payload marking the case as stubbed; no retrieval or LLM call is made."""
        actual = {
            "intent": route.intent,
            "sub_intent": route.query_plan.sub_intent,
            "graph_id": route.graph_id,
            "conversation_mode": route.conversation_mode,
            "rag_count": 0,
            "llm_answer": "",
            "answer_mode": "stubbed",
            "status": "stubbed",
            "reason": _CODE_STUB_REASON,
            "layers": (),
            "symbol_candidates": (),
            "code_intents_stubbed": True,
        }
        details = {
            "query": case.query,
            "router_result": route.model_dump(mode="json"),
            "rag_rows": [],
            "diagnostics": {
                "code_intents_stubbed": True,
                "degraded_reason": "code_intent_stubbed",
            },
            "pipeline_steps": [
                _input_step(case.query),
                {
                    "step": "router",
                    "input": {"query": case.query},
                    "output": {
                        "intent": route.intent,
                        "sub_intent": route.query_plan.sub_intent,
                        "graph_id": route.graph_id,
                        "conversation_mode": route.conversation_mode,
                    },
                },
                {
                    "step": "code_stub",
                    "input": {"query": case.query},
                    "output": {
                        "status": "stubbed",
                        "reason": _CODE_STUB_REASON,
                    },
                },
            ],
        }
        return ExecutionPayload(actual=actual, details=details)

    def _is_docs_calibration_case(self, case: V3Case) -> bool:
        """Return True when the case's source file path contains ``suite_03_docs``."""
        return "suite_03_docs" in case.source_file.as_posix()

    def _docs_scope(self, rows: list[dict]) -> list[str]:
        """Collect unique ``document_id`` / ``doc_id`` metadata values in first-seen order."""
        values: list[str] = []
        for row in rows:
            metadata = dict(row.get("metadata") or {})
            for candidate in (metadata.get("document_id"), metadata.get("doc_id")):
                value = str(candidate or "").strip()
                if value and value not in values:
                    values.append(value)
        return values

    def _docs_entity_candidates(self, rows: list[dict]) -> list[str]:
        """Collect unique non-blank titles of ``D3_ENTITY_CATALOG`` rows in first-seen order."""
        values: list[str] = []
        for row in rows:
            if str(row.get("layer") or "") != "D3_ENTITY_CATALOG":
                continue
            title = str(row.get("title") or "").strip()
            if title and title not in values:
                values.append(title)
        return values


def _build_llm() -> AgentLlmService:
    """Create an ``AgentLlmService`` backed by a GigaChat client configured from the environment."""
    settings = GigaChatSettings.from_env()
    client = GigaChatClient(settings, GigaChatTokenProvider(settings))
    return AgentLlmService(client=client, prompts=PromptLoader())


def _answer_status(answer_mode: str, llm_used: bool) -> str:
    """Return ``"answered"`` for a ``"normal"`` answer that used the LLM, else ``answer_mode`` unchanged."""
    if answer_mode == "normal" and llm_used:
        return "answered"
    return answer_mode


def _docs_answer_status(answer_mode: str, pipeline_mode: str) -> str:
    """Translate the docs answer mode for reporting.

    Outside ``pre_llm_only`` mode the value is returned unchanged. In
    ``pre_llm_only`` mode ``"ready"`` becomes ``"answered"`` and
    ``"ready_partial"`` becomes ``"structured_spec"``; other values pass through.
    """
    if pipeline_mode != "pre_llm_only":
        return answer_mode
    if answer_mode == "ready":
        return "answered"
    if answer_mode == "ready_partial":
        return "structured_spec"
    return answer_mode


def _token_usage(result) -> dict:
    """Estimate input tokens for the draft answer, or return ``{}`` when there is no draft.

    The estimate is the combined character count of the loaded system prompt
    and the prompt payload divided by 4, rounded up, with a minimum of 1.
    """
    draft = result.draft_answer
    if draft is None:
        return {}
    system_prompt = PromptLoader().load(draft.prompt_name)
    tokens_in_estimate = max(1, int(math.ceil((len(system_prompt or "") + len(draft.prompt_payload or "")) / 4)))
    return {
        "prompt_name": draft.prompt_name,
        "tokens_in_estimate": tokens_in_estimate,
    }


def _router_only_steps(query: str, route) -> list[dict]:
    """Return the trace for a router-only run: the input step followed by the router step."""
    route_dump = route.model_dump(mode="json")
    return [
        _input_step(query),
        {
            "step": "router",
            "input": {"query": query},
            "output": {
                "intent": route_dump.get("intent"),
                "sub_intent": dict(route_dump.get("query_plan") or {}).get("sub_intent"),
                "graph_id": route_dump.get("graph_id"),
                "conversation_mode": route_dump.get("conversation_mode"),
            },
        },
    ]


def _router_rag_steps(
    *,
    query: str,
    route: dict,
    retrieval_request: dict,
    retrieval_result: dict,
    rag_rows: list[dict],
    symbol_resolution: dict,
) -> list[dict]:
    """Return the trace for a router+retrieval run: input, router, retrieval planning, retrieval."""
    return [
        _input_step(query),
        {
            "step": "router",
            "input": {"query": query},
            "output": {
                "intent": route.get("intent"),
                "sub_intent": dict(route.get("query_plan") or {}).get("sub_intent"),
                "graph_id": route.get("graph_id"),
                "conversation_mode": route.get("conversation_mode"),
            },
        },
        {
            "step": "retrieval_planning",
            "input": {
                "query": query,
                "router_result": {
                    "intent": route.get("intent"),
                    "sub_intent": dict(route.get("query_plan") or {}).get("sub_intent"),
                    "graph_id": route.get("graph_id"),
                },
            },
            "output": retrieval_request,
        },
        {
            "step": "retrieval",
            "input": retrieval_request,
            "output": {
                "rag_count": len(rag_rows),
                "retrieval_result": retrieval_result,
                "symbol_resolution": symbol_resolution,
            },
        },
    ]


def _full_chain_steps(*, query: str, route: dict, retrieval_request: dict, runtime_trace: list[dict]) -> list[dict]:
    """Normalize the executor's runtime trace into pipeline steps.

    The input step comes first. A synthetic ``retrieval_planning`` step is
    inserted before each ``retrieval`` step, and ``router`` / ``retrieval``
    steps get a default ``input`` when the trace did not provide one.
    """
    steps = [_input_step(query)]
    for raw_step in runtime_trace:
        step = dict(raw_step)  # copy so the defaults below never mutate the caller's trace entry
        name = str(step.get("step") or "").strip()
        if name == "router":
            step.setdefault("input", {"query": query})
        if name == "retrieval":
            steps.append(
                {
                    "step": "retrieval_planning",
                    "input": {
                        "query": query,
                        "router_result": {
                            "intent": route.get("intent"),
                            "sub_intent": dict(route.get("query_plan") or {}).get("sub_intent"),
                            "graph_id": route.get("graph_id"),
                        },
                    },
                    "output": retrieval_request,
                }
            )
            step.setdefault("input", retrieval_request)
        steps.append(
            {
                "step": name,
                "status": step.get("status"),
                "timings_ms": step.get("timings_ms") or {},
                "input": step.get("input") or {},
                "output": step.get("output") or {},
            }
        )
    return steps


def _input_step(query: str) -> dict:
    """Return the trace step that echoes the user query as both input and output."""
    return {
        "step": "input_query",
        "input": {"query": query},
        "output": {"query": query},
    }