"""Diagnostics for the CODE_QA pipeline: Level 1 summary and Level 2 detail.""" from __future__ import annotations from typing import Any from app.modules.rag.code_qa_pipeline.contracts import ( DiagnosticsReport, EvidenceBundle, RetrievalRequest, RetrievalResult, RouterResult, ) def build_diagnostics_report( *, router_result: RouterResult, retrieval_request: RetrievalRequest | None, retrieval_result: RetrievalResult | None, evidence_bundle: EvidenceBundle | None, answer_mode: str = "normal", timings_ms: dict[str, int] | None = None, resolved_target: str | None = None, answer_policy_branch: str = "", decision_reason: str = "", evidence_gate_input: dict[str, Any] | None = None, post_evidence_gate: dict[str, Any] | None = None, ) -> DiagnosticsReport: """Build full diagnostics: Level 1 summary + Level 2 detail + failure reasons.""" timings = dict(timings_ms or {}) req = retrieval_request res = retrieval_result bundle = evidence_bundle intent_correct = None target_found = bool(bundle and bundle.resolved_target) layers_used = list(req.requested_layers) if req else [] retrieval_sufficient = bool(bundle and bundle.sufficient) if not retrieval_sufficient and bundle: answer_mode = "degraded" if bundle.evidence_count else "insufficient" failure_reasons = list(bundle.failure_reasons) if bundle else [] router_result_dict = _router_result_to_dict(router_result) retrieval_request_dict = _retrieval_request_to_dict(req) if req else {} per_layer = [] if res: for o in res.layer_outcomes: per_layer.append({ "layer_id": o.layer_id, "hit_count": o.hit_count, "empty": o.empty, "fallback_used": o.fallback_used, }) empty_layers = list(res.missing_layers) if res else [] evidence_gate_decision = {} if bundle is not None: evidence_gate_decision = { "sufficient": bundle.sufficient, "failure_reasons": list(bundle.failure_reasons), "evidence_count": bundle.evidence_count, } return DiagnosticsReport( intent_correct=intent_correct, target_found=target_found, layers_used=layers_used, retrieval_sufficient=retrieval_sufficient, answer_mode=answer_mode, resolved_target=resolved_target or (bundle.resolved_target if bundle else None), answer_policy_branch=answer_policy_branch, decision_reason=decision_reason, router_result=router_result_dict, retrieval_request=retrieval_request_dict, per_layer_outcome=per_layer, empty_layers=empty_layers, evidence_gate_decision=evidence_gate_decision, evidence_gate_input=dict(evidence_gate_input or {}), post_evidence_gate=dict(post_evidence_gate or {}), failure_reasons=failure_reasons, timings_ms=timings, ) def build_level1_summary(report: DiagnosticsReport) -> dict[str, Any]: """Human-readable summary: intent, target, layers, sufficiency, answer mode.""" return { "intent_correct": report.intent_correct, "target_found": report.target_found, "layers_used": report.layers_used, "retrieval_sufficient": report.retrieval_sufficient, "answer_mode": report.answer_mode, "resolved_target": report.resolved_target, "answer_policy_branch": report.answer_policy_branch, "decision_reason": report.decision_reason, "failure_reasons": report.failure_reasons, } def build_level2_detail(report: DiagnosticsReport) -> dict[str, Any]: """Detailed diagnostics for tuning and tests.""" return { "router_result": report.router_result, "retrieval_request": report.retrieval_request, "per_layer_outcome": report.per_layer_outcome, "empty_layers": report.empty_layers, "resolved_target": report.resolved_target, "answer_policy_branch": report.answer_policy_branch, "decision_reason": report.decision_reason, "evidence_gate_input": report.evidence_gate_input, "evidence_gate_decision": report.evidence_gate_decision, "post_evidence_gate": report.post_evidence_gate, "failure_reasons": report.failure_reasons, "timings_ms": report.timings_ms, } def _router_result_to_dict(r: RouterResult) -> dict[str, Any]: return { "intent": r.intent, "graph_id": r.graph_id, "conversation_mode": r.conversation_mode, "retrieval_profile": r.retrieval_profile, "sub_intent": r.query_plan.sub_intent if r.query_plan else None, "path_scope": list(getattr(r.retrieval_spec.filters, "path_scope", []) or []), "layers": [str(q.layer_id) for q in (r.retrieval_spec.layer_queries or [])], "symbol_resolution_status": r.symbol_resolution.status if r.symbol_resolution else None, } def _retrieval_request_to_dict(req: RetrievalRequest) -> dict[str, Any]: return { "rag_session_id": req.rag_session_id, "query": req.query, "sub_intent": req.sub_intent, "path_scope": list(req.path_scope), "requested_layers": list(req.requested_layers), }