# File: agent/tests/unit_tests/rag/test_docs_indexing_pipeline.py (Python, 345 lines, 10 KiB)

import logging
from app.core.rag.contracts.enums import RagLayer
from app.core.rag.indexing.docs.chunkers.markdown_chunker import SectionChunk
from app.core.rag.indexing.docs.integration_extractor import DocsIntegrationExtractor
from app.core.rag.indexing.docs.pipeline import DocsIndexingPipeline
def test_docs_pipeline_builds_docs_layers_from_modern_markdown_structure() -> None:
    """Index a fully populated API-method document and inspect every docs layer.

    The fixture carries frontmatter plus Summary/Details sections (description,
    scenario, contract, input/output tables, integrations, errors). The test
    checks that all seven docs layers are produced and spot-checks the
    metadata or text of one document per layer.
    """
    # NOTE(review): nested list items under `links:` and `details:` sit at
    # column 0 in this fixture, while the assertions below expect nested
    # values (e.g. details["transaction"]) -- confirm the fixture's
    # indentation against the repository copy.
    markdown = """---
id: api.billing.create_invoice
type: api_method
doc_type: api_method
name: create_invoice
title: Create Invoice API
module: billing
domain: billing
sub_domain: invoices
layer: application
status: draft
updated_at: 2026-03-23
endpoint: POST /billing/invoices
source_of_truth: analytics
tags: [billing, api]
entities: [Invoice]
parent: billing_api
children: []
related_docs: [api.billing.validate_invoice]
links:
called_by:
- ui.billing.invoice_form
uses_logic:
- logic.billing.invoice_validation
---
# Create Invoice API
## Summary
Creates an invoice in billing.
## Details
### Описание
Создает счет на оплату.
### Сценарий
**Название:**
Create invoice
**Предусловия:**
- billing service is available
**Триггер:**
- client sends create invoice request
**Основной сценарий:**
1. Validate payload.
2. Create invoice.
**Альтернативный сценарий:**
1. Reject invalid payload.
**Обработка ошибок:**
1. Return validation error.
**Постусловие:**
- Invoice is created.
### Контракт
#### Метаданные вызова
- Method: POST
- Auth: USER
- Idempotency: false
### Входные параметры
| field | type | required | validation |
| --- | --- | --- | --- |
| amount | decimal | yes | > 0 |
### Выходные параметры
| field | type | required |
| --- | --- | --- |
| invoice_id | string | yes |
### Интеграции
#### Billing DB
- target: db.billing.invoices
- target_type: db
- direction: outbound
- interaction: writes
- via: invoice repository
- purpose: persist created invoices
- details:
- transaction: required
- tables:
- invoices
- invoice_items
### Ошибки
| status | error | client action |
| --- | --- | --- |
| 400 | invalid_amount | fix request |
"""
    indexed = DocsIndexingPipeline().index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="docs/billing/create_invoice.md",
        content=markdown,
    )

    def first_of(layer):
        # First indexed document belonging to the given layer.
        return next(item for item in indexed if item.layer == layer)

    def all_of(layer):
        # Every indexed document belonging to the given layer, in order.
        return [item for item in indexed if item.layer == layer]

    # Each docs layer must be represented at least once.
    produced_layers = {item.layer for item in indexed}
    for expected_layer in (
        RagLayer.DOCS_DOC_CHUNKS,
        RagLayer.DOCS_DOCUMENT_CATALOG,
        RagLayer.DOCS_FACT_INDEX,
        RagLayer.DOCS_ENTITY_CATALOG,
        RagLayer.DOCS_WORKFLOW_INDEX,
        RagLayer.DOCS_RELATION_GRAPH,
        RagLayer.DOCS_INTEGRATION_INDEX,
    ):
        assert expected_layer in produced_layers

    # Catalog entry mirrors the frontmatter; it uses the `sub_domain` key.
    catalog_meta = first_of(RagLayer.DOCS_DOCUMENT_CATALOG).metadata
    assert catalog_meta["document_id"] == "api.billing.create_invoice"
    assert catalog_meta["id"] == "api.billing.create_invoice"
    assert catalog_meta["module"] == "billing"
    assert catalog_meta["domain"] == "billing"
    assert catalog_meta["sub_domain"] == "invoices"
    assert "subdomain" not in catalog_meta
    assert catalog_meta["endpoint"] == "POST /billing/invoices"
    assert catalog_meta["source_of_truth"] == "analytics"
    assert catalog_meta["summary_text"] == "Creates an invoice in billing."

    # Facts derived from the parameter and error tables.
    fact_bodies = [item.text for item in all_of(RagLayer.DOCS_FACT_INDEX)]
    for fragment in (
        "has_field amount",
        "field_required amount:yes",
        "returns_error invalid_amount",
    ):
        assert any(fragment in body for body in fact_bodies)

    entity_meta = first_of(RagLayer.DOCS_ENTITY_CATALOG).metadata
    assert entity_meta["entity_name"] == "Invoice"

    workflow_meta = first_of(RagLayer.DOCS_WORKFLOW_INDEX).metadata
    assert workflow_meta["workflow_name"] == "Create invoice"

    # Relations should point at the parent, related docs, logic links and entities.
    relation_targets = [
        item.metadata["target_id"] for item in all_of(RagLayer.DOCS_RELATION_GRAPH)
    ]
    for target in (
        "billing_api",
        "api.billing.validate_invoice",
        "logic.billing.invoice_validation",
        "Invoice",
    ):
        assert target in relation_targets

    # Chunk metadata uses the `subdomain` key, unlike the catalog entry above.
    chunk_meta = first_of(RagLayer.DOCS_DOC_CHUNKS).metadata
    assert chunk_meta["section_path"]
    assert chunk_meta["artifact_type"] == "DOCS"
    assert chunk_meta["domain"] == "billing"
    assert chunk_meta["subdomain"] == "invoices"

    integration_meta = first_of(RagLayer.DOCS_INTEGRATION_INDEX).metadata
    assert integration_meta["target"] == "db.billing.invoices"
    assert integration_meta["target_type"] == "db"
    assert integration_meta["details"]["transaction"] == "required"
def test_docs_integration_extractor_keeps_valid_blocks() -> None:
    """A well-formed integration section yields exactly one parsed record.

    The record must expose the declared target plus the structured
    ``details`` mapping (a scalar entry and a list entry).
    """
    # NOTE(review): the list items after "- details:" carry a single leading
    # space here, yet the assertions expect them nested under `details` /
    # `tables` -- confirm the fixture's indentation against the repository copy.
    block_text = (
        "- target: db.billing.invoices\n"
        "- target_type: db\n"
        "- direction: outbound\n"
        "- interaction: writes\n"
        "- via: invoice repository\n"
        "- purpose: persist created invoices\n"
        "- details:\n"
        " - transaction: required\n"
        " - tables:\n"
        " - invoices\n"
        " - invoice_items\n"
    )
    billing_section = SectionChunk(
        section_path="Details > Интеграции > Billing DB",
        section_title="Billing DB",
        content=block_text,
        order=0,
    )

    found = DocsIntegrationExtractor().extract(
        [billing_section],
        path="docs/billing/create_invoice.md",
    )

    assert len(found) == 1
    record = found[0]
    assert record.target == "db.billing.invoices"
    assert record.details["transaction"] == "required"
    assert record.details["tables"] == ["invoices", "invoice_items"]
def test_docs_integration_extractor_soft_fails_on_markdown_like_yaml(caplog) -> None:
    """An integration block with backtick-quoted values degrades gracefully.

    The extractor is expected to still return one record with its top-level
    fields intact, fall back to empty ``details``, and log a warning that
    names the source path.
    """
    block_text = (
        "- target: runtime.health_provider\n"
        "- target_type: service\n"
        "- direction: outbound\n"
        "- interaction: depends_on\n"
        "- via: async callback `health_provider()`\n"
        "- purpose: получить агрегированный health runtime\n"
        "- details:\n"
        " - timeout_ms: 5000\n"
        " - response_type: `HealthPayload`\n"
    )
    provider_section = SectionChunk(
        section_path="Details > Интеграции > Runtime health provider",
        section_title="Runtime health provider",
        content=block_text,
        order=0,
    )
    source_path = "docs/api/health-endpoint.md"

    # Capture WARNING-level logs emitted while the extractor runs.
    with caplog.at_level(logging.WARNING):
        found = DocsIntegrationExtractor().extract([provider_section], path=source_path)

    assert len(found) == 1
    record = found[0]
    assert record.target == "runtime.health_provider"
    assert record.via == "async callback `health_provider()`"
    assert record.details == {}

    captured = caplog.text
    assert "docs integration parse warning" in captured
    assert "docs/api/health-endpoint.md" in captured
def test_docs_pipeline_keeps_other_layers_when_integration_block_is_invalid(caplog) -> None:
    """A problematic integration block must not prevent the other layers.

    The pipeline indexes a health-endpoint document whose integration section
    triggers a parse warning; the catalog, chunk, fact, workflow, relation and
    integration layers must all still be present, and every produced document
    must point back at the indexed path.
    """
    markdown = """---
id: api.runtime.health
type: api_method
doc_type: api_method
name: runtime_health
title: Runtime Health API
module: runtime
domain: platform
sub_domain: observability
layer: application
status: active
related_docs: []
links:
uses_logic:
- logic.runtime.health
---
# Runtime Health API
## Summary
Returns current runtime health.
## Details
### Описание
Возвращает агрегированное состояние runtime.
### Сценарий
**Название:**
Read health
**Предусловия:**
- runtime is running
**Триггер:**
- client calls health endpoint
**Основной сценарий:**
1. Read current state.
2. Return payload.
### Входные параметры
| field | type | required |
| --- | --- | --- |
| verbose | boolean | no |
### Интеграции
#### Runtime health provider
- target: runtime.health_provider
- target_type: service
- direction: outbound
- interaction: depends_on
- via: async callback `health_provider()`
- purpose: получить агрегированный health runtime
- details:
- timeout_ms: 5000
- response_type: `HealthPayload`
"""
    indexer = DocsIndexingPipeline()

    # Capture WARNING-level logs emitted while the document is indexed.
    with caplog.at_level(logging.WARNING):
        indexed = indexer.index_file(
            repo_id="acme/proj",
            commit_sha="abc123",
            path="docs/api/health-endpoint.md",
            content=markdown,
        )

    produced_layers = {item.layer for item in indexed}
    for expected_layer in (
        RagLayer.DOCS_DOCUMENT_CATALOG,
        RagLayer.DOCS_DOC_CHUNKS,
        RagLayer.DOCS_FACT_INDEX,
        RagLayer.DOCS_WORKFLOW_INDEX,
        RagLayer.DOCS_RELATION_GRAPH,
        RagLayer.DOCS_INTEGRATION_INDEX,
    ):
        assert expected_layer in produced_layers

    assert "docs integration parse warning" in caplog.text
    # No produced document may reference a different source path.
    assert not any(
        item.source.path != "docs/api/health-endpoint.md" for item in indexed
    )
def test_docs_pipeline_tolerates_broken_frontmatter_and_keeps_api_type() -> None:
    """Frontmatter with brace/pipe-laden values still produces a catalog entry.

    The catalog document must keep the declared ``type`` and carry the
    ``title`` and ``endpoint`` values through verbatim.
    """
    markdown = """---
id: api.control_actions_endpoint
type: api_method
doc_type: api_method
title: HTTP API /actions/{action}
endpoint: GET|POST /actions/{action}
links:
called_by:
- ext.operator
tags:
- api
---
# HTTP API /actions/{action}
## Summary
Control actions endpoint.
"""
    indexed = DocsIndexingPipeline().index_file(
        repo_id="acme/proj",
        commit_sha="abc123",
        path="docs/api/control-actions-endpoint.md",
        content=markdown,
    )

    catalog_entry = next(
        item for item in indexed if item.layer == RagLayer.DOCS_DOCUMENT_CATALOG
    )
    catalog_meta = catalog_entry.metadata
    assert catalog_meta["type"] == "api_method"
    assert catalog_meta["title"] == "HTTP API /actions/{action}"
    assert catalog_meta["endpoint"] == "GET|POST /actions/{action}"