from app.modules.rag.contracts.enums import RagLayer
from app.modules.rag.indexing.code.pipeline import CodeIndexingPipeline


def test_code_pipeline_builds_source_symbols_edges_and_entrypoints() -> None:
    """Index a small FastAPI module and check the layers it yields.

    Asserts that source-chunk, symbol-catalog, dependency-graph, entrypoint
    and semantic-role documents are all present, then spot-checks one symbol
    document, one edge document and the HTTP entrypoint metadata.
    """
    pipeline = CodeIndexingPipeline()
    # The fixture is flush-left inside the triple quotes so the embedded
    # module carries no extra leading indentation.
    content = """
from fastapi import APIRouter

router = APIRouter()

class UserService:
    def get_user(self, user_id):
        return user_id

@router.get("/users/{user_id}")
async def get_user(user_id: str):
    service = UserService()
    return service.get_user(user_id)
"""
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="app/api/users.py",
        content=content,
    )

    layers = {doc.layer for doc in docs}
    assert RagLayer.CODE_SOURCE_CHUNKS in layers
    assert RagLayer.CODE_SYMBOL_CATALOG in layers
    assert RagLayer.CODE_DEPENDENCY_GRAPH in layers
    assert RagLayer.CODE_ENTRYPOINTS in layers
    assert RagLayer.CODE_SEMANTIC_ROLES in layers

    # First symbol-catalog document whose kind is "function".
    symbol_doc = next(
        doc
        for doc in docs
        if doc.layer == RagLayer.CODE_SYMBOL_CATALOG and doc.metadata["kind"] == "function"
    )
    assert "get_user" in symbol_doc.metadata["qname"]

    # Only the first dependency-graph document is inspected; its edge type
    # must be one of the listed values.
    edge_doc = next(doc for doc in docs if doc.layer == RagLayer.CODE_DEPENDENCY_GRAPH)
    assert edge_doc.metadata["edge_type"] in {
        "calls",
        "imports",
        "inherits",
        "instantiates",
        "reads_attr",
        "writes_attr",
        "dataflow_slice",
    }

    entry_doc = next(doc for doc in docs if doc.layer == RagLayer.CODE_ENTRYPOINTS)
    assert entry_doc.metadata["framework"] == "fastapi"
    assert entry_doc.metadata["http_method"] == "GET"
    assert entry_doc.metadata["route_path"] == "/users/{user_id}"
    assert entry_doc.metadata["entrypoint_kind"] == "http_route"
    assert entry_doc.metadata["handler_symbol"] == "get_user"
    assert entry_doc.metadata["summary_text"] == "GET /users/{user_id} declared in get_user"
    assert "GET /users/{user_id}" in entry_doc.text


def test_code_pipeline_indexes_import_alias_as_symbol() -> None:
    """Check that an ``import ... as`` alias gets a symbol-catalog document.

    The document whose ``qname`` is the alias name must have kind ``"const"``.
    """
    pipeline = CodeIndexingPipeline()
    content = "from .v2 import ConfigManagerV2 as ConfigManager\n"
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="src/config_manager/__init__.py",
        content=content,
    )

    alias_doc = next(
        doc
        for doc in docs
        if doc.layer == RagLayer.CODE_SYMBOL_CATALOG and doc.metadata["qname"] == "ConfigManager"
    )
    assert alias_doc.metadata["kind"] == "const"


def test_code_pipeline_marks_test_documents() -> None:
    """Check that documents indexed from ``tests/test_users.py`` are flagged.

    At least one document must be produced and every one must carry
    ``is_test`` set to exactly ``True``.
    """
    pipeline = CodeIndexingPipeline()
    content = """
def test_user_service():
    assert True
"""
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="tests/test_users.py",
        content=content,
    )

    # Guard against a vacuous pass: all() over an empty list is True.
    assert docs
    assert all(doc.metadata["is_test"] is True for doc in docs)


def test_code_pipeline_extracts_data_flow_edges() -> None:
    """Check instantiation and attribute read/write edges in the graph layer.

    Collects ``(edge_type, dst_ref)`` pairs from the dependency-graph
    documents and asserts the three expected pairs are among them.
    """
    pipeline = CodeIndexingPipeline()
    content = """
class Context:
    def __init__(self):
        self.data = {}

    def set(self, new_context):
        self.data = new_context

def process():
    ctx = Context()
    value = ctx.data
    return value
"""
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="src/context.py",
        content=content,
    )

    edges = [doc.metadata for doc in docs if doc.layer == RagLayer.CODE_DEPENDENCY_GRAPH]
    # Missing or falsy metadata values are normalised to "" so that every
    # pair is a plain (str, str) tuple.
    edge_pairs = {
        (str(item.get("edge_type") or ""), str(item.get("dst_ref") or ""))
        for item in edges
    }
    assert ("instantiates", "Context") in edge_pairs
    assert ("writes_attr", "Context.data") in edge_pairs
    assert ("reads_attr", "ctx.data") in edge_pairs


def test_code_pipeline_builds_dataflow_slice_documents() -> None:
    """Check that ``dataflow_slice`` documents are emitted and bounded.

    At least one slice must exist, at least one must list ``Context.data``
    in ``path_symbols``, and no slice may report ``path_length`` above 6.
    """
    pipeline = CodeIndexingPipeline()
    content = """
class Context:
    def set(self, value):
        self.data = value

def read_data(ctx):
    return ctx.data

def run():
    ctx = Context()
    Context().set({"order_id": 1})
    return read_data(ctx)
"""
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="src/context_flow.py",
        content=content,
    )

    slices = [
        doc
        for doc in docs
        if doc.layer == RagLayer.CODE_DEPENDENCY_GRAPH
        and doc.metadata.get("edge_type") == "dataflow_slice"
    ]
    assert slices
    assert any("Context.data" in item.metadata.get("path_symbols", []) for item in slices)
    assert all(item.metadata.get("path_length", 0) <= 6 for item in slices)
def test_code_pipeline_builds_execution_trace_documents() -> None:
    """Check that ``execution_trace`` documents appear in the entrypoint layer.

    At least one trace must exist, at least one must report ``path_length``
    of 2 or more, and at least one must list ``run_pipeline`` in
    ``path_symbols``.
    """
    pipeline = CodeIndexingPipeline()
    # The fixture is flush-left inside the triple quotes so the embedded
    # module carries no extra leading indentation.
    content = """
from fastapi import APIRouter

router = APIRouter()

def parse():
    return "parsed"

def send_email():
    return parse()

@router.post("/run")
def run_pipeline():
    return send_email()
"""
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="src/pipeline.py",
        content=content,
    )

    traces = [
        doc
        for doc in docs
        if doc.layer == RagLayer.CODE_ENTRYPOINTS
        and doc.metadata.get("edge_type") == "execution_trace"
    ]
    assert traces
    assert any(item.metadata.get("path_length", 0) >= 2 for item in traces)
    assert any("run_pipeline" in item.metadata.get("path_symbols", []) for item in traces)


def test_code_pipeline_builds_semantic_role_documents() -> None:
    """Check the role assigned to each class in the semantic-roles layer.

    Builds a ``symbol_name -> role`` mapping from the semantic-role documents
    and asserts the expected role for each of the three fixture classes.
    """
    pipeline = CodeIndexingPipeline()
    content = """
class EmailAdapter:
    def send(self, payload):
        import requests
        return requests.post("http://localhost", json=payload)

class ExcelParser:
    def parse(self, rows):
        import csv
        return list(csv.reader(rows))

class OrderHandler:
    def handle(self, ctx, adapter):
        ctx.data = {"status": "ready"}
        value = ctx.data
        return adapter.send(value)
"""
    docs = pipeline.index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="src/semantic_roles.py",
        content=content,
    )

    roles = {
        doc.metadata.get("symbol_name"): doc.metadata.get("role")
        for doc in docs
        if doc.layer == RagLayer.CODE_SEMANTIC_ROLES
    }
    assert roles.get("EmailAdapter") == "adapter"
    assert roles.get("ExcelParser") == "parser"
    assert roles.get("OrderHandler") == "handler"