# File: agent/tests/unit_tests/agent/test_v2_process.py
# (file-viewer header: 287 lines, 10 KiB, Python)

from __future__ import annotations
import asyncio
from dataclasses import dataclass
from app.core.agent.processes.v2 import V2IntentRouter, V2Process
from app.core.agent.processes.v2.retrieval.target_doc_seeding import normalize_doc_path
from app.core.agent.processes.v2.evidence.assembler import DocsEvidenceAssembler
from app.core.agent.processes.v2.evidence.gate import DocsEvidenceGate
from app.core.agent.processes.v2.retrieval.policy_resolver import V2RetrievalPolicyResolver
from app.core.agent.runtime.execution_context import RuntimeExecutionContext
from app.core.api.domain.models.agent_request import AgentRequest
from app.core.api.domain.models.agent_session import AgentSession
from app.schemas.orchestration import RequestExecutionStatus
class FakePublisher:
    """Publisher double whose publish coroutines accept anything and do nothing."""

    async def publish_status(self, *_args, **_kwargs) -> None:
        """Ignore the status update; always resolves to ``None``."""

    async def publish_user(self, *_args, **_kwargs) -> None:
        """Ignore the user-facing message; always resolves to ``None``."""
class FakeTrace:
    """Trace double that keeps every logged event in an in-memory list."""

    def __init__(self) -> None:
        # Each entry is (kind, title, payload); kind is always the literal "module".
        self.events: list[tuple[str, str, dict | None]] = []

    def module(self, _name: str) -> "FakeTrace":
        """Return this same recorder; the module name is ignored."""
        return self

    def log(self, title, payload=None, **_kwargs) -> None:
        """Record one event, coercing the title to ``str``; extra kwargs are dropped."""
        entry = ("module", str(title), payload)
        self.events.append(entry)
class FakeLlm:
    """LLM double that returns a canned answer and records each call."""

    def __init__(self, answer: str) -> None:
        self.answer = answer
        # One (prompt_name, user_input) pair per generate() call, in call order.
        self.calls: list[tuple[str, str]] = []

    def generate(self, prompt_name: str, user_input: str, **_kwargs) -> str:
        """Remember the call and hand back the canned answer; extra kwargs are ignored."""
        call = (prompt_name, user_input)
        self.calls.append(call)
        return self.answer
@dataclass(slots=True)
class FakeRagAdapter:
"""Имитирует сырые строки из RagSessionRetriever для summary / find_files."""
summary_rows: list[dict]
file_rows: list[dict]
async def fetch_rows(self, _rag_session_id: str, _query_text: str, plan) -> list[dict]:
if "find_files" in str(plan.profile) or str(plan.profile) == "file_lookup":
return list(self.file_rows)
return list(self.summary_rows)
async def fetch_exact_paths(self, _rag_session_id: str, *, paths: list[str], layers: list[str] | None = None) -> list[dict]:
pool = [*self.summary_rows, *self.file_rows]
want = {normalize_doc_path(p) for p in paths}
return [row for row in pool if normalize_doc_path(str(row.get("path") or "")) in want]
async def fetch_chunks_by_path_substrings(
self,
_rag_session_id: str,
*,
path_needles: list[str],
layers: list[str] | None = None,
limit: int = 200,
) -> list[dict]:
del layers, limit
pool = [*self.summary_rows, *self.file_rows]
return [row for row in pool if any(needle in str(row.get("path") or "") for needle in path_needles)]
# Single catalog-layer row for the /health endpoint doc, used as the summary fixture.
# "content" is empty; the summary text lives under "metadata".
_SUMMARY_ROWS: list[dict] = [
    {
        "path": "docs/api/health.md",
        "title": "Health endpoint",
        "content": "",
        "layer": "D1_DOCUMENT_CATALOG",
        "metadata": {
            "summary_text": "Endpoint /health возвращает агрегированный статус runtime.",
            "document_id": "api.health",
            "title": "Health endpoint",
        },
    },
]
# Single entity-layer row for the RuntimeHealth domain doc, used as the file-lookup fixture.
_FILE_ROWS: list[dict] = [
    {
        "path": "docs/domains/runtime-health.md",
        "title": "Runtime health",
        "layer": "D3_ENTITY_CATALOG",
        "content": "x",
        "metadata": {
            "entity_name": "RuntimeHealth",
            "document_id": "domain.runtime_health",
        },
    },
]
def _v2_process(llm: FakeLlm, adapter: FakeRagAdapter, *, workflow_llm_enabled: bool = True) -> V2Process:
    """Build a V2Process from the given LLM and RAG doubles.

    The policy resolver, evidence assembler, evidence gate and router are
    freshly constructed with no arguments; ``workflow_llm_enabled`` is
    passed through unchanged.
    """
    # Dict order mirrors the keyword order, so collaborators are created in the same sequence.
    wiring = {
        "llm": llm,
        "policy_resolver": V2RetrievalPolicyResolver(),
        "rag_adapter": adapter,
        "evidence_assembler": DocsEvidenceAssembler(),
        "evidence_gate": DocsEvidenceGate(),
        "router": V2IntentRouter(),
        "workflow_llm_enabled": workflow_llm_enabled,
    }
    return V2Process(**wiring)
def _context(message: str, *, rag_session_id: str | None = "rag-1") -> RuntimeExecutionContext:
    """Build a RuntimeExecutionContext for a running v2 request carrying ``message``.

    The session is created with ``rag_session_id`` (``None`` is allowed), and
    the publisher and trace are fresh fakes.
    """
    # created_at is borrowed from a throwaway request built by the factory
    # (presumably a creation timestamp; confirm against AgentRequest).
    borrowed_created_at = AgentRequest.create("req-x", "sess-x", "x", "v2").created_at
    running_request = AgentRequest(
        request_id="req-1",
        session_id="sess-1",
        message=message,
        process_version="v2",
        status=RequestExecutionStatus.RUNNING,
        created_at=borrowed_created_at,
    )
    agent_session = AgentSession.create("sess-1", rag_session_id)
    return RuntimeExecutionContext(
        request=running_request,
        session=agent_session,
        publisher=FakePublisher(),
        trace=FakeTrace(),
    )
def test_v2_process_runs_summary_flow() -> None:
    """The LLM answer is returned as-is and the health doc path reaches the LLM input."""
    canned_answer = "Краткое объяснение по документации."
    fake_llm = FakeLlm(canned_answer)
    rag = FakeRagAdapter(summary_rows=_SUMMARY_ROWS, file_rows=[])

    outcome = asyncio.run(
        _v2_process(fake_llm, rag).run(_context("Объясни /health в документации"))
    )

    assert outcome.answer == canned_answer
    assert fake_llm.calls
    _prompt_name, user_input = fake_llm.calls[0]
    assert "docs/api/health.md" in user_input
def test_v2_process_runs_find_files_flow_without_llm() -> None:
    """The answer names the matching file path and the LLM is never called."""
    fake_llm = FakeLlm("should not be used")
    rag = FakeRagAdapter(summary_rows=[], file_rows=_FILE_ROWS)

    outcome = asyncio.run(
        _v2_process(fake_llm, rag).run(_context("В каком файле описан RuntimeHealth?"))
    )

    assert "docs/domains/runtime-health.md" in outcome.answer
    assert fake_llm.calls == []
def test_v2_process_find_files_uses_deterministic_gate_mode() -> None:
    """The first "evidence_gate_checked" trace event reports answer_mode "deterministic"."""
    fake_llm = FakeLlm("unused")
    rag = FakeRagAdapter(summary_rows=[], file_rows=_FILE_ROWS)
    process = _v2_process(fake_llm, rag)
    runtime = _context("В каком документе описан runtime health?")

    asyncio.run(process.run(runtime))

    gate_payloads = []
    for _kind, title, payload in runtime.trace.events:
        if title == "evidence_gate_checked":
            gate_payloads.append(payload)
    assert gate_payloads
    first_gate = gate_payloads[0]
    assert first_gate["answer_mode"] == "deterministic"
def test_v2_process_runs_grounded_general_summary_with_rag() -> None:
    """A general question with a RAG session returns the LLM answer, and the LLM input
    contains the "Опорные документы" marker."""
    canned_answer = "Grounded summary."
    fake_llm = FakeLlm(canned_answer)
    rag = FakeRagAdapter(summary_rows=_SUMMARY_ROWS, file_rows=[])

    outcome = asyncio.run(_v2_process(fake_llm, rag).run(_context("Что это за сервис?")))

    assert outcome.answer == canned_answer
    assert fake_llm.calls
    _prompt_name, user_input = fake_llm.calls[0]
    assert "Опорные документы" in user_input
def test_v2_process_returns_insufficiency_for_general_without_rag() -> None:
    """Without a RAG session the answer mentions "grounded summary" and the LLM stays unused."""
    fake_llm = FakeLlm("Общий ответ без обращения к документации.")
    empty_rag = FakeRagAdapter(summary_rows=[], file_rows=[])
    process = _v2_process(fake_llm, empty_rag)
    runtime = _context("Что это за сервис?", rag_session_id=None)

    outcome = asyncio.run(process.run(runtime))

    assert "grounded summary" in outcome.answer
    assert fake_llm.calls == []
def test_v2_process_requires_active_rag_session() -> None:
    """A docs question without a RAG session yields the "active RAG session needed" message."""
    fake_llm = FakeLlm("unused")
    empty_rag = FakeRagAdapter([], [])
    process = _v2_process(fake_llm, empty_rag)
    runtime = _context("Объясни /health в документации", rag_session_id=None)

    outcome = asyncio.run(process.run(runtime))

    assert "нужна активная RAG-сессия" in outcome.answer
def test_v2_router_detects_find_files_subintent() -> None:
    """The router yields FIND_FILES, the entity name as an anchor, and its lowercase form as a target term."""
    router = V2IntentRouter()

    routed = router.route("В каком файле описан RuntimeHealth?")

    assert routed.subintent == "FIND_FILES"
    assert "RuntimeHealth" in routed.anchors.entity_names
    assert "runtimehealth" in routed.target_terms
def test_v2_process_logs_retrieved_rag_rows_in_trace() -> None:
    """The first "rag_rows_fetched" trace event lists the health doc row with its path, layer and document id."""
    fake_llm = FakeLlm("Краткое объяснение по документации.")
    rag = FakeRagAdapter(summary_rows=_SUMMARY_ROWS, file_rows=[])
    process = _v2_process(fake_llm, rag)
    runtime = _context("Объясни /health в документации")

    asyncio.run(process.run(runtime))

    fetched_payloads = [
        payload
        for _kind, title, payload in runtime.trace.events
        if title == "rag_rows_fetched"
    ]
    assert fetched_payloads
    # Tolerate a None payload or a missing "rows" key so the failure is the plain `assert rows`.
    first_payload = fetched_payloads[0] or {}
    rows = first_payload.get("rows") or []
    assert rows
    top_row = rows[0]
    assert top_row["path"] == "docs/api/health.md"
    assert top_row["layer"] == "D1_DOCUMENT_CATALOG"
    assert top_row["document_id"] == "api.health"
def test_v2_process_logs_pipeline_steps() -> None:
    """Every expected pipeline step title appears among the recorded trace events."""
    fake_llm = FakeLlm("Краткое объяснение по документации.")
    rag = FakeRagAdapter(summary_rows=_SUMMARY_ROWS, file_rows=[])
    process = _v2_process(fake_llm, rag)
    runtime = _context("Что делает endpoint /health?")

    asyncio.run(process.run(runtime))

    logged_titles = [title for _kind, title, _payload in runtime.trace.events]
    # Checked in this order, so the first missing step is the one reported.
    expected_steps = (
        "router_resolved",
        "anchors_extracted",
        "retrieval_profile_selected",
        "retrieval_executed",
        "evidence_assembled",
        "evidence_gate_checked",
        "answer_generated",
    )
    for step in expected_steps:
        assert step in logged_titles
def test_v2_process_blocks_generic_docs_answer_without_target_doc() -> None:
    """With only a README row for a /send question, the answer reports that no target
    document was found and the LLM is never called."""
    fake_llm = FakeLlm("галлюцинация")
    readme_row = {
        "path": "docs/README.md",
        "title": "README",
        "content": "",
        "layer": "D1_DOCUMENT_CATALOG",
        "metadata": {"summary_text": "Общий индекс документации.", "document_id": "docs.readme"},
    }
    rag = FakeRagAdapter(summary_rows=[readme_row], file_rows=[])
    process = _v2_process(fake_llm, rag)

    outcome = asyncio.run(process.run(_context("Что делает endpoint /send?")))

    assert "не найден целевой документ" in outcome.answer
    assert fake_llm.calls == []
def test_v2_process_can_disable_workflow_llm_for_docs_summary() -> None:
    """With workflow_llm_enabled=False the answer carries the stored summary text and the LLM is not called."""
    fake_llm = FakeLlm("should not be used")
    rag = FakeRagAdapter(summary_rows=_SUMMARY_ROWS, file_rows=[])
    process = _v2_process(fake_llm, rag, workflow_llm_enabled=False)
    runtime = _context("Объясни /health в документации")

    outcome = asyncio.run(process.run(runtime))

    assert "Endpoint /health возвращает агрегированный статус runtime." in outcome.answer
    assert fake_llm.calls == []
def test_v2_process_can_disable_workflow_llm_for_general_summary() -> None:
    """With workflow_llm_enabled=False a general question is answered with text from
    the stored summary and the LLM is not called."""
    fake_llm = FakeLlm("should not be used")
    rag = FakeRagAdapter(summary_rows=_SUMMARY_ROWS, file_rows=[])
    process = _v2_process(fake_llm, rag, workflow_llm_enabled=False)
    runtime = _context("Что это за сервис?")

    outcome = asyncio.run(process.run(runtime))

    assert "агрегированный статус runtime" in outcome.answer
    assert fake_llm.calls == []