фиксирую состояние

This commit is contained in:
2026-04-07 21:41:27 +03:00
parent 7387e5cc51
commit f62fb678b8
52 changed files with 4073 additions and 316 deletions
+3 -3
View File
@@ -64,8 +64,8 @@ class ArtifactWriter:
f"- source_file: {result.case.source_file.as_posix()}",
f"- passed: {result.passed}",
"",
"## Query",
result.case.query,
"## Input",
result.case.display_input,
"",
"## Actual",
"```json",
@@ -96,7 +96,7 @@ class SummaryComposer:
]
for item in results:
lines.append(
f"| {item.case.case_id} | {item.case.component} | {self._cell(item.case.query)} | "
f"| {item.case.case_id} | {item.case.component} | {self._cell(item.case.display_input)} | "
f"{item.actual.get('intent') or ''} | {item.actual.get('sub_intent') or ''} | "
f"{'' if item.passed else ''} |"
)
+47 -4
View File
@@ -4,7 +4,7 @@ from pathlib import Path
import yaml
from tests.pipeline_setup_v4.core.models import CaseExpectations, RouterExpectation, V4Case
from tests.pipeline_setup_v4.core.models import CaseExpectations, RetrievalPlanExpectation, RouterExpectation, V4Case
class CaseDirectoryLoader:
@@ -35,13 +35,28 @@ class CaseDirectoryLoader:
case_id = str(raw.get("id") or "").strip()
component = str(raw.get("component") or defaults.get("component") or "").strip()
query = str(raw.get("query") or "").strip()
if not case_id or not component or not query:
raise ValueError(f"Invalid case in {path}: `id`, `component`, `query` are required")
rag_session_id = str(raw.get("rag_session_id") or defaults.get("rag_session_id") or "").strip() or None
route = dict(raw.get("route") or {})
if not route and isinstance(defaults.get("route"), dict):
route = dict(defaults.get("route") or {})
if not case_id or not component:
raise ValueError(f"Invalid case in {path}: `id` and `component` are required")
if component in {
"process_v2_intent_router",
"process_v2_router_plus_retrieval_policy",
"process_v2_router_plus_retrieval_policy_rag",
"process_v2_full_chain",
} and not query:
raise ValueError(f"Invalid case in {path}: `query` is required for {component}")
if component == "process_v2_retrieval_policy_resolver" and not route:
raise ValueError(f"Invalid case in {path}: `route` is required for {component}")
expected = dict(raw.get("expected") or {})
return V4Case(
case_id=case_id,
component=component, # type: ignore[arg-type]
query=query,
rag_session_id=rag_session_id,
route=route,
source_file=path,
expectations=self._to_expectations(expected),
notes=str(raw.get("notes") or ""),
@@ -50,10 +65,38 @@ class CaseDirectoryLoader:
def _to_expectations(self, raw: dict) -> CaseExpectations:
router = dict(raw.get("router") or {})
route = dict(raw.get("route") or {})
retrieval_plan = dict(raw.get("retrieval_plan") or raw.get("plan") or {})
rag = dict(raw.get("rag") or {})
pipeline = dict(raw.get("pipeline") or {})
llm = dict(raw.get("llm") or {})
return CaseExpectations(
router=RouterExpectation(
domain=str(router.get("domain") or "").strip() or None,
intent=str(router.get("intent") or "").strip() or None,
sub_intent=str(router.get("sub_intent") or "").strip() or None,
)
),
retrieval_plan=RetrievalPlanExpectation(
profile=str(retrieval_plan.get("profile") or "").strip() or None,
layers=tuple(str(item).strip() for item in retrieval_plan.get("layers") or [] if str(item).strip()),
limit=int(retrieval_plan["limit"]) if retrieval_plan.get("limit") is not None else None,
filters=self._plain_mapping(dict(retrieval_plan.get("filters") or {})),
),
route_assertions=route,
retrieval_plan_assertions=retrieval_plan,
rag_assertions=rag,
pipeline_assertions=pipeline,
llm_assertions=llm,
)
def _plain_mapping(self, raw: dict[str, object]) -> dict[str, object]:
plain: dict[str, object] = {}
for key, value in raw.items():
if self._is_assertion_key(key) or value in {"present", "absent"}:
continue
plain[key] = value
return plain
def _is_assertion_key(self, key: str) -> bool:
suffixes = ("_not_contains", "_contains_any", "_contains", "_equals_any", "_one_of")
return any(key.endswith(suffix) for suffix in suffixes)
+29 -3
View File
@@ -5,7 +5,13 @@ from pathlib import Path
from typing import Literal
ComponentKind = Literal["process_v2_intent_router"]
ComponentKind = Literal[
"process_v2_intent_router",
"process_v2_retrieval_policy_resolver",
"process_v2_router_plus_retrieval_policy",
"process_v2_router_plus_retrieval_policy_rag",
"process_v2_full_chain",
]
@dataclass(slots=True, frozen=True)
@@ -15,21 +21,41 @@ class RouterExpectation:
sub_intent: str | None = None
@dataclass(slots=True, frozen=True)
class RetrievalPlanExpectation:
profile: str | None = None
layers: tuple[str, ...] = ()
limit: int | None = None
filters: dict[str, object] = field(default_factory=dict)
@dataclass(slots=True, frozen=True)
class CaseExpectations:
    """All expectations attached to one case.

    ``router`` and ``retrieval_plan`` hold typed expected values; the
    ``*_assertions`` mappings hold raw assertion dictionaries, one per
    pipeline section.
    """

    # Typed expected values.
    router: RouterExpectation = RouterExpectation()
    retrieval_plan: RetrievalPlanExpectation = field(default_factory=RetrievalPlanExpectation)

    # Raw assertion mappings; each instance gets its own empty dict.
    route_assertions: dict[str, object] = field(default_factory=dict)
    retrieval_plan_assertions: dict[str, object] = field(default_factory=dict)
    rag_assertions: dict[str, object] = field(default_factory=dict)
    pipeline_assertions: dict[str, object] = field(default_factory=dict)
    llm_assertions: dict[str, object] = field(default_factory=dict)
@dataclass(slots=True, frozen=True)
class V4Case:
case_id: str
component: ComponentKind
query: str
source_file: Path
expectations: CaseExpectations = CaseExpectations()
query: str = ""
rag_session_id: str | None = None
route: dict[str, object] = field(default_factory=dict)
expectations: CaseExpectations = field(default_factory=CaseExpectations)
notes: str = ""
tags: tuple[str, ...] = ()
@property
def display_input(self) -> str:
return self.query or self.route.get("user_query") or self.case_id
@dataclass(slots=True, frozen=True)
class ExecutionPayload:
+236 -4
View File
@@ -1,17 +1,249 @@
from __future__ import annotations
from collections.abc import Mapping, Sequence
from tests.pipeline_setup_v4.core.models import V4Case
class CaseValidator:
def validate(self, case: V4Case, actual: dict) -> list[str]:
    """Run every validation stage that applies to ``case.component``.

    Args:
        case: The case whose component selects the stages to run.
        actual: The produced output to validate.

    Returns:
        The mismatch messages of all stages, concatenated in stage order.
        An unknown component yields a single "unsupported component" message.
    """
    # Stage method names per component, in the order their results are joined.
    stages_by_component = {
        "process_v2_intent_router": ("_validate_router",),
        "process_v2_retrieval_policy_resolver": ("_validate_retrieval_plan",),
        "process_v2_router_plus_retrieval_policy": (
            "_validate_router",
            "_validate_retrieval_plan",
        ),
        "process_v2_router_plus_retrieval_policy_rag": (
            "_validate_router",
            "_validate_retrieval_plan",
            "_validate_rag",
        ),
        "process_v2_full_chain": (
            "_validate_router",
            "_validate_retrieval_plan",
            "_validate_rag",
            "_validate_pipeline",
            "_validate_llm",
        ),
    }
    stage_names = stages_by_component.get(case.component)
    if stage_names is None:
        return [f"unsupported component for validation: {case.component}"]

    collected: list[str] = []
    for stage_name in stage_names:
        collected.extend(getattr(self, stage_name)(case, actual))
    return collected
def _validate_router(self, case: V4Case, actual: dict) -> list[str]:
mismatches: list[str] = []
expected = case.expectations.router
self._check(expected.domain, actual.get("domain"), "domain", mismatches)
self._check(expected.intent, actual.get("intent"), "intent", mismatches)
self._check(expected.sub_intent, actual.get("sub_intent"), "sub_intent", mismatches)
self._check_scalar(expected.domain, actual.get("domain"), "domain", mismatches)
self._check_scalar(expected.intent, actual.get("intent"), "intent", mismatches)
self._check_scalar(expected.sub_intent, actual.get("sub_intent"), "sub_intent", mismatches)
route_actual = actual.get("route")
if isinstance(route_actual, Mapping):
self._check_assertions(case.expectations.route_assertions, route_actual, "route", mismatches)
return mismatches
def _check(self, expected: str | None, actual: object, label: str, mismatches: list[str]) -> None:
def _validate_retrieval_plan(self, case: V4Case, actual: dict) -> list[str]:
    """Compare the retrieval-plan values in ``actual`` with the case's expectations.

    Checks run in a fixed order: ``profile``, ``layers`` (only when some are
    expected), ``limit``, the ``filters`` subset, the raw assertions against
    ``actual["retrieval_plan"]`` (only when that is a mapping), and finally
    the conditional anchor/filter rules.

    Returns:
        The mismatch messages in the order the checks produced them.
    """
    found: list[str] = []
    wanted = case.expectations.retrieval_plan

    self._check_scalar(wanted.profile, actual.get("profile"), "profile", found)
    if wanted.layers:
        # Expected layers are stored as a tuple; they are compared as a list.
        self._check_scalar(list(wanted.layers), actual.get("layers"), "layers", found)
    self._check_scalar(wanted.limit, actual.get("limit"), "limit", found)
    self._check_subset(wanted.filters, actual.get("filters"), "filters", found)

    nested_plan = actual.get("retrieval_plan")
    if isinstance(nested_plan, Mapping):
        self._check_assertions(
            case.expectations.retrieval_plan_assertions, nested_plan, "retrieval_plan", found
        )
    self._check_conditional_filter_assertions(
        case.expectations.retrieval_plan_assertions, actual, found
    )
    return found
def _validate_rag(self, case: V4Case, actual: dict) -> list[str]:
mismatches: list[str] = []
rag_actual = actual.get("rag")
if isinstance(rag_actual, Mapping):
self._check_assertions(case.expectations.rag_assertions, rag_actual, "rag", mismatches)
elif case.expectations.rag_assertions:
mismatches.append("rag: expected mapping, got missing")
return mismatches
def _validate_pipeline(self, case: V4Case, actual: dict) -> list[str]:
mismatches: list[str] = []
pipeline_actual = actual.get("pipeline")
if isinstance(pipeline_actual, Mapping):
self._check_assertions(case.expectations.pipeline_assertions, pipeline_actual, "pipeline", mismatches)
elif case.expectations.pipeline_assertions:
mismatches.append("pipeline: expected mapping, got missing")
return mismatches
def _validate_llm(self, case: V4Case, actual: dict) -> list[str]:
mismatches: list[str] = []
expected = case.expectations.llm_assertions
if not expected:
return mismatches
llm_actual = actual.get("llm")
if not isinstance(llm_actual, Mapping):
mismatches.append("llm: expected mapping, got missing")
return mismatches
answer = str(llm_actual.get("answer") or "")
lowered = answer.lower()
if "non_empty" in expected:
want_non_empty = bool(expected.get("non_empty"))
if want_non_empty and not answer.strip():
mismatches.append("llm.non_empty: expected non-empty answer")
if not want_non_empty and answer.strip():
mismatches.append("llm.non_empty: expected empty answer")
if "contains_all" in expected:
missing = [token for token in self._string_list(expected.get("contains_all")) if token.lower() not in lowered]
if missing:
mismatches.append(f"llm.contains_all: missing {missing}")
if "contains_any" in expected and not self._matches_contains_any(lowered, expected.get("contains_any")):
mismatches.append(f"llm.contains_any: no expected variant matched answer '{answer[:200]}'")
for key, value in expected.items():
if key in {"non_empty", "contains_all", "contains_any"}:
continue
if key not in llm_actual:
mismatches.append(f"llm.{key}: missing")
continue
self._check_assertions(value, llm_actual.get(key), f"llm.{key}", mismatches)
return mismatches
def _check_scalar(self, expected: object, actual: object, label: str, mismatches: list[str]) -> None:
if expected is not None and expected != actual:
mismatches.append(f"{label}: expected {expected}, got {actual}")
def _check_subset(self, expected: object, actual: object, label: str, mismatches: list[str]) -> None:
if expected in (None, {}, []):
return
if isinstance(expected, Mapping):
if not isinstance(actual, Mapping):
mismatches.append(f"{label}: expected dict subset, got {actual}")
return
for key, value in expected.items():
next_label = f"{label}.{key}"
if key not in actual:
mismatches.append(f"{next_label}: missing")
continue
self._check_subset(value, actual.get(key), next_label, mismatches)
return
if expected != actual:
mismatches.append(f"{label}: expected {expected}, got {actual}")
def _check_assertions(self, expected: object, actual: object, label: str, mismatches: list[str]) -> None:
if expected in (None, {}, []):
return
if not isinstance(expected, Mapping):
self._check_scalar(expected, actual, label, mismatches)
return
if not isinstance(actual, Mapping):
mismatches.append(f"{label}: expected mapping, got {actual}")
return
for key, value in expected.items():
if key == "if_anchor_present_then_filter_present":
continue
if key.endswith("_not_contains"):
self._assert_not_contains(actual.get(key.removesuffix("_not_contains")), value, f"{label}.{key}", mismatches)
continue
if key.endswith("_contains"):
self._assert_contains(actual.get(key.removesuffix("_contains")), value, f"{label}.{key}", mismatches)
continue
if key.endswith("_contains_any"):
self._assert_contains_any(actual.get(key.removesuffix("_contains_any")), value, f"{label}.{key}", mismatches)
continue
if key.endswith("_equals_any"):
self._assert_equals_any(actual.get(key.removesuffix("_equals_any")), value, f"{label}.{key}", mismatches)
continue
if key.endswith("_one_of"):
self._assert_equals_any(actual.get(key.removesuffix("_one_of")), value, f"{label}.{key}", mismatches)
continue
if value == "present":
self._assert_present(actual.get(key), f"{label}.{key}", mismatches)
continue
if value == "absent":
self._assert_absent(actual, key, f"{label}.{key}", mismatches)
continue
if key not in actual:
mismatches.append(f"{label}.{key}: missing")
continue
self._check_assertions(value, actual.get(key), f"{label}.{key}", mismatches)
def _assert_contains(self, actual: object, expected: object, label: str, mismatches: list[str]) -> None:
actual_list = self._as_list(actual)
expected_list = self._as_list(expected)
missing = [item for item in expected_list if item not in actual_list]
if missing:
mismatches.append(f"{label}: missing {missing}, got {actual_list}")
def _assert_not_contains(self, actual: object, expected: object, label: str, mismatches: list[str]) -> None:
actual_list = self._as_list(actual)
expected_list = self._as_list(expected)
present = [item for item in expected_list if item in actual_list]
if present:
mismatches.append(f"{label}: unexpected {present}, got {actual_list}")
def _assert_contains_any(self, actual: object, expected: object, label: str, mismatches: list[str]) -> None:
actual_list = self._as_list(actual)
expected_list = self._as_list(expected)
if not any(item in actual_list for item in expected_list):
mismatches.append(f"{label}: expected any of {expected_list}, got {actual_list}")
def _assert_equals_any(self, actual: object, expected: object, label: str, mismatches: list[str]) -> None:
expected_list = self._as_list(expected)
if actual not in expected_list:
mismatches.append(f"{label}: expected any of {expected_list}, got {actual}")
def _assert_present(self, actual: object, label: str, mismatches: list[str]) -> None:
if actual is None or actual == "" or actual == [] or actual == {}:
mismatches.append(f"{label}: expected present, got {actual}")
def _assert_absent(self, actual: Mapping, key: str, label: str, mismatches: list[str]) -> None:
if key in actual and actual.get(key) not in (None, "", [], {}):
mismatches.append(f"{label}: expected absent, got {actual.get(key)}")
def _check_conditional_filter_assertions(self, expected: object, actual: Mapping, mismatches: list[str]) -> None:
if not isinstance(expected, Mapping):
return
rules = expected.get("if_anchor_present_then_filter_present")
if not isinstance(rules, Sequence) or isinstance(rules, (str, bytes, bytearray)):
return
for idx, rule in enumerate(rules):
if not isinstance(rule, Mapping):
continue
anchor_path = str(rule.get("anchor") or "").strip()
filter_path = str(rule.get("filter") or "").strip()
if not anchor_path or not filter_path:
continue
anchor_value = self._resolve_path(actual.get("route"), anchor_path)
if anchor_value in (None, "", [], {}):
continue
filter_value = self._resolve_path(actual.get("retrieval_plan"), filter_path)
if filter_value in (None, "", [], {}):
mismatches.append(
f"conditional[{idx}]: expected {filter_path} present because {anchor_path} is present"
)
def _resolve_path(self, value: object, path: str) -> object:
current = value
parts = [item for item in path.split(".") if item]
for idx, part in enumerate(parts):
if not isinstance(current, Mapping):
return None
remainder = ".".join(parts[idx:])
if remainder in current:
return current.get(remainder)
if part not in current:
return None
current = current.get(part)
return current
def _as_list(self, value: object) -> list[object]:
if value is None:
return []
if isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)):
return list(value)
return [value]
def _string_list(self, value: object) -> list[str]:
return [str(item) for item in self._as_list(value) if str(item).strip()]
def _matches_contains_any(self, lowered_answer: str, expected: object) -> bool:
variants = self._as_list(expected)
for variant in variants:
tokens = self._string_list(variant)
if not tokens:
continue
if all(token.lower() in lowered_answer for token in tokens):
return True
return False