# 145 lines · 5.8 KiB · Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from tests.pipeline_setup_v3.core.models import V3CaseResult
|
|
|
|
|
|
class ArtifactWriter:
    """Write per-case artifacts and the run summary into one run directory.

    The run directory is ``root / run_name / <started_at as %Y%m%d_%H%M%S>``;
    it is created, parents included, when the writer is constructed.
    """

    def __init__(self, root: Path, run_name: str, started_at: datetime) -> None:
        """Resolve the timestamped run directory and make sure it exists.

        Args:
            root: Base directory under which all runs are stored.
            run_name: Sub-directory name for this kind of run.
            started_at: Start time; formatted as ``%Y%m%d_%H%M%S`` to name
                the innermost directory.
        """
        run_stamp = started_at.strftime("%Y%m%d_%H%M%S")
        self.run_dir = root / run_name / run_stamp
        # exist_ok=True: re-using an already existing directory is not an error.
        self.run_dir.mkdir(parents=True, exist_ok=True)

    def write_case(self, result: V3CaseResult) -> None:
        """Dump one case result as a JSON file and a Markdown report.

        Both files are named ``<source file stem>_<case id>`` and are written
        as UTF-8 into the run directory; the JSON file is written first.

        Args:
            result: Case result whose ``case``, ``actual``, ``passed``,
                ``mismatches`` and ``details`` attributes are serialized.
        """

        def to_json(value) -> str:
            # Same JSON rendering for the payload file and the report sections.
            return json.dumps(value, ensure_ascii=False, indent=2)

        case = result.case
        base_name = f"{case.source_file.stem}_{case.case_id}"
        source_path = case.source_file.as_posix()

        # Keyword order fixes the key order of the emitted JSON object.
        record = dict(
            case_id=case.case_id,
            source_file=source_path,
            runner=case.runner,
            mode=case.mode,
            query=case.query,
            actual=result.actual,
            passed=result.passed,
            mismatches=result.mismatches,
            details=result.details,
        )
        json_path = self.run_dir / f"{base_name}.json"
        json_path.write_text(to_json(record), encoding="utf-8")

        # Markdown report, assembled one section at a time.
        report = [f"# {case.case_id}", ""]
        report += [
            f"- source_file: {source_path}",
            f"- runner: {case.runner}",
            f"- mode: {case.mode}",
            f"- passed: {result.passed}",
            "",
        ]
        report += ["## Query", case.query, ""]
        report += ["## Actual", to_json(result.actual), ""]
        # Missing or empty "steps" / "diagnostics" render as "[]" / "{}".
        report += ["## Steps", to_json(result.details.get("steps") or []), ""]
        report += ["## Diagnostics", to_json(result.details.get("diagnostics") or {}), ""]
        report.append("## Mismatches")
        bullets = [f"- {entry}" for entry in result.mismatches]
        report.extend(bullets if bullets else ["- none"])

        md_path = self.run_dir / f"{base_name}.md"
        md_path.write_text("\n".join(report), encoding="utf-8")

    def write_summary(self, results: list[V3CaseResult]) -> Path:
        """Write ``summary.md`` for all results and return its path.

        The text is produced by ``SummaryComposer().compose(results)`` and
        written as UTF-8 into the run directory.
        """
        summary_path = self.run_dir / "summary.md"
        summary_text = SummaryComposer().compose(results)
        summary_path.write_text(summary_text, encoding="utf-8")
        return summary_path
|
|
|
|
|
|
class SummaryComposer:
    """Render a Markdown summary for a list of case results.

    The summary contains a pass counter, a table with one row per case, a
    "Failures" list (only when at least one case failed) and an
    "LLM Answers" section (only when at least one case has a non-blank
    ``llm_answer`` in its ``actual`` mapping).
    """

    # Single-pass translation table used by ``_escape_markdown``: every
    # listed character is replaced by a backslash followed by itself.
    _MARKDOWN_ESCAPES = str.maketrans({char: f"\\{char}" for char in "\\`*_{}[]()#+-!|"})

    def compose(self, results: list[V3CaseResult]) -> str:
        """Return the complete summary as one newline-joined string."""
        passed = sum(1 for item in results if item.passed)
        lines = [
            "# pipeline_setup_v3 summary",
            "",
            f"Passed: {passed}/{len(results)}",
            "",
            "| File | Case | Mode | Query | Actual sub-intent | RAG layers | Tokens | Pass |",
            "|------|------|------|-------|-------------------|------------|--------|------|",
        ]
        lines.extend(self._rows(results))
        failures = [item for item in results if not item.passed]
        if failures:
            lines.extend(["", "## Failures"])
            lines.extend(
                f"- **{item.case.case_id}**: {'; '.join(item.mismatches)}" for item in failures
            )
        lines.extend(self._llm_section(results))
        return "\n".join(lines)

    def _rows(self, results: list[V3CaseResult]) -> list[str]:
        """Return one Markdown table row per result, in input order."""
        return [self._row(item) for item in results]

    def _row(self, item: V3CaseResult) -> str:
        """Format a single result as a Markdown table row."""
        case = item.case
        # The sub-intent goes through _cell like the query does, so a "|" or
        # a newline inside it cannot break the table row.
        sub_intent = self._cell(item.actual.get("sub_intent") or "—")
        status = "✓" if item.passed else "✗"
        return (
            f"| {case.source_file.name} | {case.case_id} | {case.mode} | "
            f"{self._cell(case.query)} | {sub_intent} | "
            f"{self._layer_text(item.details)} | {self._token_text(item.details)} | {status} |"
        )

    def _layer_text(self, details: dict) -> str:
        """Summarize ``details["rag_rows"]`` as ``layer:count`` pairs.

        Rows whose ``layer`` is missing or blank are ignored. Pairs are sorted
        by layer name and joined with ``", "``; the result is passed through
        ``_cell`` with a limit of 120. Returns ``"—"`` when nothing is counted.
        """
        counts: dict[str, int] = {}
        for row in details.get("rag_rows") or []:
            layer = str(row.get("layer") or "").strip()
            if layer:
                counts[layer] = counts.get(layer, 0) + 1
        if not counts:
            return "—"
        summary = ", ".join(f"{key}:{value}" for key, value in sorted(counts.items()))
        return self._cell(summary, limit=120)

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Shorten ``text`` to at most ``limit`` characters, ending in "…"."""
        if len(text) <= limit:
            return text
        return text[: limit - 1].rstrip() + "…"

    def _cell(self, text: str, limit: int = 140) -> str:
        """Make ``text`` safe for a one-line Markdown table cell.

        Whitespace runs (including newlines) collapse to single spaces, each
        ``|`` is backslash-escaped, and the result is truncated to ``limit``
        characters with a trailing "…".
        """
        compact = " ".join(str(text).split()).replace("|", "\\|")
        return self._truncate(compact, limit)

    def _token_text(self, details: dict) -> str:
        """Return the input-token estimate for a case as text, or ``"—"``.

        ``details["token_usage"]["tokens_in_estimate"]`` is preferred; the
        fallback is
        ``details["diagnostics"]["prompt"]["prompt_stats"]["tokens_in_estimate"]``.
        Every level may be missing or ``None``.
        """
        token_usage = dict(details.get("token_usage") or {})
        direct = token_usage.get("tokens_in_estimate")
        if direct is not None:
            return str(direct)
        # "or {}" rather than a .get() default: the key may be present with a
        # None value, in which case the default would not be used.
        diagnostics = details.get("diagnostics") or {}
        prompt = dict(diagnostics.get("prompt") or {})
        stats = dict(prompt.get("prompt_stats") or {})
        value = stats.get("tokens_in_estimate")
        return str(value) if value is not None else "—"

    def _llm_section(self, results: list[V3CaseResult]) -> list[str]:
        """Return the "LLM Answers" section lines, or ``[]`` if there are none.

        Only results whose ``actual["llm_answer"]`` is non-blank are listed.
        Each entry has the case id, the query (cell-escaped, limit 400) and
        the answer snippet as an escaped block quote.
        """
        answered = [item for item in results if str(item.actual.get("llm_answer") or "").strip()]
        if not answered:
            return []
        lines = ["", "## LLM Answers"]
        for item in answered:
            lines.append(f"- **{item.case.case_id}**")
            lines.append(f" Query: {self._cell(item.case.query, limit=400)}")
            lines.extend(self._quote_block(self._snippet(str(item.actual.get("llm_answer") or ""))))
        return lines

    def _snippet(self, text: str, limit: int = 880) -> str:
        """Collapse whitespace in ``text`` and truncate it to ``limit`` chars."""
        return self._truncate(" ".join(text.split()), limit)

    def _quote_block(self, text: str) -> list[str]:
        """Render ``text`` as Markdown block-quote lines with escaped content.

        Blank input yields a single placeholder line containing "—".
        """
        quoted = text.strip()
        if not quoted:
            return [" > —"]
        return [f" > {self._escape_markdown(line)}" for line in quoted.splitlines()]

    def _escape_markdown(self, text: str) -> str:
        """Backslash-escape Markdown control characters in ``text``."""
        # One translate pass gives the same result as replacing each character
        # in turn with the backslash handled first.
        return text.translate(self._MARKDOWN_ESCAPES)
|