from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only referenced in (lazily evaluated) annotations, so no runtime import is needed.
    from tests.pipeline_setup_v3.core.models import V3CaseResult


class ArtifactWriter:
    """Write per-case JSON/Markdown artifacts and a run summary into a run directory."""

    def __init__(self, root: Path, run_name: str, started_at: datetime) -> None:
        """Create ``root / run_name / <YYYYmmdd_HHMMSS>`` and remember it as ``run_dir``."""
        stamp = started_at.strftime("%Y%m%d_%H%M%S")
        self.run_dir = root / run_name / stamp
        self.run_dir.mkdir(parents=True, exist_ok=True)

    def write_case(self, result: V3CaseResult) -> None:
        """Write ``<source stem>_<case id>.json`` and ``.md`` for one case result."""
        stem = f"{result.case.source_file.stem}_{result.case.case_id}"
        payload = self._json_payload(result)
        (self.run_dir / f"{stem}.json").write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        # The prompt is rendered in its own "LLM Request" section, so it is
        # dropped from the diagnostics dump.
        diagnostics = self._diagnostics_without_prompt(result.details)
        lines = [
            f"# {result.case.case_id}",
            "",
            f"- source_file: {result.case.source_file.as_posix()}",
            f"- runner: {result.case.runner}",
            f"- mode: {result.case.mode}",
            f"- passed: {result.passed}",
            "",
            "## Query",
            result.case.query,
            "",
            "## Actual",
            json.dumps(result.actual, ensure_ascii=False, indent=2),
            "",
            "## Pipeline Steps",
            *self._md_steps(result),
            "",
            "## Diagnostics",
            json.dumps(diagnostics, ensure_ascii=False, indent=2),
            "",
            *self._llm_request_section(result.details),
            "",
            "## Mismatches",
            *([f"- {item}" for item in result.mismatches] or ["- none"]),
        ]
        (self.run_dir / f"{stem}.md").write_text("\n".join(lines), encoding="utf-8")

    def _json_payload(self, result: V3CaseResult) -> dict:
        """Build the JSON document: case metadata plus the recorded pipeline steps."""
        return {
            "meta": {
                "case_id": result.case.case_id,
                "source_file": result.case.source_file.as_posix(),
                "runner": result.case.runner,
                "mode": result.case.mode,
                "passed": result.passed,
                "mismatches": result.mismatches,
                "actual": result.actual,
            },
            "pipeline_steps": list(result.details.get("pipeline_steps") or []),
        }

    def _md_steps(self, result: V3CaseResult) -> list[str]:
        """Render each pipeline step as a Markdown subsection with a JSON input/output dump.

        Returns ``["- none"]`` when no steps were recorded.
        """
        steps = list(result.details.get("pipeline_steps") or [])
        if not steps:
            return ["- none"]

        lines: list[str] = []
        for item in steps:
            step = str(item.get("step") or "").strip() or "unknown"
            status = str(item.get("status") or "").strip()
            lines.append(f"### {step}")
            if status:
                lines.append(f"- status: {status}")
            timings = item.get("timings_ms") or {}
            if timings:
                lines.append(f"- timings_ms: {json.dumps(timings, ensure_ascii=False)}")
            lines.append("```json")
            lines.append(
                json.dumps(
                    {"input": item.get("input") or {}, "output": item.get("output") or {}},
                    ensure_ascii=False,
                    indent=2,
                )
            )
            lines.append("```")
            lines.append("")
        # Drop the blank separator that follows the last step.
        return lines[:-1] if lines and not lines[-1] else lines

    def _diagnostics_without_prompt(self, details: dict) -> dict:
        """Return a shallow copy of ``details["diagnostics"]`` without the ``prompt`` key."""
        diagnostics = dict(details.get("diagnostics") or {})
        diagnostics.pop("prompt", None)
        return diagnostics

    def _llm_request_section(self, details: dict) -> list[str]:
        """Render the "LLM Request" Markdown section.

        Reads ``details["llm_request"]`` and falls back to
        ``details["diagnostics"]["prompt"]``; returns ``[]`` when both are empty.
        """
        llm_request = dict(details.get("llm_request") or {})
        if not llm_request:
            llm_request = dict((details.get("diagnostics") or {}).get("prompt") or {})
        if not llm_request:
            return []

        lines = [
            "## LLM Request",
            f"- prompt_name: {llm_request.get('prompt_name') or '—'}",
            f"- log_context: {llm_request.get('log_context') or '—'}",
        ]
        prompt_stats = dict(llm_request.get("prompt_stats") or {})
        if prompt_stats:
            lines.extend(
                [
                    "",
                    "### Prompt Stats",
                    "```json",
                    json.dumps(prompt_stats, ensure_ascii=False, indent=2),
                    "```",
                ]
            )
        system_prompt = str(llm_request.get("system_prompt") or "").strip()
        user_prompt = str(llm_request.get("user_prompt") or "").strip()
        lines.extend(
            [
                "",
                "### System Prompt",
                "```text",
                system_prompt or "—",
                "```",
                "",
                "### User Prompt",
                *self._render_user_prompt(user_prompt),
            ]
        )
        return lines

    def _render_user_prompt(self, user_prompt: str) -> list[str]:
        """Render the user prompt as pretty JSON plus an overview when it is a JSON object,
        otherwise as a plain text block."""
        payload = self._parse_json(user_prompt)
        if payload is None:
            return ["```text", user_prompt or "—", "```"]
        lines = ["```json", json.dumps(payload, ensure_ascii=False, indent=2), "```"]
        lines.extend(self._prompt_overview(payload))
        return lines

    def _prompt_overview(self, payload: dict) -> list[str]:
        """Summarise selected keys of a JSON user prompt as Markdown bullet lines."""
        lines = ["", "### User Prompt Overview"]
        for key in ("question", "intent", "sub_intent"):
            value = payload.get(key)
            if value is not None:
                lines.append(f"- {key}: {value}")
        for name in ("documents", "facts", "relations"):
            lines.extend(self._prompt_collection_line(name, payload.get(name)))
        api_contract = payload.get("api_contract")
        if isinstance(api_contract, dict):
            lines.append("- api_contract:")
            lines.extend(self._api_contract_lines(api_contract))
        return lines

    def _prompt_collection_line(self, name: str, value) -> list[str]:
        """Return one bullet with the size of a list-valued prompt field.

        Non-list or empty values count as 0. Up to three dict items contribute a
        whitespace-normalised sample taken from the first truthy of
        ``title``/``content``/``path``/``doc_id``/``id``.
        """
        items = value if isinstance(value, list) else []
        if not items:
            return [f"- {name}: 0"]

        samples: list[str] = []
        for item in items[:3]:
            if not isinstance(item, dict):
                continue
            sample = str(
                item.get("title")
                or item.get("content")
                or item.get("path")
                or item.get("doc_id")
                or item.get("id")
                or ""
            ).strip()
            if sample:
                samples.append(" ".join(sample.split()))
        suffix = f" | samples: {', '.join(samples)}" if samples else ""
        return [f"- {name}: {len(items)}{suffix}"]

    def _api_contract_lines(self, api_contract: dict) -> list[str]:
        """Describe an ``api_contract`` dict: path, method and whether schemas are present."""
        path = str(api_contract.get("path") or "").strip() or "—"
        method = str(api_contract.get("method") or "").strip() or "—"
        request_schema = api_contract.get("request_schema")
        response_schema = api_contract.get("response_schema")
        return [
            f" path: {path}",
            f" method: {method}",
            f" has_request_schema: {bool(request_schema)}",
            f" has_response_schema: {bool(response_schema)}",
        ]

    def _parse_json(self, text: str) -> dict | None:
        """Parse *text* as JSON; return the value only if it is a dict, else ``None``."""
        if not text.strip():
            return None
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            return None
        return payload if isinstance(payload, dict) else None

    def write_summary(self, results: list[V3CaseResult]) -> Path:
        """Write ``summary.md`` for *results* into the run directory and return its path."""
        path = self.run_dir / "summary.md"
        path.write_text(SummaryComposer().compose(results), encoding="utf-8")
        return path


class SummaryComposer:
    """Compose the Markdown summary (results table, failures, LLM answers) for a run."""

    # One-pass translation table: every listed character gets a backslash prefix.
    _MARKDOWN_ESCAPES = str.maketrans({char: f"\\{char}" for char in "\\`*_{}[]()#+-!|"})

    def compose(self, results: list[V3CaseResult]) -> str:
        """Return the full summary document as a single string."""
        passed = sum(1 for item in results if item.passed)
        lines = [
            "# pipeline_setup_v3 summary",
            "",
            f"Passed: {passed}/{len(results)}",
            "",
            "| File | Case | Mode | Query | Actual sub-intent | RAG layers | Tokens | Pass |",
            "|------|------|------|-------|-------------------|------------|--------|------|",
        ]
        lines.extend(self._rows(results))
        failures = [item for item in results if not item.passed]
        if failures:
            lines.extend(["", "## Failures"])
            for item in failures:
                lines.append(f"- **{item.case.case_id}**: {'; '.join(item.mismatches)}")
        lines.extend(self._llm_section(results))
        return "\n".join(lines)

    def _rows(self, results: list[V3CaseResult]) -> list[str]:
        """Return one Markdown table row per result."""
        rows: list[str] = []
        for item in results:
            rows.append(
                f"| {item.case.source_file.name} | {item.case.case_id} | {item.case.mode} | "
                f"{self._cell(item.case.query)} | {item.actual.get('sub_intent') or '—'} | "
                f"{self._layer_text(item.details)} | {self._token_text(item.details)} | {'✓' if item.passed else '✗'} |"
            )
        return rows

    def _layer_text(self, details: dict) -> str:
        """Return ``layer:count`` pairs (sorted by layer) for ``details["rag_rows"]``, or "—"."""
        counts: dict[str, int] = {}
        for row in details.get("rag_rows") or []:
            layer = str(row.get("layer") or "").strip()
            if layer:
                counts[layer] = counts.get(layer, 0) + 1
        if not counts:
            return "—"
        return self._cell(", ".join(f"{key}:{value}" for key, value in sorted(counts.items())), limit=120)

    def _cell(self, text: str, limit: int = 140) -> str:
        """Collapse whitespace, escape ``|`` and truncate to *limit* characters with "…"."""
        compact = " ".join(str(text).split()).replace("|", "\\|")
        if len(compact) <= limit:
            return compact
        return compact[: limit - 1].rstrip() + "…"

    def _token_text(self, details: dict) -> str:
        """Return the input-token estimate as text, or "—" when none is recorded.

        Prefers ``details["token_usage"]["tokens_in_estimate"]`` and falls back to
        ``details["diagnostics"]["prompt"]["prompt_stats"]["tokens_in_estimate"]``.
        """
        token_usage = dict(details.get("token_usage") or {})
        direct = token_usage.get("tokens_in_estimate")
        if direct is not None:
            return str(direct)
        # ``or {}`` (rather than a ``.get`` default) also covers a key that is
        # present but explicitly ``None``.
        diagnostics = details.get("diagnostics") or {}
        prompt = dict(diagnostics.get("prompt") or {})
        stats = dict(prompt.get("prompt_stats") or {})
        value = stats.get("tokens_in_estimate")
        return str(value) if value is not None else "—"

    def _llm_section(self, results: list[V3CaseResult]) -> list[str]:
        """Render the "LLM Answers" section for results with a non-blank ``llm_answer``."""
        llm_results = [item for item in results if str(item.actual.get("llm_answer") or "").strip()]
        if not llm_results:
            return []
        lines = ["", "## LLM Answers"]
        for item in llm_results:
            lines.append(f"- **{item.case.case_id}**")
            lines.append(f" Query: {self._cell(item.case.query, limit=400)}")
            lines.extend(self._quote_block(self._snippet(str(item.actual.get("llm_answer") or ""))))
        return lines

    def _snippet(self, text: str, limit: int = 880) -> str:
        """Collapse whitespace and truncate to *limit* characters with "…"."""
        compact = " ".join(text.split())
        if len(compact) <= limit:
            return compact
        return compact[: limit - 1].rstrip() + "…"

    def _quote_block(self, text: str) -> list[str]:
        """Return *text* as Markdown block-quote lines; blank text becomes a "—" quote."""
        quoted = text.strip()
        if not quoted:
            return [" > —"]
        return [f" > {self._escape_markdown(line)}" for line in quoted.splitlines()]

    def _escape_markdown(self, text: str) -> str:
        """Backslash-escape Markdown punctuation in *text*."""
        return text.translate(self._MARKDOWN_ESCAPES)