from __future__ import annotations

import json
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Used in annotations only; annotations are lazy under the __future__
    # import above, so the project module is not needed at runtime.
    from tests.pipeline_setup_v2.core.models import V2CaseResult

# Placeholder rendered for missing / empty values.
_PLACEHOLDER = "—"

# One-pass translation table that backslash-escapes markdown punctuation.
_MARKDOWN_ESCAPES = str.maketrans(
    {char: f"\\{char}" for char in "\\`*_{}[]()#+-!|"}
)


class ArtifactWriter:
    """Write per-case artifacts and a run summary into one run directory."""

    def __init__(self, root: Path, run_name: str, started_at: datetime) -> None:
        """Create ``root / run_name / <YYYYmmdd_HHMMSS>`` and keep it as ``run_dir``.

        The directory (and any missing parents) is created immediately; an
        already existing directory is reused.
        """
        stamp = started_at.strftime("%Y%m%d_%H%M%S")
        self.run_dir = root / run_name / stamp
        self.run_dir.mkdir(parents=True, exist_ok=True)

    def write_case(self, result: V2CaseResult) -> None:
        """Write ``<source stem>_<case id>.json`` and ``.md`` for one case result."""
        case = result.case
        source = case.source_file.as_posix()
        stem = f"{case.source_file.stem}_{case.case_id}"

        payload = {
            "case_id": case.case_id,
            "source_file": source,
            "runner": case.runner,
            "mode": case.mode,
            "actual": result.actual,
            "passed": result.passed,
            "mismatches": result.mismatches,
            "details": result.details,
        }
        (self.run_dir / f"{stem}.json").write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        lines = [
            f"# {case.case_id}",
            "",
            f"- source_file: {source}",
            f"- runner: {case.runner}",
            f"- mode: {case.mode}",
            f"- passed: {result.passed}",
            "",
            "## Query",
            case.query,
            "",
            "## Actual",
            json.dumps(result.actual, ensure_ascii=False, indent=2),
            "",
            "## Mismatches",
            # An empty mismatch list is rendered as a single "- none" bullet.
            *([f"- {item}" for item in result.mismatches] or ["- none"]),
        ]
        (self.run_dir / f"{stem}.md").write_text("\n".join(lines), encoding="utf-8")

    def write_summary(self, results: list[V2CaseResult]) -> Path:
        """Write ``summary.md`` for *results* into the run directory and return its path."""
        path = self.run_dir / "summary.md"
        path.write_text(SummaryComposer().compose(results), encoding="utf-8")
        return path


class SummaryComposer:
    """Render a list of case results as a markdown summary document."""

    def compose(self, results: list[V2CaseResult]) -> str:
        """Return the summary: pass count, result table, failures, LLM answers."""
        passed = sum(1 for item in results if item.passed)
        lines = [
            "# pipeline_setup_v2 summary",
            "",
            f"Passed: {passed}/{len(results)}",
            "",
            "| File | Case | Query | Expected sub-intent | Intent | Actual sub-intent | RAG layers | Tokens | Pass |",
            "|------|------|-------|---------------------|--------|-------------------|------------|--------|------|",
        ]
        lines.extend(self._result_rows(results))
        lines.extend(self._failure_section(results))
        lines.extend(self._llm_section(results))
        return "\n".join(lines)

    def _result_rows(self, results: list[V2CaseResult]) -> list[str]:
        """Return one markdown table row per result."""
        rows: list[str] = []
        for item in results:
            actual = item.actual
            rows.append(
                f"| {item.case.source_file.name} | {item.case.case_id} | {self._table_text(item.case.query)} | "
                f"{self._cell(item.case.expectations.router.sub_intent)} | {self._cell(actual.get('intent'))} | "
                f"{self._cell(actual.get('sub_intent'))} | {self._rag_layers_text(item)} | {self._token_text(item)} | {'✓' if item.passed else '✗'} |"
            )
        return rows

    def _failure_section(self, results: list[V2CaseResult]) -> list[str]:
        """Return the "Failures" section lines, or ``[]`` when every case passed."""
        failures = [item for item in results if not item.passed]
        if not failures:
            return []
        lines = ["", "## Failures"]
        lines.extend(
            f"- **{item.case.case_id}**: {'; '.join(item.mismatches)}"
            for item in failures
        )
        return lines

    def _llm_section(self, results: list[V2CaseResult]) -> list[str]:
        """Return the "LLM Answers" section for results with a non-blank ``llm_answer``."""
        llm_results = [
            item
            for item in results
            if str(item.actual.get("llm_answer") or "").strip()
        ]
        if not llm_results:
            return []
        lines = ["", "## LLM Answers"]
        for item in llm_results:
            answer = str(item.actual.get("llm_answer") or "")
            lines.append(f"- **{item.case.case_id}**")
            lines.append(f" Query: {self._table_text(item.case.query, limit=400)}")
            lines.extend(self._quote_block(self._snippet(answer)))
        return lines

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Cut *text* to at most *limit* characters, ending with "…" when shortened.

        A limit below 1 leaves no room for the ellipsis, so an empty string is
        returned instead of slicing with a negative index.
        """
        if len(text) <= limit:
            return text
        if limit < 1:
            return ""
        return text[: limit - 1].rstrip() + "…"

    def _snippet(self, text: str, limit: int = 880) -> str:
        """Collapse all whitespace runs to single spaces and truncate to *limit*."""
        return self._truncate(" ".join(text.split()), limit)

    def _table_text(self, text: str, limit: int = 140) -> str:
        """Return *text* as single-line, pipe-escaped content for a table cell.

        Truncation happens before escaping so the cut can never land between
        the backslash and the pipe of an escape sequence, and so *limit*
        counts visible characters rather than escape characters.
        """
        compact = " ".join(text.split())
        return self._truncate(compact, limit).replace("|", "\\|")

    def _cell(self, value: object) -> str:
        """Render an optional value as a table cell; falsy or blank gives "—"."""
        if not value:
            return _PLACEHOLDER
        return self._table_text(f"{value}") or _PLACEHOLDER

    def _quote_block(self, text: str) -> list[str]:
        """Return *text* as markdown-escaped quote lines ("—" when blank)."""
        quoted = text.strip()
        if not quoted:
            return [" > —"]
        return [f" > {self._escape_markdown(line)}" for line in quoted.splitlines()]

    def _escape_markdown(self, text: str) -> str:
        """Backslash-escape markdown punctuation in *text* in a single pass."""
        return text.translate(_MARKDOWN_ESCAPES)

    def _rag_layers_text(self, item: V2CaseResult) -> str:
        """Return ``layer:count`` pairs for ``details["rag_rows"]``, sorted by layer.

        Rows without a non-blank ``layer`` are ignored; "—" is returned when
        nothing is left to count.
        """
        rows = item.details.get("rag_rows") or []
        layers = (str(row.get("layer") or "").strip() for row in rows)
        counts = Counter(layer for layer in layers if layer)
        if not counts:
            return _PLACEHOLDER
        parts = [f"{layer}:{counts[layer]}" for layer in sorted(counts)]
        return self._table_text(", ".join(parts), limit=120)

    def _token_text(self, item: V2CaseResult) -> str:
        """Return ``diagnostics.prompt.prompt_stats.tokens_in_estimate`` or "—"."""
        node = item.details
        # Walk the nested mappings, treating any missing level as empty.
        for key in ("diagnostics", "prompt", "prompt_stats"):
            node = dict(node.get(key) or {})
        value = node.get("tokens_in_estimate")
        return str(value) if value is not None else _PLACEHOLDER