# File-viewer metadata (not part of the module): 77 lines, 2.9 KiB, Python.

from __future__ import annotations
from datetime import datetime
from pathlib import Path
from tests.pipeline_setup_v3.core.artifacts import ArtifactWriter
from tests.pipeline_setup_v3.core.case_loader import CaseDirectoryLoader
from tests.pipeline_setup_v3.core.models import V3CaseResult
from tests.pipeline_setup_v3.core.session_provider import RagSessionProvider
from tests.pipeline_setup_v3.core.validators import CaseValidator
class V3Runner:
    """Run every case found in a directory and record the outcome of each.

    For each loaded case the runner resolves a RAG session id, executes the
    case through the adapter selected by ``case.runner``, validates the
    outcome, and hands the result to an ``ArtifactWriter``.
    """

    def __init__(
        self,
        cases_dir: Path,
        results_dir: Path,
        run_name: str,
        *,
        pipeline_mode: str = "full",
        router_llm_mode: str = "deterministic",
        workflow_llm_enabled: bool = True,
    ) -> None:
        """Store the run configuration and create the collaborators.

        Args:
            cases_dir: Directory handed to ``CaseDirectoryLoader.load``.
            results_dir: Passed to ``ArtifactWriter`` together with
                ``run_name`` and the current local time.
            run_name: Name of this run, forwarded to ``ArtifactWriter``.
            pipeline_mode: Forwarded to ``AgentRuntimeAdapter``.
            router_llm_mode: Forwarded to ``AgentRuntimeAdapter``.
            workflow_llm_enabled: Forwarded to ``V2ProcessAdapter``.
        """
        self._cases_dir = cases_dir
        self._pipeline_mode = pipeline_mode
        self._router_llm_mode = router_llm_mode
        self._workflow_llm_enabled = workflow_llm_enabled
        self._validator = CaseValidator()
        self._sessions = RagSessionProvider()
        # Both adapters are built lazily on first use. Declaring the two
        # slots here keeps their caching logic identical and makes every
        # instance attribute visible in one place.
        self._agent_runtime = None
        self._v2_process = None
        self._writer = ArtifactWriter(
            results_dir,
            run_name=run_name,
            started_at=datetime.now(),
        )

    @property
    def run_dir(self) -> Path:
        """Return the run directory reported by the artifact writer."""
        return self._writer.run_dir

    def run(self) -> tuple[list[V3CaseResult], Path]:
        """Execute all cases and write per-case and summary artifacts.

        Returns:
            A tuple of the collected case results (in load order) and the
            value returned by ``ArtifactWriter.write_summary``.

        Raises:
            ValueError: If a case names a runner this class does not support.
        """
        results: list[V3CaseResult] = []
        for case in CaseDirectoryLoader().load(self._cases_dir):
            rag_session_id = self._sessions.resolve(case.input)
            payload = self._execute(case, rag_session_id)
            mismatches = self._validator.validate(
                case, payload.actual, payload.details
            )
            result = V3CaseResult(
                case=case,
                actual=payload.actual,
                details=payload.details,
                # A case passes only when the validator reports no mismatches.
                passed=not mismatches,
                mismatches=mismatches,
            )
            # Each case is written as soon as it finishes, before the summary.
            self._writer.write_case(result)
            results.append(result)
        return results, self._writer.write_summary(results)

    def _execute(self, case, rag_session_id):
        """Dispatch ``case`` to the adapter named by ``case.runner``.

        Returns:
            Whatever the selected adapter's ``execute`` returns; ``run``
            reads ``.actual`` and ``.details`` from it.

        Raises:
            ValueError: If ``case.runner`` is neither ``"agent_runtime"``
                nor ``"process_v2"``.
        """
        if case.runner == "agent_runtime":
            return self._agent_runtime_adapter().execute(case, rag_session_id)
        if case.runner == "process_v2":
            return self._v2_process_adapter().execute(case, rag_session_id)
        raise ValueError(f"Unsupported runner: {case.runner}")

    def _agent_runtime_adapter(self):
        """Return the cached ``AgentRuntimeAdapter``, creating it on first use."""
        if self._agent_runtime is None:
            # Imported here rather than at module top, so the adapter module
            # is only loaded when a case actually needs it.
            from tests.pipeline_setup_v3.runtime.agent_runtime_adapter import AgentRuntimeAdapter

            self._agent_runtime = AgentRuntimeAdapter(
                pipeline_mode=self._pipeline_mode,
                router_llm_mode=self._router_llm_mode,
            )
        return self._agent_runtime

    def _v2_process_adapter(self):
        """Return the cached ``V2ProcessAdapter``, creating it on first use."""
        if self._v2_process is None:
            # Imported here rather than at module top, so the adapter module
            # is only loaded when a case actually needs it.
            from tests.pipeline_setup_v3.runtime.v2_process_adapter import V2ProcessAdapter

            self._v2_process = V2ProcessAdapter(
                workflow_llm_enabled=self._workflow_llm_enabled,
            )
        return self._v2_process