Compare commits

...

5 Commits

Author SHA1 Message Date
15586f9a8c Refactoring 2026-03-12 23:33:51 +03:00
9066c292de Remove legacy code 2026-03-12 21:11:16 +03:00
b1f825e6b9 Snapshot current state 2026-03-12 20:40:29 +03:00
095d354112 Fix state 2026-03-12 16:55:30 +03:00
6ba0a18ac9 Fix state 2026-03-12 16:55:23 +03:00
1406 changed files with 551823 additions and 8758 deletions

.gitignore vendored (2 changed lines)

@@ -1 +1,3 @@
.env
.venv
__pycache__

README.md (1326 changed lines)

File diff suppressed because it is too large.

README_old.md (new file, 161 changed lines)

@@ -0,0 +1,161 @@
# Project documentation agent
## 1. Overview
The application is the backend of an agent mode for working with project documentation and code.
The system handles the following tasks:
- indexes a local copy of the project into `rag_session` and uses it as the user's primary working context;
- receives repository commit webhooks in `rag_repo` and records the change context by `story_id`;
- speeds up building `rag_session` by reusing the chunk and embedding cache from `rag_repo`;
- processes user requests through `chat`, `agent`, the orchestrator, and specialized graphs;
- persists quality metrics, Story context, and session artifacts in PostgreSQL.
Key architectural ideas:
- `rag_session` owns the user's working session and always remains the primary retrieval source;
- `rag_repo` does not participate directly in user-facing answers; it serves as a background source of cache and commit context;
- `story_id` links the analyst's changes, the documentation, and the tester's subsequent work.
## 2. Architecture
```mermaid
flowchart LR
User["User"]
Git["Git repository\n(Gitea / Bitbucket)"]
Chat["chat module"]
Agent["agent module"]
RagSession["rag_session module"]
RagRepo["rag_repo module"]
Shared["shared module"]
DB["PostgreSQL + pgvector"]
Giga["GigaChat API"]
User --> Chat
Chat --> Agent
Agent --> RagSession
Agent --> Shared
RagSession --> Shared
RagRepo --> Shared
Chat --> DB
Agent --> DB
RagSession --> DB
RagRepo --> DB
RagSession --> Giga
Agent --> Giga
Git --> RagRepo
RagRepo -.cache and commit context.-> RagSession
```
Module roles in brief:
- `chat` — external chat API, background tasks, SSE events, dialogs.
- `agent` — intent router, orchestrator, graphs, tools, answer and changeset generation.
- `rag_session` — creation and maintenance of the user's RAG index over local files.
- `rag_repo` — receives commit webhooks, resolves `story_id`, records commit context, and fills the repo cache.
- `shared` — infrastructure layer: database, retry, event bus, GigaChat client, settings.
## 3. Typical flow
```mermaid
sequenceDiagram
actor User as User
participant Git as Git repository
participant RagRepo as rag_repo module
participant DB as PostgreSQL
participant RagSession as rag_session module
participant Chat as chat module
participant Agent as agent module
Note over User,RagSession: First working session: no repository cache yet
User->>RagSession: Create rag_session and upload project files
RagSession->>DB: Check cache hit/miss
DB-->>RagSession: Cache is empty
RagSession->>RagSession: Chunking and embedding computation without repo cache
RagSession->>DB: Save rag_chunks and rag_index_jobs
User->>Chat: Send a chat request
Chat->>Agent: Hand off the task
Agent->>RagSession: Context retrieval
RagSession->>DB: Read rag_chunks
DB-->>RagSession: Relevant chunks
RagSession-->>Agent: Context
Agent-->>Chat: Answer / changeset
Note over User,Git: The user commits and pushes manually
Git->>RagRepo: Push webhook
RagRepo->>RagRepo: Resolve provider, commit_sha, changed_files, story_id
RagRepo->>DB: Write story_records/story_links/story_artifacts
RagRepo->>DB: Write rag_blob_cache/rag_chunk_cache/rag_session_chunk_map
Note over User,RagSession: Second working session: repo cache already exists
User->>RagSession: Create a new rag_session and upload project files
RagSession->>DB: Check cache hit/miss
DB-->>RagSession: Cache hits found for some files
RagSession->>RagSession: Reuse chunks from the rag_repo cache
RagSession->>RagSession: Recompute embeddings only for cache-miss files
RagSession->>DB: Save rag_chunks, rag_session_chunk_map, rag_index_jobs
User->>Chat: New request
Chat->>Agent: Hand off the task
Agent->>RagSession: Retrieval over the new session
RagSession-->>Agent: Context from the session RAG
Agent-->>Chat: Answer / changeset
```
Key points in this scenario:
- the first indexing run may happen entirely without a cache;
- after a commit, `rag_repo` records the change context and fills the cache tables;
- in the second and later sessions `rag_session` uses `cache_hit_files` to reduce the number of new embeddings.
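The cache split described above can be sketched as a content-hash lookup. This is a minimal illustration; the helper and its data shapes are hypothetical, only the idea of keying the repo cache by blob hash comes from the flow above:

```python
import hashlib

def split_by_cache(files: dict[str, str], cache: dict[str, list[str]]):
    """Partition files into cache hits and misses by content hash.

    `cache` maps a blob's SHA-256 hex digest to its precomputed chunks,
    standing in for a lookup against the rag_chunk_cache table.
    """
    hits: dict[str, list[str]] = {}
    misses: dict[str, str] = {}
    for path, content in files.items():
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        if digest in cache:
            hits[path] = cache[digest]   # reuse chunks, skip new embeddings
        else:
            misses[path] = content       # chunk and embed from scratch
    return hits, misses
```

Only the miss side then needs embedding calls, which is where the speed-up on second and later sessions comes from.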
## 4. Running the application
### Local run
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
uvicorn app.main:app --reload --port 15000
```
### Running with Docker Compose
1. Create `.env` from the example:
```bash
cp .env.example .env
```
2. Fill in at least `GIGACHAT_TOKEN`.
3. Start the services:
```bash
docker compose up -d --build
```
4. Verify the backend is reachable:
```bash
curl http://localhost:15000/health
```
Expected response:
```json
{"status":"ok"}
```
### Main endpoints
- Backend API: `http://localhost:15000`
- PostgreSQL + pgvector: `localhost:5432`
- Repository webhook: `POST /internal/rag-repo/webhook`
### Basic verification scenario
1. Create a `rag_session` via `POST /api/rag/sessions`.
2. Wait for the index job to finish via `GET /api/rag/sessions/{rag_session_id}/jobs/{index_job_id}`.
3. Create a dialog via `POST /api/chat/dialogs`.
4. Send a message via `POST /api/chat/messages`.
5. Point the repository webhook at `POST /internal/rag-repo/webhook`.
6. Make a commit with a `story_id` in the message, e.g. `FEAT-1 ...`.
7. Check that the tables are populated:
- `story_records`, `story_links`, `story_artifacts`
- `rag_blob_cache`, `rag_chunk_cache`, `rag_session_chunk_map`
8. During the second indexing session, check the job-status fields:
- `indexed_files`
- `cache_hit_files`
- `cache_miss_files`
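The three job-status fields above make it easy to sanity-check cache effectiveness. A small sketch (the helper is hypothetical; only the field names come from the job status):

```python
def cache_hit_ratio(job_status: dict) -> float:
    """Share of indexed files that were served from the repo cache.

    Expects the job-status fields listed above: indexed_files,
    cache_hit_files, cache_miss_files.
    """
    total = int(job_status.get("indexed_files") or 0)
    if total == 0:
        return 0.0
    return int(job_status.get("cache_hit_files") or 0) / total
```

On the second session the ratio should be well above zero if the webhook filled the cache tables after the first commit.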
### Useful notes
- Current chat model: `GigaChat`.
- Primary retrieval always comes from `rag_session`.
- `rag_repo` serves as a background source of cache and commit context.
- If no `story_id` is found in the webhook payload, the Story commit context is not bound, but the cache tables should still be populated.
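The `FEAT-1 ...` example above suggests a Jira-style key at the start of the commit message. The actual `rag_repo` parser is not shown here, so this regex is an assumption about the format:

```python
import re

# Assumed convention: a Jira-style key at the start of the commit
# message, e.g. "FEAT-1 add cache reuse".
_STORY_ID_RE = re.compile(r"^([A-Z][A-Z0-9]+-\d+)\b")

def extract_story_id(commit_message: str) -> "str | None":
    """Return the leading story key, or None when the message has none."""
    match = _STORY_ID_RE.match(commit_message.strip())
    return match.group(1) if match else None
```

When this returns `None`, the note above applies: no Story binding happens, but caching still proceeds.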

Binary file not shown.


@@ -1,91 +0,0 @@
# agent module
## 1. Module responsibilities
- Orchestrates execution of a user request on top of the intent router and graphs.
- Builds a `TaskSpec`, runs the step orchestrator, and assembles the final result.
- Implements the tools the agent needs and integrates them with the rest of the execution logic.
- Persists quality metrics and session artifacts for later binding to a Story.
## 2. Class and relationship diagram
```mermaid
classDiagram
class AgentModule
class GraphAgentRuntime
class OrchestratorService
class TaskSpecBuilder
class StorySessionRecorder
class StoryContextRepository
class ConfluenceService
class AgentRepository
AgentModule --> GraphAgentRuntime
AgentModule --> ConfluenceService
AgentModule --> StorySessionRecorder
StorySessionRecorder --> StoryContextRepository
GraphAgentRuntime --> OrchestratorService
GraphAgentRuntime --> TaskSpecBuilder
GraphAgentRuntime --> AgentRepository
GraphAgentRuntime --> ConfluenceService
```
## 3. Class descriptions
- `AgentModule`: assembles the runtime and publishes the internal tools routes.
Methods: `__init__` — wires the module's dependencies; `internal_router` — registers the internal tools API.
- `GraphAgentRuntime`: the main executor of an agent request.
Methods: `run` — runs the route -> retrieval -> orchestration -> answer/changeset loop.
- `OrchestratorService`: manages the step plan and runs the quality gates.
Methods: `run` — builds, validates, and executes the execution plan.
- `TaskSpecBuilder`: builds the task specification for the orchestrator.
Methods: `build` — assembles a `TaskSpec` from the route, contexts, and constraints.
- `ProjectQaConversationGraphFactory`, `ProjectQaClassificationGraphFactory`, `ProjectQaRetrievalGraphFactory`, `ProjectQaAnalysisGraphFactory`, `ProjectQaAnswerGraphFactory`: a set of small graph executors for `project/qa`.
Roles: request normalization; project-question classification; late retrieval from `RAG`; code/docs context analysis; final answer composition.
- `StorySessionRecorder`: writes session-scoped artifacts for later binding to a Story.
Methods: `record_run` — saves the session's input sources and output artifacts.
- `StoryContextRepository`: repository for Story context and its links.
Methods: `record_story_commit` — records a Story's commit context; `upsert_story` — creates/updates the Story card; `add_session_artifact` — adds a session artifact; `bind_session_to_story` — moves session artifacts into the Story; `add_artifact` — adds a versioned Story artifact; `get_story_context` — returns the aggregated Story context.
- `ConfluenceService`: a tool for fetching a page by URL.
Methods: `fetch_page` — validates the URL and returns a normalized page payload.
- `AgentRepository`: stores router context and quality metrics.
Methods: `ensure_tables` — creates the module's tables; `get_router_context` — reads the routing context; `update_router_context` — updates the dialog history and the last route; `save_quality_metrics` — saves quality metrics; `get_quality_metrics` — reads the metrics history.
## 4. API sequence diagrams
### POST /internal/tools/confluence/fetch
Purpose: fetches a Confluence page by URL and returns its content for further use in agent scenarios.
```mermaid
sequenceDiagram
participant Router as AgentModule.APIRouter
participant Confluence as ConfluenceService
Router->>Confluence: fetch_page(url)
Confluence-->>Router: page(content_markdown, metadata)
```
### `project/qa` reasoning flow
Purpose: the orchestrator plans the steps, and each step is executed by a separate graph. Retrieval is invoked late, inside the `context_retrieval` step.
```mermaid
sequenceDiagram
participant Runtime as GraphAgentRuntime
participant Orch as OrchestratorService
participant G1 as conversation_understanding
participant G2 as question_classification
participant G3 as context_retrieval
participant Rag as RagService
participant G4 as context_analysis
participant G5 as answer_composition
Runtime->>Orch: run(task)
Orch->>G1: execute
G1-->>Orch: resolved_request
Orch->>G2: execute
G2-->>Orch: question_profile
Orch->>G3: execute
G3->>Rag: retrieve(query)
Rag-->>G3: rag_items
G3-->>Orch: source_bundle
Orch->>G4: execute
G4-->>Orch: analysis_brief
Orch->>G5: execute
G5-->>Orch: final_answer
Orch-->>Runtime: final_answer
```
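The plan above is essentially a linear pipeline of named steps whose outputs accumulate into shared state. A standalone sketch of that shape (step names follow the diagram; the state keys and lambda bodies are invented for illustration, the real graphs call the LLM and RAG):

```python
from typing import Callable

Step = tuple[str, Callable[[dict], dict]]

def run_plan(steps: "list[Step]", state: dict) -> dict:
    """Execute steps in order, merging each step's output into the state."""
    for name, fn in steps:
        state = {**state, **fn(state)}
    return state

# Toy plan mirroring the project/qa step sequence in the diagram above.
plan: "list[Step]" = [
    ("conversation_understanding", lambda s: {"resolved_request": s["message"].strip()}),
    ("question_classification", lambda s: {"question_profile": "project/qa"}),
    ("context_retrieval", lambda s: {"source_bundle": ["rag_item_1"]}),
    ("context_analysis", lambda s: {"analysis_brief": "1 source"}),
    ("answer_composition", lambda s: {"final_answer": s["resolved_request"].upper()}),
]
```

The late-retrieval property falls out naturally: `context_retrieval` only runs after classification has shaped the query.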


@@ -1,20 +0,0 @@
from app.core.constants import SUPPORTED_SCHEMA_VERSION
from app.core.exceptions import AppError
from app.schemas.changeset import ChangeItem, ChangeSetPayload
from app.schemas.common import ModuleName
class ChangeSetValidator:
def validate(self, task_id: str, changeset: list[ChangeItem]) -> list[ChangeItem]:
payload = ChangeSetPayload(
schema_version=SUPPORTED_SCHEMA_VERSION,
task_id=task_id,
changeset=changeset,
)
if payload.schema_version != SUPPORTED_SCHEMA_VERSION:
raise AppError(
"unsupported_schema",
f"Unsupported schema version: {payload.schema_version}",
ModuleName.AGENT,
)
return payload.changeset
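Note that `validate` above assigns `SUPPORTED_SCHEMA_VERSION` to the payload and then compares it against the same constant, so the guard can never fire. A version check is only meaningful on an externally supplied value; a minimal standalone sketch of that pattern (the constant's value here is an assumption):

```python
SUPPORTED_SCHEMA_VERSION = "1.0"  # hypothetical value, mirrors app.core.constants

def validate_incoming(payload: dict) -> list:
    """Reject payloads whose *declared* schema version is unsupported."""
    version = payload.get("schema_version")
    if version != SUPPORTED_SCHEMA_VERSION:
        raise ValueError(f"Unsupported schema version: {version}")
    return payload.get("changeset", [])
```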


@@ -1,20 +0,0 @@
from datetime import datetime, timezone
from urllib.parse import urlparse
from uuid import uuid4
from app.core.exceptions import AppError
from app.schemas.common import ModuleName
class ConfluenceService:
async def fetch_page(self, url: str) -> dict:
parsed = urlparse(url)
if not parsed.scheme.startswith("http"):
raise AppError("invalid_url", "Invalid Confluence URL", ModuleName.CONFLUENCE)
return {
"page_id": str(uuid4()),
"title": "Confluence page",
"content_markdown": f"Fetched content from {url}",
"version": 1,
"fetched_at": datetime.now(timezone.utc).isoformat(),
}
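One caveat in the stub above: `parsed.scheme.startswith("http")` also accepts schemes such as `httpx`. A stricter standalone check (illustrative sketch, not the project's actual validator):

```python
from urllib.parse import urlparse

def is_http_url(url: str) -> bool:
    """Accept only real http/https URLs that carry a host."""
    parsed = urlparse(url)
    return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
```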


@@ -1,51 +0,0 @@
__all__ = [
"BaseGraphFactory",
"DocsGraphFactory",
"ProjectQaAnalysisGraphFactory",
"ProjectQaAnswerGraphFactory",
"ProjectQaClassificationGraphFactory",
"ProjectQaConversationGraphFactory",
"ProjectEditsGraphFactory",
"ProjectQaGraphFactory",
"ProjectQaRetrievalGraphFactory",
]
def __getattr__(name: str):
if name == "BaseGraphFactory":
from app.modules.agent.engine.graphs.base_graph import BaseGraphFactory
return BaseGraphFactory
if name == "DocsGraphFactory":
from app.modules.agent.engine.graphs.docs_graph import DocsGraphFactory
return DocsGraphFactory
if name == "ProjectQaConversationGraphFactory":
from app.modules.agent.engine.graphs.project_qa_step_graphs import ProjectQaConversationGraphFactory
return ProjectQaConversationGraphFactory
if name == "ProjectQaClassificationGraphFactory":
from app.modules.agent.engine.graphs.project_qa_step_graphs import ProjectQaClassificationGraphFactory
return ProjectQaClassificationGraphFactory
if name == "ProjectQaRetrievalGraphFactory":
from app.modules.agent.engine.graphs.project_qa_step_graphs import ProjectQaRetrievalGraphFactory
return ProjectQaRetrievalGraphFactory
if name == "ProjectQaAnalysisGraphFactory":
from app.modules.agent.engine.graphs.project_qa_step_graphs import ProjectQaAnalysisGraphFactory
return ProjectQaAnalysisGraphFactory
if name == "ProjectQaAnswerGraphFactory":
from app.modules.agent.engine.graphs.project_qa_step_graphs import ProjectQaAnswerGraphFactory
return ProjectQaAnswerGraphFactory
if name == "ProjectEditsGraphFactory":
from app.modules.agent.engine.graphs.project_edits_graph import ProjectEditsGraphFactory
return ProjectEditsGraphFactory
if name == "ProjectQaGraphFactory":
from app.modules.agent.engine.graphs.project_qa_graph import ProjectQaGraphFactory
return ProjectQaGraphFactory
raise AttributeError(name)
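The module above relies on PEP 562: when normal lookup fails, attribute access on a module falls back to a module-level `__getattr__`, so each factory import is deferred until first use. A minimal self-contained demonstration of the mechanism (the `demo` module and `HeavyFactory` are invented for illustration):

```python
import types

# A dynamically created module stands in for the graphs package.
demo = types.ModuleType("demo")

def _lazy_getattr(name: str):
    if name == "HeavyFactory":
        class HeavyFactory:  # stands in for an expensive deferred import
            pass
        return HeavyFactory
    raise AttributeError(name)

# PEP 562: modules consult __dict__["__getattr__"] on failed lookups.
demo.__getattr__ = _lazy_getattr
```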


@@ -1,73 +0,0 @@
import logging
from langgraph.graph import END, START, StateGraph
from app.modules.agent.engine.graphs.progress import emit_progress_sync
from app.modules.agent.llm import AgentLlmService
from app.modules.agent.engine.graphs.state import AgentGraphState
LOGGER = logging.getLogger(__name__)
class BaseGraphFactory:
def __init__(self, llm: AgentLlmService) -> None:
self._llm = llm
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("context", self._context_node)
graph.add_node("answer", self._answer_node)
graph.add_edge(START, "context")
graph.add_edge("context", "answer")
graph.add_edge("answer", END)
return graph.compile(checkpointer=checkpointer)
def _context_node(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.default.context",
message="Preparing the answer context from the request data.",
)
rag = state.get("rag_context", "")
conf = state.get("confluence_context", "")
emit_progress_sync(
state,
stage="graph.default.context.done",
message="Context assembled, moving on to composing the answer.",
)
result = {"rag_context": rag, "confluence_context": conf}
LOGGER.warning(
"graph step result: graph=default step=context rag_len=%s confluence_len=%s",
len(rag or ""),
len(conf or ""),
)
return result
def _answer_node(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.default.answer",
message="Composing the answer text for the user.",
)
msg = state.get("message", "")
rag = state.get("rag_context", "")
conf = state.get("confluence_context", "")
user_input = "\n\n".join(
[
f"User request:\n{msg}",
f"RAG context:\n{rag}",
f"Confluence context:\n{conf}",
]
)
answer = self._llm.generate("general_answer", user_input, log_context="graph.default.answer")
emit_progress_sync(
state,
stage="graph.default.answer.done",
message="Answer draft prepared.",
)
result = {"answer": answer}
LOGGER.warning(
"graph step result: graph=default step=answer answer_len=%s",
len(answer or ""),
)
return result


@@ -1,26 +0,0 @@
from pathlib import Path
import os
class DocsExamplesLoader:
def __init__(self, prompts_dir: Path | None = None) -> None:
base = prompts_dir or Path(__file__).resolve().parents[2] / "prompts"
env_override = os.getenv("AGENT_PROMPTS_DIR", "").strip()
root = Path(env_override) if env_override else base
self._examples_dir = root / "docs_examples"
def load_bundle(self, *, max_files: int = 6, max_chars_per_file: int = 1800) -> str:
if not self._examples_dir.is_dir():
return ""
files = sorted(
[p for p in self._examples_dir.iterdir() if p.is_file() and p.suffix.lower() in {".md", ".txt"}],
key=lambda p: p.name.lower(),
)[:max_files]
chunks: list[str] = []
for path in files:
content = path.read_text(encoding="utf-8", errors="ignore").strip()
if not content:
continue
excerpt = content[:max_chars_per_file].strip()
chunks.append(f"### Example: {path.name}\n{excerpt}")
return "\n\n".join(chunks).strip()


@@ -1,128 +0,0 @@
from langgraph.graph import END, START, StateGraph
import logging
from app.modules.agent.engine.graphs.file_targeting import FileTargeting
from app.modules.agent.engine.graphs.docs_graph_logic import DocsContentComposer, DocsContextAnalyzer
from app.modules.agent.engine.graphs.progress import emit_progress_sync
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.modules.agent.llm import AgentLlmService
LOGGER = logging.getLogger(__name__)
class DocsGraphFactory:
_max_validation_attempts = 2
def __init__(self, llm: AgentLlmService) -> None:
self._targeting = FileTargeting()
self._analyzer = DocsContextAnalyzer(llm, self._targeting)
self._composer = DocsContentComposer(llm, self._targeting)
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("collect_code_context", self._collect_code_context)
graph.add_node("detect_existing_docs", self._detect_existing_docs)
graph.add_node("decide_strategy", self._decide_strategy)
graph.add_node("load_rules_and_examples", self._load_rules_and_examples)
graph.add_node("plan_incremental_changes", self._plan_incremental_changes)
graph.add_node("plan_new_document", self._plan_new_document)
graph.add_node("generate_doc_content", self._generate_doc_content)
graph.add_node("self_check", self._self_check)
graph.add_node("build_changeset", self._build_changeset)
graph.add_node("summarize_result", self._summarize_result)
graph.add_edge(START, "collect_code_context")
graph.add_edge("collect_code_context", "detect_existing_docs")
graph.add_edge("detect_existing_docs", "decide_strategy")
graph.add_edge("decide_strategy", "load_rules_and_examples")
graph.add_conditional_edges(
"load_rules_and_examples",
self._route_after_rules_loading,
{
"incremental": "plan_incremental_changes",
"from_scratch": "plan_new_document",
},
)
graph.add_edge("plan_incremental_changes", "generate_doc_content")
graph.add_edge("plan_new_document", "generate_doc_content")
graph.add_edge("generate_doc_content", "self_check")
graph.add_conditional_edges(
"self_check",
self._route_after_self_check,
{"retry": "generate_doc_content", "ready": "build_changeset"},
)
graph.add_edge("build_changeset", "summarize_result")
graph.add_edge("summarize_result", END)
return graph.compile(checkpointer=checkpointer)
def _collect_code_context(self, state: AgentGraphState) -> dict:
return self._run_node(state, "collect_code_context", "Collecting code and file context.", self._analyzer.collect_code_context)
def _detect_existing_docs(self, state: AgentGraphState) -> dict:
return self._run_node(
state,
"detect_existing_docs",
"Detecting whether the project already has documentation.",
self._analyzer.detect_existing_docs,
)
def _decide_strategy(self, state: AgentGraphState) -> dict:
return self._run_node(state, "decide_strategy", "Choosing a strategy: incremental update or generation from scratch.", self._analyzer.decide_strategy)
def _load_rules_and_examples(self, state: AgentGraphState) -> dict:
return self._run_node(
state,
"load_rules_and_examples",
"Loading documentation format rules and examples.",
self._composer.load_rules_and_examples,
)
def _plan_incremental_changes(self, state: AgentGraphState) -> dict:
return self._run_node(
state,
"plan_incremental_changes",
"Planning targeted changes to the existing documentation.",
lambda st: self._composer.plan_incremental_changes(st, self._analyzer),
)
def _plan_new_document(self, state: AgentGraphState) -> dict:
return self._run_node(state, "plan_new_document", "Designing the structure of the new documentation.", self._composer.plan_new_document)
def _generate_doc_content(self, state: AgentGraphState) -> dict:
return self._run_node(state, "generate_doc_content", "Generating the documentation content.", self._composer.generate_doc_content)
def _self_check(self, state: AgentGraphState) -> dict:
return self._run_node(state, "self_check", "Checking the result against the rules.", self._composer.self_check)
def _build_changeset(self, state: AgentGraphState) -> dict:
return self._run_node(state, "build_changeset", "Building the final file changeset.", self._composer.build_changeset)
def _summarize_result(self, state: AgentGraphState) -> dict:
return self._run_node(
state,
"summarize_result",
"Composing a brief summary of the actions performed and the files changed.",
self._composer.build_execution_summary,
)
def _route_after_rules_loading(self, state: AgentGraphState) -> str:
if state.get("docs_strategy") == "incremental_update":
return "incremental"
return "from_scratch"
def _route_after_self_check(self, state: AgentGraphState) -> str:
if state.get("validation_passed"):
return "ready"
attempts = int(state.get("validation_attempts", 0) or 0)
return "ready" if attempts >= self._max_validation_attempts else "retry"
def _run_node(self, state: AgentGraphState, node_name: str, message: str, fn):
emit_progress_sync(state, stage=f"graph.docs.{node_name}", message=message)
try:
result = fn(state)
emit_progress_sync(state, stage=f"graph.docs.{node_name}.done", message=f"Step '{node_name}' completed.")
LOGGER.warning("docs graph node completed: node=%s keys=%s", node_name, sorted(result.keys()))
return result
except Exception:
LOGGER.exception("docs graph node failed: node=%s", node_name)
raise
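The retry routing in `_route_after_self_check` above is a small retry-until-limit pattern. Read as a standalone function (a sketch mirroring the logic, not an import of the class):

```python
def route_after_self_check(validation_passed: bool, attempts: int, max_attempts: int = 2) -> str:
    """Retry generation until the attempt limit is reached, then ship anyway."""
    if validation_passed:
        return "ready"
    return "ready" if attempts >= max_attempts else "retry"
```

Shipping after the limit, rather than failing, means a weak self-check never deadlocks the graph.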


@@ -1,523 +0,0 @@
import json
from difflib import SequenceMatcher
from app.modules.agent.engine.graphs.docs_examples_loader import DocsExamplesLoader
from app.modules.agent.engine.graphs.file_targeting import FileTargeting
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.modules.agent.llm import AgentLlmService
from app.schemas.changeset import ChangeItem
import logging
LOGGER = logging.getLogger(__name__)
class DocsContextAnalyzer:
def __init__(self, llm: AgentLlmService, targeting: FileTargeting) -> None:
self._llm = llm
self._targeting = targeting
def collect_code_context(self, state: AgentGraphState) -> dict:
message = state.get("message", "")
files_map = state.get("files_map", {}) or {}
requested_path = self._targeting.extract_target_path(message)
target_file = self._targeting.lookup_file(files_map, requested_path) if requested_path else None
docs_candidates = self._collect_doc_candidates(files_map)
target_path = str((target_file or {}).get("path") or (requested_path or "")).strip() or ""
return {
"docs_candidates": docs_candidates,
"target_path": target_path,
"target_file_content": str((target_file or {}).get("content", "")),
"target_file_hash": str((target_file or {}).get("content_hash", "")),
"validation_attempts": 0,
}
def detect_existing_docs(self, state: AgentGraphState) -> dict:
docs_candidates = state.get("docs_candidates", []) or []
if not docs_candidates:
return {
"existing_docs_detected": False,
"existing_docs_summary": "No documentation files detected in current project context.",
}
snippets = "\n\n".join(
[
f"Path: {item.get('path', '')}\nSnippet:\n{self._shorten(item.get('content', ''), 500)}"
for item in docs_candidates[:8]
]
)
user_input = "\n\n".join(
[
f"User request:\n{state.get('message', '')}",
f"Requested target path:\n{state.get('target_path', '') or '(not specified)'}",
f"Detected documentation candidates:\n{snippets}",
]
)
raw = self._llm.generate("docs_detect", user_input, log_context="graph.docs.detect_existing_docs")
exists = self.parse_bool_marker(raw, "exists", default=True)
summary = self.parse_text_marker(raw, "summary", default="Documentation files detected.")
return {"existing_docs_detected": exists, "existing_docs_summary": summary}
def decide_strategy(self, state: AgentGraphState) -> dict:
message = (state.get("message", "") or "").lower()
if any(token in message for token in ("с нуля", "from scratch", "new documentation", "создай документацию")):
return {"docs_strategy": "from_scratch"}
if any(token in message for token in ("дополни", "обнови документацию", "extend docs", "update docs")):
return {"docs_strategy": "incremental_update"}
user_input = "\n\n".join(
[
f"User request:\n{state.get('message', '')}",
f"Existing docs detected:\n{state.get('existing_docs_detected', False)}",
f"Existing docs summary:\n{state.get('existing_docs_summary', '')}",
]
)
raw = self._llm.generate("docs_strategy", user_input, log_context="graph.docs.decide_strategy")
strategy = self.parse_text_marker(raw, "strategy", default="").lower()
if strategy not in {"incremental_update", "from_scratch"}:
strategy = "incremental_update" if state.get("existing_docs_detected", False) else "from_scratch"
return {"docs_strategy": strategy}
def resolve_target_for_incremental(self, state: AgentGraphState) -> tuple[str, dict | None]:
files_map = state.get("files_map", {}) or {}
preferred_path = state.get("target_path", "")
preferred = self._targeting.lookup_file(files_map, preferred_path)
if preferred:
return str(preferred.get("path") or preferred_path), preferred
candidates = state.get("docs_candidates", []) or []
if candidates:
first_path = str(candidates[0].get("path", ""))
resolved = self._targeting.lookup_file(files_map, first_path) or candidates[0]
return first_path, resolved
fallback = preferred_path.strip() or "docs/AGENT_DRAFT.md"
return fallback, None
def _collect_doc_candidates(self, files_map: dict[str, dict]) -> list[dict]:
candidates: list[dict] = []
for raw_path, payload in files_map.items():
path = str(raw_path or "").replace("\\", "/").strip()
if not path:
continue
low = path.lower()
is_doc = low.startswith("docs/") or low.endswith(".md") or low.endswith(".rst") or "/readme" in low or low.startswith("readme")
if not is_doc:
continue
candidates.append(
{
"path": str(payload.get("path") or path),
"content": str(payload.get("content", "")),
"content_hash": str(payload.get("content_hash", "")),
}
)
candidates.sort(key=lambda item: (0 if str(item.get("path", "")).lower().startswith("docs/") else 1, str(item.get("path", "")).lower()))
return candidates
def _shorten(self, text: str, max_chars: int) -> str:
value = (text or "").strip()
if len(value) <= max_chars:
return value
return value[:max_chars].rstrip() + "\n...[truncated]"
@staticmethod
def parse_bool_marker(text: str, marker: str, *, default: bool) -> bool:
value = DocsContextAnalyzer.parse_text_marker(text, marker, default="")
if not value:
return default
token = value.split()[0].strip().lower()
if token in {"yes", "true", "1", "да"}:
return True
if token in {"no", "false", "0", "нет"}:
return False
return default
@staticmethod
def parse_text_marker(text: str, marker: str, *, default: str) -> str:
low_marker = f"{marker.lower()}:"
for line in (text or "").splitlines():
raw = line.strip()
if raw.lower().startswith(low_marker):
return raw.split(":", 1)[1].strip()
return default
class DocsBundleFormatter:
def shorten(self, text: str, max_chars: int) -> str:
value = (text or "").strip()
if len(value) <= max_chars:
return value
return value[:max_chars].rstrip() + "\n...[truncated]"
def normalize_file_output(self, text: str) -> str:
value = (text or "").strip()
if value.startswith("```") and value.endswith("```"):
lines = value.splitlines()
if len(lines) >= 3:
return "\n".join(lines[1:-1]).strip()
return value
def parse_docs_bundle(self, raw_text: str) -> list[dict]:
text = (raw_text or "").strip()
if not text:
return []
candidate = self.normalize_file_output(text)
parsed = self._parse_json_candidate(candidate)
if parsed is None:
start = candidate.find("{")
end = candidate.rfind("}")
if start != -1 and end > start:
parsed = self._parse_json_candidate(candidate[start : end + 1])
if parsed is None:
return []
files: list[dict]
if isinstance(parsed, dict):
raw_files = parsed.get("files")
files = raw_files if isinstance(raw_files, list) else []
elif isinstance(parsed, list):
files = parsed
else:
files = []
out: list[dict] = []
seen: set[str] = set()
for item in files:
if not isinstance(item, dict):
continue
path = str(item.get("path", "")).replace("\\", "/").strip()
content = str(item.get("content", ""))
if not path or not content.strip():
continue
if path in seen:
continue
seen.add(path)
out.append(
{
"path": path,
"content": content,
"reason": str(item.get("reason", "")).strip(),
}
)
return out
def bundle_has_required_structure(self, bundle: list[dict]) -> bool:
if not bundle:
return False
has_api = any(str(item.get("path", "")).replace("\\", "/").startswith("docs/api/") for item in bundle)
has_logic = any(str(item.get("path", "")).replace("\\", "/").startswith("docs/logic/") for item in bundle)
return has_api and has_logic
def similarity(self, original: str, updated: str) -> float:
return SequenceMatcher(None, original or "", updated or "").ratio()
def line_change_ratio(self, original: str, updated: str) -> float:
orig_lines = (original or "").splitlines()
new_lines = (updated or "").splitlines()
if not orig_lines and not new_lines:
return 0.0
matcher = SequenceMatcher(None, orig_lines, new_lines)
changed = 0
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == "equal":
continue
changed += max(i2 - i1, j2 - j1)
total = max(len(orig_lines), len(new_lines), 1)
return changed / total
def added_headings(self, original: str, updated: str) -> int:
old_heads = {line.strip() for line in (original or "").splitlines() if line.strip().startswith("#")}
new_heads = {line.strip() for line in (updated or "").splitlines() if line.strip().startswith("#")}
return len(new_heads - old_heads)
def collapse_whitespace(self, text: str) -> str:
return " ".join((text or "").split())
def _parse_json_candidate(self, text: str):
try:
return json.loads(text)
except Exception:
return None
class DocsContentComposer:
def __init__(self, llm: AgentLlmService, targeting: FileTargeting) -> None:
self._llm = llm
self._targeting = targeting
self._examples = DocsExamplesLoader()
self._bundle = DocsBundleFormatter()
def load_rules_and_examples(self, _state: AgentGraphState) -> dict:
return {"rules_bundle": self._examples.load_bundle()}
def plan_incremental_changes(self, state: AgentGraphState, analyzer: DocsContextAnalyzer) -> dict:
target_path, target = analyzer.resolve_target_for_incremental(state)
user_input = "\n\n".join(
[
"Strategy: incremental_update",
f"User request:\n{state.get('message', '')}",
f"Target path:\n{target_path}",
f"Current target content:\n{self._bundle.shorten((target or {}).get('content', ''), 3000)}",
f"RAG context:\n{self._bundle.shorten(state.get('rag_context', ''), 6000)}",
f"Examples bundle:\n{state.get('rules_bundle', '')}",
]
)
plan = self._llm.generate("docs_plan_sections", user_input, log_context="graph.docs.plan_incremental_changes")
return {
"doc_plan": plan,
"target_path": target_path,
"target_file_content": str((target or {}).get("content", "")),
"target_file_hash": str((target or {}).get("content_hash", "")),
}
def plan_new_document(self, state: AgentGraphState) -> dict:
target_path = state.get("target_path", "").strip() or "docs/AGENT_DRAFT.md"
user_input = "\n\n".join(
[
"Strategy: from_scratch",
f"User request:\n{state.get('message', '')}",
f"Target path:\n{target_path}",
f"RAG context:\n{self._bundle.shorten(state.get('rag_context', ''), 6000)}",
f"Examples bundle:\n{state.get('rules_bundle', '')}",
]
)
plan = self._llm.generate("docs_plan_sections", user_input, log_context="graph.docs.plan_new_document")
return {"doc_plan": plan, "target_path": target_path, "target_file_content": "", "target_file_hash": ""}
def generate_doc_content(self, state: AgentGraphState) -> dict:
user_input = "\n\n".join(
[
f"Strategy:\n{state.get('docs_strategy', 'from_scratch')}",
f"User request:\n{state.get('message', '')}",
f"Target path:\n{state.get('target_path', '')}",
f"Document plan:\n{state.get('doc_plan', '')}",
f"Current target content:\n{self._bundle.shorten(state.get('target_file_content', ''), 3500)}",
f"RAG context:\n{self._bundle.shorten(state.get('rag_context', ''), 7000)}",
f"Examples bundle:\n{state.get('rules_bundle', '')}",
]
)
raw = self._llm.generate("docs_generation", user_input, log_context="graph.docs.generate_doc_content")
bundle = self._bundle.parse_docs_bundle(raw)
if bundle:
first_content = str(bundle[0].get("content", "")).strip()
return {"generated_docs_bundle": bundle, "generated_doc": first_content}
content = self._bundle.normalize_file_output(raw)
return {"generated_docs_bundle": [], "generated_doc": content}
def self_check(self, state: AgentGraphState) -> dict:
attempts = int(state.get("validation_attempts", 0) or 0) + 1
bundle = state.get("generated_docs_bundle", []) or []
generated = state.get("generated_doc", "")
if not generated.strip() and not bundle:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": "Generated document is empty.",
}
strategy = state.get("docs_strategy", "from_scratch")
if strategy == "from_scratch" and not self._bundle.bundle_has_required_structure(bundle):
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": "Bundle must include both docs/api and docs/logic for from_scratch strategy.",
}
if strategy == "incremental_update":
if bundle and len(bundle) > 1 and not self._is_broad_rewrite_request(str(state.get("message", ""))):
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": "Incremental update should not touch multiple files without explicit broad rewrite request.",
}
original = str(state.get("target_file_content", ""))
broad = self._is_broad_rewrite_request(str(state.get("message", "")))
if original and generated:
if self._bundle.collapse_whitespace(original) == self._bundle.collapse_whitespace(generated):
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": "Only formatting/whitespace changes detected.",
}
similarity = self._bundle.similarity(original, generated)
change_ratio = self._bundle.line_change_ratio(original, generated)
added_headings = self._bundle.added_headings(original, generated)
min_similarity = 0.75 if broad else 0.9
max_change_ratio = 0.7 if broad else 0.35
if similarity < min_similarity:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": f"Incremental update is too broad (similarity={similarity:.2f}).",
}
if change_ratio > max_change_ratio:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": f"Incremental update changes too many lines (change_ratio={change_ratio:.2f}).",
}
if not broad and added_headings > 0:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": "New section headings were added outside requested scope.",
}
bundle_text = "\n".join([f"- {item.get('path', '')}" for item in bundle[:30]])
user_input = "\n\n".join(
[
f"Strategy:\n{strategy}",
f"User request:\n{state.get('message', '')}",
f"Document plan:\n{state.get('doc_plan', '')}",
f"Generated file paths:\n{bundle_text or '(single-file mode)'}",
f"Generated document:\n{generated}",
]
)
raw = self._llm.generate("docs_self_check", user_input, log_context="graph.docs.self_check")
passed = DocsContextAnalyzer.parse_bool_marker(raw, "pass", default=False)
feedback = DocsContextAnalyzer.parse_text_marker(raw, "feedback", default="No validation feedback provided.")
return {"validation_attempts": attempts, "validation_passed": passed, "validation_feedback": feedback}
def build_changeset(self, state: AgentGraphState) -> dict:
files_map = state.get("files_map", {}) or {}
bundle = state.get("generated_docs_bundle", []) or []
strategy = state.get("docs_strategy", "from_scratch")
if strategy == "from_scratch" and not self._bundle.bundle_has_required_structure(bundle):
LOGGER.info(
"build_changeset fallback bundle used: strategy=%s bundle_items=%s",
strategy,
len(bundle),
)
bundle = self._build_fallback_bundle_from_text(state.get("generated_doc", ""))
if bundle:
changes: list[ChangeItem] = []
for item in bundle:
path = str(item.get("path", "")).replace("\\", "/").strip()
content = str(item.get("content", ""))
if not path or not content.strip():
continue
target = self._targeting.lookup_file(files_map, path)
reason = str(item.get("reason", "")).strip() or f"Documentation {strategy}: generated file from structured bundle."
if target and target.get("content_hash"):
changes.append(
ChangeItem(
op="update",
path=str(target.get("path") or path),
base_hash=str(target.get("content_hash", "")),
proposed_content=content,
reason=reason,
)
)
else:
changes.append(
ChangeItem(
op="create",
path=path,
proposed_content=content,
reason=reason,
)
)
if changes:
return {"changeset": changes}
target_path = (state.get("target_path", "") or "").strip() or "docs/AGENT_DRAFT.md"
target = self._targeting.lookup_file(files_map, target_path)
content = state.get("generated_doc", "")
if target and target.get("content_hash"):
change = ChangeItem(
op="update",
path=str(target.get("path") or target_path),
base_hash=str(target.get("content_hash", "")),
proposed_content=content,
reason=f"Documentation {strategy}: update existing document increment.",
)
else:
change = ChangeItem(
op="create",
path=target_path,
proposed_content=content,
reason=f"Documentation {strategy}: create document from current project context.",
)
return {"changeset": [change]}
def build_execution_summary(self, state: AgentGraphState) -> dict:
changeset = state.get("changeset", []) or []
if not changeset:
return {"answer": "Документация не была изменена: итоговый changeset пуст."}
file_lines = self._format_changed_files(changeset)
user_input = "\n\n".join(
[
f"User request:\n{state.get('message', '')}",
f"Documentation strategy:\n{state.get('docs_strategy', 'from_scratch')}",
f"Document plan:\n{state.get('doc_plan', '')}",
f"Validation feedback:\n{state.get('validation_feedback', '')}",
f"Changed files:\n{file_lines}",
]
)
try:
summary = self._llm.generate(
"docs_execution_summary",
user_input,
log_context="graph.docs.summarize_result",
).strip()
except Exception:
summary = ""
if not summary:
summary = self._build_fallback_summary(state, file_lines)
return {"answer": summary}
def _build_fallback_bundle_from_text(self, text: str) -> list[dict]:
content = (text or "").strip()
if not content:
content = (
"# Project Documentation Draft\n\n"
"## Overview\n"
"Documentation draft was generated, but structured sections require уточнение.\n"
)
return [
{
"path": "docs/logic/project_overview.md",
"content": content,
"reason": "Fallback: generated structured logic document from non-JSON model output.",
},
{
"path": "docs/api/README.md",
"content": (
"# API Methods\n\n"
"This file is a fallback placeholder for API method documentation.\n\n"
"## Next Step\n"
"- Add one file per API method under `docs/api/`.\n"
),
"reason": "Fallback: ensure required docs/api structure exists.",
},
]
def _format_changed_files(self, changeset: list[ChangeItem]) -> str:
lines: list[str] = []
for item in changeset[:30]:
lines.append(f"- {item.op.value} {item.path}: {item.reason}")
return "\n".join(lines)
def _build_fallback_summary(self, state: AgentGraphState, file_lines: str) -> str:
request = (state.get("message", "") or "").strip()
return "\n".join(
[
"Выполненные действия:",
f"- Обработан запрос: {request or '(пустой запрос)'}",
f"- Применена стратегия документации: {state.get('docs_strategy', 'from_scratch')}",
"- Сформирован и проверен changeset для документации.",
"",
"Измененные файлы:",
file_lines or "- (нет изменений)",
]
)
def _is_broad_rewrite_request(self, message: str) -> bool:
low = (message or "").lower()
markers = (
"перепиши",
"полностью",
"целиком",
"с нуля",
"full rewrite",
"rewrite all",
"реорганизуй",
)
return any(marker in low for marker in markers)
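The incremental self-check above gates generated updates with two metrics: character-level similarity and the fraction of changed lines. A minimal standalone sketch of those metrics using `difflib` (a reimplementation for illustration, not the actual `DocsBundleFormatter` helpers):

```python
from difflib import SequenceMatcher

def similarity(original: str, generated: str) -> float:
    # Character-level similarity in [0, 1]; 1.0 means identical texts.
    return SequenceMatcher(None, original, generated).ratio()

def line_change_ratio(original: str, generated: str) -> float:
    # Fraction of changed lines relative to the original document size.
    old_lines = original.splitlines()
    new_lines = generated.splitlines()
    if not old_lines:
        return 1.0 if new_lines else 0.0
    matcher = SequenceMatcher(None, old_lines, new_lines)
    changed = sum(
        max(i2 - i1, j2 - j1)
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    )
    return changed / len(old_lines)

doc = (
    "# Title\n\n"
    "Intro paragraph that explains what this document covers in detail.\n\n"
    "## Usage\nRun the tool.\n\n"
    "## Notes\nSee the changelog for details.\n"
)
edited = doc.replace("Run the tool.", "Run the tool with --verbose.")
sim = similarity(doc, edited)
ratio = line_change_ratio(doc, edited)
# A one-line tweak stays above the strict 0.9 similarity floor and below
# the 0.35 change-ratio ceiling applied to non-broad requests.
print(sim > 0.9, ratio <= 0.35)
```

This is why a full rewrite of a document fails validation under the strict thresholds: similarity collapses toward zero while the change ratio approaches one, unless the request contains a broad-rewrite marker that relaxes the limits to 0.75 and 0.7.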

@@ -1,28 +0,0 @@
import re
class FileTargeting:
_path_pattern = re.compile(r"([A-Za-z0-9_.\-/]+?\.[A-Za-z0-9_]+)")
def extract_target_path(self, message: str) -> str | None:
text = (message or "").replace("\\", "/")
candidates = self._path_pattern.findall(text)
if not candidates:
return None
for candidate in candidates:
cleaned = candidate.strip("`'\".,:;()[]{}")
if "/" in cleaned or cleaned.startswith("."):
return cleaned
return candidates[0].strip("`'\".,:;()[]{}")
def lookup_file(self, files_map: dict[str, dict], path: str | None) -> dict | None:
if not path:
return None
normalized = path.replace("\\", "/")
if normalized in files_map:
return files_map[normalized]
low = normalized.lower()
for key, value in files_map.items():
if key.lower() == low:
return value
return None
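A quick check of how the path pattern above behaves on a typical user message (same regex as `FileTargeting._path_pattern`, extracted into a standalone sketch):

```python
import re

# Same pattern as FileTargeting._path_pattern above.
path_pattern = re.compile(r"([A-Za-z0-9_.\-/]+?\.[A-Za-z0-9_]+)")

message = "please update docs/api/users.md and add a section about limits"
candidates = [c.strip("`'\".,:;()[]{}") for c in path_pattern.findall(message)]
# Candidates containing a slash win, matching extract_target_path's preference.
target = next(
    (c for c in candidates if "/" in c or c.startswith(".")),
    candidates[0] if candidates else None,
)
print(target)  # docs/api/users.md
```

Note that the character class excludes spaces and non-Latin letters, so the pattern isolates path-like tokens even inside mixed-language requests.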

@@ -1,44 +0,0 @@
from collections.abc import Awaitable, Callable
import inspect
import asyncio
from app.modules.agent.engine.graphs.progress_registry import progress_registry
from app.modules.agent.engine.graphs.state import AgentGraphState
ProgressCallback = Callable[[str, str, str, dict | None], Awaitable[None] | None]
async def emit_progress(
state: AgentGraphState,
*,
stage: str,
message: str,
kind: str = "task_progress",
meta: dict | None = None,
) -> None:
callback = progress_registry.get(state.get("progress_key"))
if callback is None:
return
result = callback(stage, message, kind, meta or {})
if inspect.isawaitable(result):
await result
def emit_progress_sync(
state: AgentGraphState,
*,
stage: str,
message: str,
kind: str = "task_progress",
meta: dict | None = None,
) -> None:
callback = progress_registry.get(state.get("progress_key"))
if callback is None:
return
result = callback(stage, message, kind, meta or {})
if inspect.isawaitable(result):
try:
loop = asyncio.get_running_loop()
loop.create_task(result)
        except RuntimeError:
            # No running event loop: the progress event is deliberately dropped.
            pass
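`emit_progress_sync` above schedules an async callback from synchronous graph code without blocking: if the callback returns an awaitable and a loop is running, it is scheduled as a task; otherwise the event is dropped. A standalone sketch of that fire-and-forget pattern (hypothetical names, not the project API):

```python
import asyncio
import inspect

events: list[str] = []

async def async_callback(stage: str) -> None:
    events.append(stage)

def notify_sync(stage: str) -> None:
    # Mirror emit_progress_sync: call, then schedule if a coroutine came back.
    result = async_callback(stage)
    if inspect.isawaitable(result):
        try:
            loop = asyncio.get_running_loop()
            loop.create_task(result)
        except RuntimeError:
            # No running loop: drop the event, as the original does; closing
            # the coroutine avoids a "never awaited" warning.
            result.close()

async def main() -> None:
    notify_sync("collect_context")
    await asyncio.sleep(0)  # yield so the scheduled task can run

asyncio.run(main())
print(events)  # ['collect_context']
```

The `await asyncio.sleep(0)` matters: `create_task` only schedules the coroutine, so the caller must yield control back to the loop before the progress event is actually delivered.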

@@ -1,27 +0,0 @@
from collections.abc import Awaitable, Callable
from threading import Lock
ProgressCallback = Callable[[str, str, str, dict | None], Awaitable[None] | None]
class ProgressRegistry:
def __init__(self) -> None:
self._items: dict[str, ProgressCallback] = {}
self._lock = Lock()
def register(self, key: str, callback: ProgressCallback) -> None:
with self._lock:
self._items[key] = callback
def get(self, key: str | None) -> ProgressCallback | None:
if not key:
return None
with self._lock:
return self._items.get(key)
def unregister(self, key: str) -> None:
with self._lock:
self._items.pop(key, None)
progress_registry = ProgressRegistry()

@@ -1,171 +0,0 @@
import re
from dataclasses import dataclass, field
@dataclass
class BlockContract:
type: str
max_changed_lines: int = 6
start_anchor: str = ""
end_anchor: str = ""
old_line: str = ""
def as_dict(self) -> dict:
return {
"type": self.type,
"max_changed_lines": self.max_changed_lines,
"start_anchor": self.start_anchor,
"end_anchor": self.end_anchor,
"old_line": self.old_line,
}
@dataclass
class FileEditContract:
path: str
reason: str
intent: str = "update"
max_hunks: int = 1
max_changed_lines: int = 8
allowed_blocks: list[BlockContract] = field(default_factory=list)
def as_dict(self) -> dict:
return {
"path": self.path,
"reason": self.reason,
"intent": self.intent,
"max_hunks": self.max_hunks,
"max_changed_lines": self.max_changed_lines,
"allowed_blocks": [block.as_dict() for block in self.allowed_blocks],
}
class ContractParser:
_supported_block_types = {"append_end", "replace_between", "replace_line_equals"}
def parse(self, payload: dict, *, request: str, requested_path: str) -> list[dict]:
files = payload.get("files", []) if isinstance(payload, dict) else []
parsed: list[FileEditContract] = []
for item in files if isinstance(files, list) else []:
contract = self._parse_file_contract(item)
if contract:
parsed.append(contract)
if not parsed:
fallback = self._fallback_contract(request=request, requested_path=requested_path)
if fallback:
parsed.append(fallback)
return [item.as_dict() for item in parsed]
def _parse_file_contract(self, item: object) -> FileEditContract | None:
if not isinstance(item, dict):
return None
path = str(item.get("path", "")).replace("\\", "/").strip()
if not path:
return None
reason = str(item.get("reason", "")).strip() or "Requested user adjustment."
intent = str(item.get("intent", "update")).strip().lower() or "update"
if intent not in {"update", "create"}:
intent = "update"
max_hunks = self._clamp_int(item.get("max_hunks"), default=1, min_value=1, max_value=5)
max_changed_lines = self._clamp_int(item.get("max_changed_lines"), default=8, min_value=1, max_value=120)
blocks: list[BlockContract] = []
raw_blocks = item.get("allowed_blocks", [])
for raw in raw_blocks if isinstance(raw_blocks, list) else []:
block = self._parse_block(raw)
if block:
blocks.append(block)
if not blocks:
return None
return FileEditContract(
path=path,
reason=reason,
intent=intent,
max_hunks=max_hunks,
max_changed_lines=max_changed_lines,
allowed_blocks=blocks,
)
def _parse_block(self, raw: object) -> BlockContract | None:
if not isinstance(raw, dict):
return None
kind = self._normalize_block_type(str(raw.get("type", "")).strip().lower())
if kind not in self._supported_block_types:
return None
max_changed_lines = self._clamp_int(raw.get("max_changed_lines"), default=6, min_value=1, max_value=80)
block = BlockContract(
type=kind,
max_changed_lines=max_changed_lines,
start_anchor=str(raw.get("start_anchor", "")).strip(),
end_anchor=str(raw.get("end_anchor", "")).strip(),
old_line=str(raw.get("old_line", "")).strip(),
)
if block.type == "replace_between" and (not block.start_anchor or not block.end_anchor):
return None
if block.type == "replace_line_equals" and not block.old_line:
return None
return block
def _fallback_contract(self, *, request: str, requested_path: str) -> FileEditContract | None:
path = requested_path.strip()
if not path:
return None
low = (request or "").lower()
if any(marker in low for marker in ("в конец", "в самый конец", "append to end", "append at the end")):
return FileEditContract(
path=path,
reason="Append-only update inferred from user request.",
intent="update",
max_hunks=1,
max_changed_lines=8,
allowed_blocks=[BlockContract(type="append_end", max_changed_lines=8)],
)
quoted = self._extract_quoted_line(request)
if quoted:
return FileEditContract(
path=path,
reason="Single-line replacement inferred from quoted segment in user request.",
intent="update",
max_hunks=1,
max_changed_lines=4,
allowed_blocks=[BlockContract(type="replace_line_equals", old_line=quoted, max_changed_lines=4)],
)
return None
def _extract_quoted_line(self, text: str) -> str:
value = (text or "").strip()
patterns = [
r"`([^`]+)`",
r"\"([^\"]+)\"",
r"'([^']+)'",
r"«([^»]+)»",
]
for pattern in patterns:
match = re.search(pattern, value)
if not match:
continue
candidate = match.group(1).strip()
if candidate:
return candidate
return ""
def _normalize_block_type(self, value: str) -> str:
mapping = {
"append": "append_end",
"append_eof": "append_end",
"end_append": "append_end",
"replace_block": "replace_between",
"replace_section": "replace_between",
"replace_range": "replace_between",
"replace_line": "replace_line_equals",
"line_equals": "replace_line_equals",
}
return mapping.get(value, value)
def _clamp_int(self, value: object, *, default: int, min_value: int, max_value: int) -> int:
try:
numeric = int(value) # type: ignore[arg-type]
except Exception:
numeric = default
return max(min_value, min(max_value, numeric))
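The fallback contract above depends on pulling a quoted line out of the user's request; a standalone check of those quote patterns (same order as `_extract_quoted_line`):

```python
import re

def extract_quoted_line(text: str) -> str:
    # Same pattern order as ContractParser._extract_quoted_line above:
    # backticks, double quotes, single quotes, then guillemets.
    patterns = [r"`([^`]+)`", r"\"([^\"]+)\"", r"'([^']+)'", r"«([^»]+)»"]
    value = (text or "").strip()
    for pattern in patterns:
        match = re.search(pattern, value)
        if match and match.group(1).strip():
            return match.group(1).strip()
    return ""

print(extract_quoted_line("replace the line `version = 1` with version 2"))
# version = 1
```

Because backticks are tried first, a request that mixes quote styles resolves deterministically to the backticked segment, which then becomes the `old_line` of a `replace_line_equals` block.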

@@ -1,102 +0,0 @@
import logging
from langgraph.graph import END, START, StateGraph
from app.modules.agent.engine.graphs.progress import emit_progress_sync
from app.modules.agent.engine.graphs.project_edits_logic import ProjectEditsLogic
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.modules.agent.llm import AgentLlmService
LOGGER = logging.getLogger(__name__)
class ProjectEditsGraphFactory:
_max_validation_attempts = 2
def __init__(self, llm: AgentLlmService) -> None:
self._logic = ProjectEditsLogic(llm)
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("collect_context", self._collect_context)
graph.add_node("plan_changes", self._plan_changes)
graph.add_node("generate_changeset", self._generate_changeset)
graph.add_node("self_check", self._self_check)
graph.add_node("build_result", self._build_result)
graph.add_edge(START, "collect_context")
graph.add_edge("collect_context", "plan_changes")
graph.add_edge("plan_changes", "generate_changeset")
graph.add_edge("generate_changeset", "self_check")
graph.add_conditional_edges(
"self_check",
self._route_after_self_check,
{"retry": "generate_changeset", "ready": "build_result"},
)
graph.add_edge("build_result", END)
return graph.compile(checkpointer=checkpointer)
def _collect_context(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.project_edits.collect_context",
message="Собираю контекст и релевантные файлы для правок.",
)
result = self._logic.collect_context(state)
self._log_step_result("collect_context", result)
return result
def _plan_changes(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.project_edits.plan_changes",
message="Определяю, что именно нужно изменить и в каких файлах.",
)
result = self._logic.plan_changes(state)
self._log_step_result("plan_changes", result)
return result
def _generate_changeset(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.project_edits.generate_changeset",
message="Формирую предлагаемые правки по выбранным файлам.",
)
result = self._logic.generate_changeset(state)
self._log_step_result("generate_changeset", result)
return result
def _self_check(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.project_edits.self_check",
message="Проверяю, что правки соответствуют запросу и не трогают лишнее.",
)
result = self._logic.self_check(state)
self._log_step_result("self_check", result)
return result
def _build_result(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.project_edits.build_result",
message="Формирую итоговый changeset и краткий обзор.",
)
result = self._logic.build_result(state)
self._log_step_result("build_result", result)
return result
def _route_after_self_check(self, state: AgentGraphState) -> str:
if state.get("validation_passed"):
return "ready"
attempts = int(state.get("validation_attempts", 0) or 0)
return "ready" if attempts >= self._max_validation_attempts else "retry"
def _log_step_result(self, step: str, result: dict) -> None:
LOGGER.warning(
"graph step result: graph=project_edits step=%s keys=%s changeset_items=%s answer_len=%s",
step,
sorted(result.keys()),
len(result.get("changeset", []) or []),
len(str(result.get("answer", "") or "")),
)

@@ -1,240 +0,0 @@
import json
from app.modules.agent.engine.graphs.project_edits_contract import ContractParser
from app.modules.agent.engine.graphs.project_edits_patcher import ContractPatcher
from app.modules.agent.engine.graphs.project_edits_support import ProjectEditsSupport
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.modules.agent.llm import AgentLlmService
from app.schemas.changeset import ChangeItem
class ProjectEditsLogic:
def __init__(self, llm: AgentLlmService) -> None:
self._llm = llm
self._support = ProjectEditsSupport()
self._contracts = ContractParser()
self._patcher = ContractPatcher()
def collect_context(self, state: AgentGraphState) -> dict:
message = state.get("message", "")
files_map = state.get("files_map", {}) or {}
requested_path = self._support.lookup_file(files_map, self._extract_path_hint(message))
candidates = self._support.pick_relevant_files(message, files_map)
if requested_path and not any(x["path"] == requested_path.get("path") for x in candidates):
candidates.insert(0, self._support.as_candidate(requested_path))
return {
"edits_requested_path": str((requested_path or {}).get("path", "")).strip() or self._extract_path_hint(message),
"edits_context_files": candidates[:12],
"validation_attempts": 0,
}
def plan_changes(self, state: AgentGraphState) -> dict:
user_input = json.dumps(
{
"request": state.get("message", ""),
"requested_path": state.get("edits_requested_path", ""),
"context_files": [
{
"path": item.get("path", ""),
"content_preview": self._support.shorten(str(item.get("content", "")), 2200),
}
for item in (state.get("edits_context_files", []) or [])
],
"contract_requirements": {
"must_define_allowed_blocks": True,
"max_hunks_per_file": 5,
"default_intent": "update",
},
},
ensure_ascii=False,
)
parsed = self._support.parse_json(
self._llm.generate("project_edits_plan", user_input, log_context="graph.project_edits.plan_changes")
)
contracts = self._contracts.parse(
parsed,
request=str(state.get("message", "")),
requested_path=str(state.get("edits_requested_path", "")),
)
plan = [{"path": item.get("path", ""), "reason": item.get("reason", "")} for item in contracts]
return {"edits_contracts": contracts, "edits_plan": plan}
def generate_changeset(self, state: AgentGraphState) -> dict:
files_map = state.get("files_map", {}) or {}
contracts = state.get("edits_contracts", []) or []
changeset: list[ChangeItem] = []
feedback: list[str] = []
for contract in contracts:
if not isinstance(contract, dict):
continue
path = str(contract.get("path", "")).replace("\\", "/").strip()
if not path:
continue
intent = str(contract.get("intent", "update")).strip().lower() or "update"
source = self._support.lookup_file(files_map, path)
if intent == "update" and source is None:
feedback.append(f"{path}: update requested but source file was not provided.")
continue
current_content = str((source or {}).get("content", ""))
hunks, error = self._generate_hunks_for_contract(state, contract, current_content)
if error:
feedback.append(f"{path}: {error}")
continue
proposed, apply_error = self._patcher.apply(current_content, contract, hunks)
if apply_error:
feedback.append(f"{path}: {apply_error}")
continue
if proposed is None:
feedback.append(f"{path}: patch application returned empty result.")
continue
if intent == "update":
if proposed == current_content:
feedback.append(f"{path}: no-op update produced by model.")
continue
if self._support.collapse_whitespace(proposed) == self._support.collapse_whitespace(current_content):
feedback.append(f"{path}: whitespace-only update is not allowed.")
continue
reason = str(contract.get("reason", "")).strip() or "Requested user adjustment."
if source and source.get("content_hash"):
changeset.append(
ChangeItem(
op="update",
path=str(source.get("path") or path),
base_hash=str(source.get("content_hash", "")),
proposed_content=proposed,
reason=reason,
hunks=hunks,
)
)
else:
changeset.append(
ChangeItem(
op="create",
path=path,
proposed_content=proposed,
reason=reason,
hunks=hunks,
)
)
return {"changeset": changeset, "edits_generation_feedback": " | ".join(feedback)}
def self_check(self, state: AgentGraphState) -> dict:
attempts = int(state.get("validation_attempts", 0) or 0) + 1
changeset = state.get("changeset", []) or []
files_map = state.get("files_map", {}) or {}
if not changeset:
feedback = str(state.get("edits_generation_feedback", "")).strip() or "Generated changeset is empty."
return {"validation_attempts": attempts, "validation_passed": False, "validation_feedback": feedback}
broad = self._support.is_broad_rewrite_request(str(state.get("message", "")))
for item in changeset:
if item.op.value != "update":
continue
source = self._support.lookup_file(files_map, item.path)
if not source:
continue
original = str(source.get("content", ""))
proposed = item.proposed_content or ""
similarity = self._support.similarity(original, proposed)
change_ratio = self._support.line_change_ratio(original, proposed)
added_headings = self._support.added_headings(original, proposed)
min_similarity = 0.75 if broad else 0.9
max_change_ratio = 0.7 if broad else 0.35
if similarity < min_similarity:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": f"File {item.path} changed too aggressively (similarity={similarity:.2f}).",
}
if change_ratio > max_change_ratio:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": f"File {item.path} changed too broadly (change_ratio={change_ratio:.2f}).",
}
if not broad and added_headings > 0:
return {
"validation_attempts": attempts,
"validation_passed": False,
"validation_feedback": f"File {item.path} adds new sections outside requested scope.",
}
payload = {
"request": state.get("message", ""),
"contracts": state.get("edits_contracts", []),
"changeset": [{"op": x.op.value, "path": x.path, "reason": x.reason} for x in changeset[:20]],
"rule": "Changes must stay inside contract blocks and not affect unrelated sections.",
}
parsed = self._support.parse_json(
self._llm.generate(
"project_edits_self_check",
json.dumps(payload, ensure_ascii=False),
log_context="graph.project_edits.self_check",
)
)
passed = bool(parsed.get("pass")) if isinstance(parsed, dict) else False
feedback = str(parsed.get("feedback", "")).strip() if isinstance(parsed, dict) else ""
return {
"validation_attempts": attempts,
"validation_passed": passed,
"validation_feedback": feedback or "No validation feedback provided.",
}
def build_result(self, state: AgentGraphState) -> dict:
changeset = state.get("changeset", []) or []
return {"changeset": changeset, "answer": self._support.build_summary(state, changeset)}
def _generate_hunks_for_contract(
self,
state: AgentGraphState,
contract: dict,
current_content: str,
) -> tuple[list[dict], str | None]:
prompt_payload = {
"request": state.get("message", ""),
"contract": contract,
"current_content": self._support.shorten(current_content, 18000),
"previous_validation_feedback": state.get("validation_feedback", ""),
"rag_context": self._support.shorten(state.get("rag_context", ""), 5000),
"confluence_context": self._support.shorten(state.get("confluence_context", ""), 5000),
}
raw = self._llm.generate(
"project_edits_hunks",
json.dumps(prompt_payload, ensure_ascii=False),
log_context="graph.project_edits.generate_changeset",
)
parsed = self._support.parse_json(raw)
hunks = parsed.get("hunks", []) if isinstance(parsed, dict) else []
if not isinstance(hunks, list) or not hunks:
return [], "Model did not return contract hunks."
normalized: list[dict] = []
for hunk in hunks:
if not isinstance(hunk, dict):
continue
kind = str(hunk.get("type", "")).strip().lower()
if kind not in {"append_end", "replace_between", "replace_line_equals"}:
continue
normalized.append(
{
"type": kind,
"start_anchor": str(hunk.get("start_anchor", "")),
"end_anchor": str(hunk.get("end_anchor", "")),
"old_line": str(hunk.get("old_line", "")),
"new_text": str(hunk.get("new_text", "")),
}
)
if not normalized:
return [], "Model hunks are empty or invalid."
return normalized, None
def _extract_path_hint(self, message: str) -> str:
words = (message or "").replace("\\", "/").split()
for token in words:
cleaned = token.strip("`'\".,:;()[]{}")
if "/" in cleaned and "." in cleaned:
return cleaned
if cleaned.lower().startswith("readme"):
return "README.md"
return ""

@@ -1,142 +0,0 @@
from difflib import SequenceMatcher
class ContractPatcher:
def apply(self, current_content: str, contract: dict, hunks: list[dict]) -> tuple[str | None, str | None]:
if not hunks:
return None, "No hunks were generated."
max_hunks = int(contract.get("max_hunks", 1) or 1)
if len(hunks) > max_hunks:
return None, f"Too many hunks: got={len(hunks)} allowed={max_hunks}."
allowed_blocks = contract.get("allowed_blocks", [])
if not isinstance(allowed_blocks, list) or not allowed_blocks:
return None, "No allowed blocks in edit contract."
result = current_content
total_changed_lines = 0
for idx, hunk in enumerate(hunks, start=1):
applied, changed_lines, error = self._apply_hunk(result, hunk, allowed_blocks)
if error:
return None, f"Hunk {idx} rejected: {error}"
result = applied
total_changed_lines += changed_lines
max_changed_lines = int(contract.get("max_changed_lines", 8) or 8)
if total_changed_lines > max_changed_lines:
return (
None,
f"Changed lines exceed contract limit: changed={total_changed_lines} allowed={max_changed_lines}.",
)
return result, None
def _apply_hunk(
self,
content: str,
hunk: dict,
allowed_blocks: list[dict],
) -> tuple[str, int, str | None]:
if not isinstance(hunk, dict):
return content, 0, "Invalid hunk payload."
kind = str(hunk.get("type", "")).strip().lower()
if kind not in {"append_end", "replace_between", "replace_line_equals"}:
return content, 0, f"Unsupported hunk type: {kind or '(empty)'}."
block = self._find_matching_block(hunk, allowed_blocks)
if block is None:
return content, 0, "Hunk does not match allowed contract blocks."
if kind == "append_end":
return self._apply_append_end(content, hunk, block)
if kind == "replace_between":
return self._apply_replace_between(content, hunk, block)
return self._apply_replace_line_equals(content, hunk, block)
def _find_matching_block(self, hunk: dict, allowed_blocks: list[dict]) -> dict | None:
kind = str(hunk.get("type", "")).strip().lower()
for block in allowed_blocks:
if not isinstance(block, dict):
continue
block_type = str(block.get("type", "")).strip().lower()
if block_type != kind:
continue
if kind == "replace_between":
start = str(hunk.get("start_anchor", "")).strip()
end = str(hunk.get("end_anchor", "")).strip()
if start != str(block.get("start_anchor", "")).strip():
continue
if end != str(block.get("end_anchor", "")).strip():
continue
if kind == "replace_line_equals":
old_line = str(hunk.get("old_line", "")).strip()
if old_line != str(block.get("old_line", "")).strip():
continue
return block
return None
def _apply_append_end(self, content: str, hunk: dict, block: dict) -> tuple[str, int, str | None]:
new_text = str(hunk.get("new_text", ""))
if not new_text.strip():
return content, 0, "append_end new_text is empty."
changed_lines = self._changed_line_count("", new_text)
block_limit = int(block.get("max_changed_lines", 6) or 6)
if changed_lines > block_limit:
return content, 0, f"append_end is too large: changed={changed_lines} allowed={block_limit}."
base = content.rstrip("\n")
suffix = new_text.strip("\n")
if not suffix:
return content, 0, "append_end resolved to empty suffix."
merged = f"{base}\n\n{suffix}\n" if base else f"{suffix}\n"
return merged, changed_lines, None
def _apply_replace_between(self, content: str, hunk: dict, block: dict) -> tuple[str, int, str | None]:
start_anchor = str(hunk.get("start_anchor", "")).strip()
end_anchor = str(hunk.get("end_anchor", "")).strip()
new_text = str(hunk.get("new_text", ""))
if not start_anchor or not end_anchor:
return content, 0, "replace_between anchors are required."
start_pos = content.find(start_anchor)
if start_pos < 0:
return content, 0, "start_anchor not found in file."
middle_start = start_pos + len(start_anchor)
end_pos = content.find(end_anchor, middle_start)
if end_pos < 0:
return content, 0, "end_anchor not found after start_anchor."
old_segment = content[middle_start:end_pos]
changed_lines = self._changed_line_count(old_segment, new_text)
block_limit = int(block.get("max_changed_lines", 6) or 6)
if changed_lines > block_limit:
return content, 0, f"replace_between is too large: changed={changed_lines} allowed={block_limit}."
merged = content[:middle_start] + new_text + content[end_pos:]
return merged, changed_lines, None
def _apply_replace_line_equals(self, content: str, hunk: dict, block: dict) -> tuple[str, int, str | None]:
old_line = str(hunk.get("old_line", "")).strip()
new_text = str(hunk.get("new_text", ""))
if not old_line:
return content, 0, "replace_line_equals old_line is required."
lines = content.splitlines(keepends=True)
matches = [idx for idx, line in enumerate(lines) if line.rstrip("\n") == old_line]
if len(matches) != 1:
return content, 0, f"replace_line_equals expected exactly one match, got={len(matches)}."
replacement = new_text.rstrip("\n") + "\n"
changed_lines = self._changed_line_count(old_line + "\n", replacement)
block_limit = int(block.get("max_changed_lines", 6) or 6)
if changed_lines > block_limit:
return content, 0, f"replace_line_equals is too large: changed={changed_lines} allowed={block_limit}."
lines[matches[0] : matches[0] + 1] = [replacement]
return "".join(lines), changed_lines, None
def _changed_line_count(self, old_text: str, new_text: str) -> int:
old_lines = (old_text or "").splitlines()
new_lines = (new_text or "").splitlines()
if not old_lines and not new_lines:
return 0
matcher = SequenceMatcher(None, old_lines, new_lines)
changed = 0
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == "equal":
continue
changed += max(i2 - i1, j2 - j1)
return max(changed, 1)
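For reference, the line-budget heuristic shared by the hunk appliers above can be exercised in isolation. A minimal standalone sketch (the free function and sample inputs are illustrative, not part of the module's API):

```python
from difflib import SequenceMatcher

def changed_line_count(old_text: str, new_text: str) -> int:
    # Diff the texts line-by-line and count every inserted, deleted,
    # or replaced line. Note the floor: any non-trivial call reports
    # at least 1, even for identical non-empty inputs.
    old_lines = (old_text or "").splitlines()
    new_lines = (new_text or "").splitlines()
    if not old_lines and not new_lines:
        return 0
    changed = 0
    for tag, i1, i2, j1, j2 in SequenceMatcher(None, old_lines, new_lines).get_opcodes():
        if tag != "equal":
            changed += max(i2 - i1, j2 - j1)
    return max(changed, 1)

print(changed_line_count("a\nb\nc", "a\nB\nc"))  # 1
```

Each applier compares this count against the contract block's `max_changed_lines`, so the value acts as an upper bound on lines touched on either side of the diff.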

View File

@@ -1,116 +0,0 @@
import json
import re
from difflib import SequenceMatcher
from app.modules.agent.engine.graphs.file_targeting import FileTargeting
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.schemas.changeset import ChangeItem
class ProjectEditsSupport:
def __init__(self, max_context_files: int = 12, max_preview_chars: int = 2500) -> None:
self._max_context_files = max_context_files
self._max_preview_chars = max_preview_chars
self._targeting = FileTargeting()
def pick_relevant_files(self, message: str, files_map: dict[str, dict]) -> list[dict]:
tokens = {x for x in (message or "").lower().replace("/", " ").split() if len(x) >= 4}
scored: list[tuple[int, dict]] = []
for path, payload in files_map.items():
content = str(payload.get("content", ""))
score = 0
low_path = path.lower()
low_content = content.lower()
for token in tokens:
if token in low_path:
score += 3
if token in low_content:
score += 1
scored.append((score, self.as_candidate(payload)))
scored.sort(key=lambda x: (-x[0], x[1]["path"]))
return [item for _, item in scored[: self._max_context_files]]
def as_candidate(self, payload: dict) -> dict:
return {
"path": str(payload.get("path", "")).replace("\\", "/"),
"content": str(payload.get("content", "")),
"content_hash": str(payload.get("content_hash", "")),
}
def normalize_file_output(self, text: str) -> str:
value = (text or "").strip()
if value.startswith("```") and value.endswith("```"):
lines = value.splitlines()
if len(lines) >= 3:
return "\n".join(lines[1:-1]).strip()
return value
def parse_json(self, raw: str):
text = self.normalize_file_output(raw)
try:
return json.loads(text)
except Exception:
return {}
def shorten(self, text: str, max_chars: int | None = None) -> str:
limit = max_chars or self._max_preview_chars
value = (text or "").strip()
if len(value) <= limit:
return value
return value[:limit].rstrip() + "\n...[truncated]"
def collapse_whitespace(self, text: str) -> str:
return re.sub(r"\s+", " ", (text or "").strip())
def similarity(self, original: str, updated: str) -> float:
return SequenceMatcher(None, original or "", updated or "").ratio()
def line_change_ratio(self, original: str, updated: str) -> float:
orig_lines = (original or "").splitlines()
new_lines = (updated or "").splitlines()
if not orig_lines and not new_lines:
return 0.0
matcher = SequenceMatcher(None, orig_lines, new_lines)
changed = 0
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == "equal":
continue
changed += max(i2 - i1, j2 - j1)
total = max(len(orig_lines), len(new_lines), 1)
return changed / total
def added_headings(self, original: str, updated: str) -> int:
old_heads = {line.strip() for line in (original or "").splitlines() if line.strip().startswith("#")}
new_heads = {line.strip() for line in (updated or "").splitlines() if line.strip().startswith("#")}
return len(new_heads - old_heads)
def build_summary(self, state: AgentGraphState, changeset: list[ChangeItem]) -> str:
if not changeset:
return "Правки не сформированы: changeset пуст."
lines = [
"Выполненные действия:",
f"- Проанализирован запрос: {state.get('message', '')}",
"- Сформирован контракт правок с разрешенными блоками изменений.",
f"- Проведен self-check: {state.get('validation_feedback', 'без замечаний')}",
"",
"Измененные файлы:",
]
for item in changeset[:30]:
lines.append(f"- {item.op.value} {item.path}: {item.reason}")
return "\n".join(lines)
def is_broad_rewrite_request(self, message: str) -> bool:
low = (message or "").lower()
markers = (
"перепиши",
"полностью",
"целиком",
"с нуля",
"full rewrite",
"rewrite all",
"реорганизуй документ",
)
return any(marker in low for marker in markers)
def lookup_file(self, files_map: dict[str, dict], path: str) -> dict | None:
return self._targeting.lookup_file(files_map, path)
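The fence-stripping behaviour of `normalize_file_output` is easy to miss in review: it unwraps exactly one surrounding Markdown code fence and leaves everything else alone. A self-contained sketch of the same logic, reproduced for illustration:

```python
def normalize_file_output(text: str) -> str:
    # Strip a single surrounding Markdown code fence, if present;
    # anything else is returned trimmed but otherwise untouched.
    value = (text or "").strip()
    if value.startswith("```") and value.endswith("```"):
        lines = value.splitlines()
        if len(lines) >= 3:
            return "\n".join(lines[1:-1]).strip()
    return value

print(normalize_file_output("```python\nprint('hi')\n```"))  # print('hi')
```

The `len(lines) >= 3` guard keeps a bare ``` ``` ``` line (which both starts and ends with backticks) from being emptied out.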

View File

@@ -1,47 +0,0 @@
import logging
from langgraph.graph import END, START, StateGraph
from app.modules.agent.engine.graphs.progress import emit_progress_sync
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.modules.agent.llm import AgentLlmService
LOGGER = logging.getLogger(__name__)
class ProjectQaGraphFactory:
def __init__(self, llm: AgentLlmService) -> None:
self._llm = llm
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("answer", self._answer_node)
graph.add_edge(START, "answer")
graph.add_edge("answer", END)
return graph.compile(checkpointer=checkpointer)
def _answer_node(self, state: AgentGraphState) -> dict:
emit_progress_sync(
state,
stage="graph.project_qa.answer",
message="Готовлю ответ по контексту текущего проекта.",
)
user_input = "\n\n".join(
[
f"User request:\n{state.get('message', '')}",
f"RAG context:\n{state.get('rag_context', '')}",
f"Confluence context:\n{state.get('confluence_context', '')}",
]
)
answer = self._llm.generate("project_answer", user_input, log_context="graph.project_qa.answer")
emit_progress_sync(
state,
stage="graph.project_qa.answer.done",
message="Ответ по проекту сформирован.",
)
result = {"answer": answer}
LOGGER.warning(
"graph step result: graph=project_qa step=answer answer_len=%s",
len(answer or ""),
)
return result

View File

@@ -1,172 +0,0 @@
from __future__ import annotations
import logging
from langgraph.graph import END, START, StateGraph
from app.modules.agent.engine.graphs.progress import emit_progress_sync
from app.modules.agent.engine.graphs.state import AgentGraphState
from app.modules.agent.engine.orchestrator.actions.project_qa_analyzer import ProjectQaAnalyzer
from app.modules.agent.engine.orchestrator.actions.project_qa_support import ProjectQaSupport
from app.modules.agent.llm import AgentLlmService
from app.modules.contracts import RagRetriever
from app.modules.rag.explain import ExplainPack, PromptBudgeter
LOGGER = logging.getLogger(__name__)
class ProjectQaConversationGraphFactory:
def __init__(self, llm: AgentLlmService | None = None) -> None:
self._support = ProjectQaSupport()
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("resolve_request", self._resolve_request)
graph.add_edge(START, "resolve_request")
graph.add_edge("resolve_request", END)
return graph.compile(checkpointer=checkpointer)
def _resolve_request(self, state: AgentGraphState) -> dict:
emit_progress_sync(state, stage="graph.project_qa.conversation_understanding", message="Нормализую пользовательский запрос.")
resolved = self._support.resolve_request(str(state.get("message", "") or ""))
LOGGER.warning("graph step result: graph=project_qa/conversation_understanding normalized=%s", resolved.get("normalized_message", ""))
return {"resolved_request": resolved}
class ProjectQaClassificationGraphFactory:
def __init__(self, llm: AgentLlmService | None = None) -> None:
self._support = ProjectQaSupport()
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("classify_question", self._classify_question)
graph.add_edge(START, "classify_question")
graph.add_edge("classify_question", END)
return graph.compile(checkpointer=checkpointer)
def _classify_question(self, state: AgentGraphState) -> dict:
resolved = state.get("resolved_request", {}) or {}
message = str(resolved.get("normalized_message") or state.get("message", "") or "")
profile = self._support.build_profile(message)
LOGGER.warning("graph step result: graph=project_qa/question_classification domain=%s intent=%s", profile.get("domain"), profile.get("intent"))
return {"question_profile": profile}
class ProjectQaRetrievalGraphFactory:
def __init__(self, rag: RagRetriever, llm: AgentLlmService | None = None) -> None:
self._rag = rag
self._support = ProjectQaSupport()
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("retrieve_context", self._retrieve_context)
graph.add_edge(START, "retrieve_context")
graph.add_edge("retrieve_context", END)
return graph.compile(checkpointer=checkpointer)
def _retrieve_context(self, state: AgentGraphState) -> dict:
emit_progress_sync(state, stage="graph.project_qa.context_retrieval", message="Собираю контекст по проекту.")
resolved = state.get("resolved_request", {}) or {}
profile = state.get("question_profile", {}) or {}
files_map = dict(state.get("files_map", {}) or {})
rag_items: list[dict] = []  # legacy RAG retrieval removed: self._rag is no longer called here
source_bundle = self._support.build_source_bundle(profile, list(rag_items), files_map)
LOGGER.warning(
"graph step result: graph=project_qa/context_retrieval mode=%s rag_items=%s file_candidates=%s legacy_rag=%s",
profile.get("domain"),
len(source_bundle.get("rag_items", []) or []),
len(source_bundle.get("file_candidates", []) or []),
False,
)
return {"source_bundle": source_bundle}
class ProjectQaAnalysisGraphFactory:
def __init__(self, llm: AgentLlmService | None = None) -> None:
self._support = ProjectQaSupport()
self._analyzer = ProjectQaAnalyzer()
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("analyze_context", self._analyze_context)
graph.add_edge(START, "analyze_context")
graph.add_edge("analyze_context", END)
return graph.compile(checkpointer=checkpointer)
def _analyze_context(self, state: AgentGraphState) -> dict:
explain_pack = state.get("explain_pack")
if explain_pack:
analysis = self._analysis_from_pack(explain_pack)
LOGGER.warning(
"graph step result: graph=project_qa/context_analysis findings=%s evidence=%s",
len(analysis.get("findings", []) or []),
len(analysis.get("evidence", []) or []),
)
return {"analysis_brief": analysis}
bundle = state.get("source_bundle", {}) or {}
profile = bundle.get("profile", {}) or state.get("question_profile", {}) or {}
rag_items = list(bundle.get("rag_items", []) or [])
file_candidates = list(bundle.get("file_candidates", []) or [])
analysis = self._analyzer.analyze_code(profile, rag_items, file_candidates) if str(profile.get("domain")) == "code" else self._analyzer.analyze_docs(profile, rag_items)
LOGGER.warning(
"graph step result: graph=project_qa/context_analysis findings=%s evidence=%s",
len(analysis.get("findings", []) or []),
len(analysis.get("evidence", []) or []),
)
return {"analysis_brief": analysis}
def _analysis_from_pack(self, raw_pack) -> dict:
pack = ExplainPack.model_validate(raw_pack)
findings: list[str] = []
evidence: list[str] = []
for entrypoint in pack.selected_entrypoints[:3]:
findings.append(f"Entrypoint `{entrypoint.title}` maps to handler `{entrypoint.metadata.get('handler_symbol_id', '')}`.")
if entrypoint.source:
evidence.append(entrypoint.source)
for path in pack.trace_paths[:3]:
if path.symbol_ids:
findings.append(f"Trace path: {' -> '.join(path.symbol_ids)}")
for excerpt in pack.code_excerpts[:4]:
evidence.append(f"{excerpt.path}:{excerpt.start_line}-{excerpt.end_line} [{excerpt.evidence_id}]")
return {
"subject": pack.intent.normalized_query,
"findings": findings or ["No explain trace was built from the available code evidence."],
"evidence": evidence,
"gaps": list(pack.missing),
"answer_mode": "summary",
}
class ProjectQaAnswerGraphFactory:
def __init__(self, llm: AgentLlmService | None = None) -> None:
self._support = ProjectQaSupport()
self._llm = llm
self._budgeter = PromptBudgeter()
def build(self, checkpointer=None):
graph = StateGraph(AgentGraphState)
graph.add_node("compose_answer", self._compose_answer)
graph.add_edge(START, "compose_answer")
graph.add_edge("compose_answer", END)
return graph.compile(checkpointer=checkpointer)
def _compose_answer(self, state: AgentGraphState) -> dict:
profile = state.get("question_profile", {}) or {}
analysis = state.get("analysis_brief", {}) or {}
brief = self._support.build_answer_brief(profile, analysis)
explain_pack = state.get("explain_pack")
answer = self._compose_explain_answer(state, explain_pack)
if not answer:
answer = self._support.compose_answer(brief)
LOGGER.warning("graph step result: graph=project_qa/answer_composition answer_len=%s", len(answer or ""))
return {"answer_brief": brief, "final_answer": answer}
def _compose_explain_answer(self, state: AgentGraphState, raw_pack) -> str:
if raw_pack is None or self._llm is None:
return ""
pack = ExplainPack.model_validate(raw_pack)
prompt_input = self._budgeter.build_prompt_input(str(state.get("message", "") or ""), pack)
return self._llm.generate(
"code_explain_answer_v2",
prompt_input,
log_context="graph.project_qa.answer_v2",
).strip()
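Each factory above compiles a one-node graph, and the orchestrator chains the stages by threading one shared state dict. Outside LangGraph the same staging can be sketched with plain functions; the toy classifier below is illustrative only and does not reproduce `ProjectQaSupport.build_profile`:

```python
def resolve_request(state: dict) -> dict:
    # Stage 1: normalize the raw user message.
    normalized = str(state.get("message", "")).strip().lower()
    return {**state, "resolved_request": {"normalized_message": normalized}}

def classify_question(state: dict) -> dict:
    # Stage 2: derive a question profile from the normalized message.
    message = state["resolved_request"]["normalized_message"]
    domain = "code" if "код" in message or "code" in message else "docs"
    return {**state, "question_profile": {"domain": domain, "intent": "explain"}}

state = classify_question(resolve_request({"message": "Объясни код авторизации"}))
print(state["question_profile"]["domain"])  # code
```

Because every stage returns a merged copy of the state, the stages compose in any host runtime that can pass dicts along, which is exactly how the single-node graphs slot into the orchestrator's plan.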

View File

@@ -1,40 +0,0 @@
from typing import TypedDict
from app.schemas.changeset import ChangeItem
class AgentGraphState(TypedDict, total=False):
task_id: str
project_id: str
message: str
progress_key: str
rag_context: str
confluence_context: str
files_map: dict[str, dict]
docs_candidates: list[dict]
target_path: str
target_file_content: str
target_file_hash: str
existing_docs_detected: bool
existing_docs_summary: str
docs_strategy: str
rules_bundle: str
doc_plan: str
generated_doc: str
generated_docs_bundle: list[dict]
validation_passed: bool
validation_feedback: str
validation_attempts: int
resolved_request: dict
question_profile: dict
source_bundle: dict
analysis_brief: dict
answer_brief: dict
final_answer: str
answer: str
changeset: list[ChangeItem]
edits_requested_path: str
edits_context_files: list[dict]
edits_plan: list[dict]
edits_contracts: list[dict]
edits_generation_feedback: str

View File

@@ -1,21 +0,0 @@
from app.modules.agent.engine.orchestrator.models import (
ExecutionPlan,
OrchestratorResult,
PlanStep,
Scenario,
StepResult,
TaskSpec,
)
from app.modules.agent.engine.orchestrator.service import OrchestratorService
from app.modules.agent.engine.orchestrator.task_spec_builder import TaskSpecBuilder
__all__ = [
"ExecutionPlan",
"OrchestratorResult",
"OrchestratorService",
"PlanStep",
"Scenario",
"StepResult",
"TaskSpec",
"TaskSpecBuilder",
]

View File

@@ -1,17 +0,0 @@
from app.modules.agent.engine.orchestrator.actions.code_explain_actions import CodeExplainActions
from app.modules.agent.engine.orchestrator.actions.docs_actions import DocsActions
from app.modules.agent.engine.orchestrator.actions.edit_actions import EditActions
from app.modules.agent.engine.orchestrator.actions.explain_actions import ExplainActions
from app.modules.agent.engine.orchestrator.actions.gherkin_actions import GherkinActions
from app.modules.agent.engine.orchestrator.actions.project_qa_actions import ProjectQaActions
from app.modules.agent.engine.orchestrator.actions.review_actions import ReviewActions
__all__ = [
"CodeExplainActions",
"DocsActions",
"EditActions",
"ExplainActions",
"GherkinActions",
"ProjectQaActions",
"ReviewActions",
]

View File

@@ -1,46 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
from app.modules.rag.explain.intent_builder import ExplainIntentBuilder
from app.modules.rag.explain.models import ExplainPack
if TYPE_CHECKING:
from app.modules.rag.explain.retriever_v2 import CodeExplainRetrieverV2
LOGGER = logging.getLogger(__name__)
class CodeExplainActions(ActionSupport):
def __init__(self, retriever: CodeExplainRetrieverV2 | None = None) -> None:
self._retriever = retriever
self._intent_builder = ExplainIntentBuilder()
def build_code_explain_pack(self, ctx: ExecutionContext) -> list[str]:
file_candidates = list((self.get(ctx, "source_bundle", {}) or {}).get("file_candidates", []) or [])
if self._retriever is None:
pack = ExplainPack(
intent=self._intent_builder.build(ctx.task.user_message),
missing=["code_explain_retriever_unavailable"],
)
else:
pack = self._retriever.build_pack(
ctx.task.rag_session_id,
ctx.task.user_message,
file_candidates=file_candidates,
)
LOGGER.warning(
"code explain action: task_id=%s entrypoints=%s seeds=%s paths=%s excerpts=%s missing=%s",
ctx.task.task_id,
len(pack.selected_entrypoints),
len(pack.seed_symbols),
len(pack.trace_paths),
len(pack.code_excerpts),
pack.missing,
)
return [self.put(ctx, "explain_pack", ArtifactType.STRUCTURED_JSON, pack.model_dump(mode="json"))]

View File

@@ -1,26 +0,0 @@
from __future__ import annotations
from uuid import uuid4
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType, EvidenceItem
class ActionSupport:
def put(self, ctx: ExecutionContext, key: str, artifact_type: ArtifactType, value, *, meta: dict | None = None) -> str:
item = ctx.artifacts.put(key=key, artifact_type=artifact_type, content=value, meta=meta)
return item.artifact_id
def get(self, ctx: ExecutionContext, key: str, default=None):
return ctx.artifacts.get_content(key, default)
def add_evidence(self, ctx: ExecutionContext, *, source_type: str, source_ref: str, snippet: str, score: float = 0.8) -> str:
evidence = EvidenceItem(
evidence_id=f"evidence_{uuid4().hex}",
source_type=source_type,
source_ref=source_ref,
snippet=(snippet or "").strip()[:600],
score=max(0.0, min(1.0, float(score))),
)
ctx.evidences.put_many([evidence])
return evidence.evidence_id

View File

@@ -1,95 +0,0 @@
from __future__ import annotations
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
class DocsActions(ActionSupport):
def extract_change_intents(self, ctx: ExecutionContext) -> list[str]:
text = str(self.get(ctx, "source_doc_text", "") or ctx.task.user_message)
intents = {
"summary": text[:240],
"api": ["Update endpoint behavior contract"],
"logic": ["Adjust reusable business rules"],
"db": ["Reflect schema/table notes if needed"],
"ui": ["Adjust form behavior and validation"],
}
return [self.put(ctx, "change_intents", ArtifactType.STRUCTURED_JSON, intents)]
def map_to_doc_tree(self, ctx: ExecutionContext) -> list[str]:
targets = [
"docs/api/increment.md",
"docs/logic/increment.md",
"docs/db/increment.md",
"docs/ui/increment.md",
]
return [self.put(ctx, "doc_targets", ArtifactType.STRUCTURED_JSON, {"targets": targets})]
def load_current_docs_context(self, ctx: ExecutionContext) -> list[str]:
files_map = dict(ctx.task.metadata.get("files_map", {}) or {})
targets = (self.get(ctx, "doc_targets", {}) or {}).get("targets", [])
current = []
for path in targets:
current.append(
{
"path": path,
"content": str((files_map.get(path) or {}).get("content", "")),
"content_hash": str((files_map.get(path) or {}).get("content_hash", "")),
}
)
return [self.put(ctx, "current_docs_context", ArtifactType.STRUCTURED_JSON, {"files": current})]
def generate_doc_updates(self, ctx: ExecutionContext) -> list[str]:
intents = self.get(ctx, "change_intents", {}) or {}
targets = (self.get(ctx, "doc_targets", {}) or {}).get("targets", [])
bundle = []
for path in targets:
bundle.append(
{
"path": path,
"content": "\n".join(
[
f"# Increment Update: {path}",
"",
"## Scope",
str(intents.get("summary", "")),
"",
"## Changes",
"- Updated according to analytics increment.",
]
),
"reason": "align docs with analytics increment",
}
)
return [self.put(ctx, "generated_doc_bundle", ArtifactType.DOC_BUNDLE, bundle)]
def cross_file_validation(self, ctx: ExecutionContext) -> list[str]:
bundle = self.get(ctx, "generated_doc_bundle", []) or []
paths = [str(item.get("path", "")) for item in bundle if isinstance(item, dict)]
has_required = any(path.startswith("docs/api/") for path in paths) and any(path.startswith("docs/logic/") for path in paths)
report = {"paths": paths, "required_core_paths_present": has_required}
return [self.put(ctx, "consistency_report", ArtifactType.STRUCTURED_JSON, report)]
def build_changeset(self, ctx: ExecutionContext) -> list[str]:
bundle = self.get(ctx, "generated_doc_bundle", []) or []
changeset = []
for item in bundle:
if not isinstance(item, dict):
continue
changeset.append(
{
"op": "update",
"path": str(item.get("path", "")).strip(),
"base_hash": "orchestrator-generated",
"proposed_content": str(item.get("content", "")),
"reason": str(item.get("reason", "documentation update")),
"hunks": [],
}
)
return [self.put(ctx, "final_changeset", ArtifactType.CHANGESET, changeset)]
def compose_summary(self, ctx: ExecutionContext) -> list[str]:
count = len(self.get(ctx, "final_changeset", []) or [])
text = f"Prepared documentation changeset with {count} files updated."
return [self.put(ctx, "final_answer", ArtifactType.TEXT, text)]

View File

@@ -1,101 +0,0 @@
from __future__ import annotations
import re
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
class EditActions(ActionSupport):
def resolve_target(self, ctx: ExecutionContext) -> list[str]:
message = ctx.task.user_message
files_map = dict(ctx.task.metadata.get("files_map", {}) or {})
requested = self._extract_path(message)
matched = self._lookup_source(files_map, requested)
if matched:
requested = str(matched.get("path") or requested or "")
if not requested and files_map:
requested = next(iter(files_map.keys()))
payload = {"path": requested or "", "allowed": bool(requested)}
return [self.put(ctx, "resolved_target", ArtifactType.STRUCTURED_JSON, payload)]
def load_target_context(self, ctx: ExecutionContext) -> list[str]:
files_map = dict(ctx.task.metadata.get("files_map", {}) or {})
resolved = self.get(ctx, "resolved_target", {}) or {}
path = str(resolved.get("path", ""))
source = dict(self._lookup_source(files_map, path) or {})
current = {
"path": str(source.get("path", "")) or path,
"content": str(source.get("content", "")),
"content_hash": str(source.get("content_hash", "")),
}
return [self.put(ctx, "target_context", ArtifactType.STRUCTURED_JSON, current)]
def plan_minimal_patch(self, ctx: ExecutionContext) -> list[str]:
target = self.get(ctx, "target_context", {}) or {}
plan = {
"path": target.get("path", ""),
"intent": "minimal_update",
"instruction": ctx.task.user_message[:240],
}
return [self.put(ctx, "patch_plan", ArtifactType.STRUCTURED_JSON, plan)]
def generate_patch(self, ctx: ExecutionContext) -> list[str]:
target = self.get(ctx, "target_context", {}) or {}
plan = self.get(ctx, "patch_plan", {}) or {}
path = str(target.get("path", ""))
base = str(target.get("content_hash", "") or "orchestrator-generated")
original = str(target.get("content", ""))
note = f"\n\n<!-- orchestrator note: {plan.get('instruction', '')[:100]} -->\n"
proposed = (original + note).strip() if original else note.strip()
changeset = [
{
"op": "update" if original else "create",
"path": path,
"base_hash": base if original else None,
"proposed_content": proposed,
"reason": "targeted file update",
"hunks": [],
}
]
return [self.put(ctx, "raw_changeset", ArtifactType.CHANGESET, changeset)]
def validate_patch_safety(self, ctx: ExecutionContext) -> list[str]:
changeset = self.get(ctx, "raw_changeset", []) or []
safe = len(changeset) == 1
report = {"safe": safe, "items": len(changeset), "reason": "single-file patch expected"}
return [self.put(ctx, "patch_validation_report", ArtifactType.STRUCTURED_JSON, report)]
def finalize_changeset(self, ctx: ExecutionContext) -> list[str]:
report = self.get(ctx, "patch_validation_report", {}) or {}
if not report.get("safe"):
return [self.put(ctx, "final_changeset", ArtifactType.CHANGESET, [])]
changeset = self.get(ctx, "raw_changeset", []) or []
return [self.put(ctx, "final_changeset", ArtifactType.CHANGESET, changeset)]
def compose_edit_summary(self, ctx: ExecutionContext) -> list[str]:
count = len(self.get(ctx, "final_changeset", []) or [])
text = f"Prepared targeted edit changeset with {count} item(s)."
return [self.put(ctx, "final_answer", ArtifactType.TEXT, text)]
def _extract_path(self, text: str) -> str | None:
match = re.search(r"\b[\w./-]+\.(md|txt|rst|yaml|yml|json|toml|ini|cfg)\b", text or "", flags=re.IGNORECASE)
if not match:
return None
return match.group(0).replace("\\", "/").strip()
def _lookup_source(self, files_map: dict[str, dict], path: str | None) -> dict | None:
if not path:
return None
normalized = str(path).replace("\\", "/").strip()
if not normalized:
return None
source = files_map.get(normalized)
if source:
return source
normalized_low = normalized.lower()
for key, value in files_map.items():
if str(key).replace("\\", "/").lower() == normalized_low:
return value
return None

View File

@@ -1,259 +0,0 @@
from __future__ import annotations
from collections import Counter
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
class ExplainActions(ActionSupport):
def collect_sources(self, ctx: ExecutionContext) -> list[str]:
rag_items = list(ctx.task.metadata.get("rag_items", []) or [])
rag_context = str(ctx.task.metadata.get("rag_context", ""))
confluence_context = str(ctx.task.metadata.get("confluence_context", ""))
files_map = dict(ctx.task.metadata.get("files_map", {}) or {})
payload = {
"rag_items": rag_items,
"rag_context": rag_context,
"confluence_context": confluence_context,
"files_count": len(files_map),
"source_profile": self._source_profile(rag_items),
}
evidence_ids: list[str] = []
for item in rag_items[:5]:
snippet = str(item.get("content", "") or "").strip()
if not snippet:
continue
evidence_ids.append(
self.add_evidence(
ctx,
source_type="rag_chunk",
source_ref=str(item.get("source", ctx.task.rag_session_id)),
snippet=snippet,
score=0.9,
)
)
artifact_id = self.put(
ctx,
"sources",
ArtifactType.STRUCTURED_JSON,
payload,
meta={"evidence_ids": evidence_ids},
)
return [artifact_id]
def extract_logic(self, ctx: ExecutionContext) -> list[str]:
sources = self.get(ctx, "sources", {}) or {}
message = ctx.task.user_message
profile = str(sources.get("source_profile", "docs"))
ru = self._is_russian(message)
notes = (
"Используй код как основной источник и ссылайся на конкретные файлы и слои."
if profile == "code" and ru
else "Use code as the primary source and cite concrete files/layers."
if profile == "code"
else "Используй требования и документацию как основной источник."
if ru
else "Use requirements/docs as primary source over code."
)
logic = {
"request": message,
"assumptions": [f"{profile}-first"],
"notes": notes,
"source_summary": sources,
}
return [self.put(ctx, "logic_model", ArtifactType.STRUCTURED_JSON, logic)]
def summarize(self, ctx: ExecutionContext) -> list[str]:
sources = self.get(ctx, "sources", {}) or {}
profile = str(sources.get("source_profile", "docs"))
items = list(sources.get("rag_items", []) or [])
message = ctx.task.user_message
ru = self._is_russian(message)
answer = self._code_answer(items, russian=ru) if profile == "code" else self._docs_answer(items, russian=ru)
return [self.put(ctx, "final_answer", ArtifactType.TEXT, answer)]
def _source_profile(self, items: list[dict]) -> str:
layers = [str(item.get("layer", "") or "") for item in items]
if any(layer.startswith("C") for layer in layers):
return "code"
return "docs"
def _is_russian(self, text: str) -> bool:
return any("а" <= ch.lower() <= "я" or ch.lower() == "ё" for ch in text)
def _code_answer(self, items: list[dict], *, russian: bool) -> str:
if not items:
return (
"Не удалось найти релевантный кодовый контекст по этому запросу."
if russian
else "No relevant code context was found for this request."
)
details = self._code_details(items, russian=russian)
refs = self._code_references(items, russian=russian)
parts = [
"## Кратко" if russian else "## Summary",
details,
]
if refs:
parts.append(refs)
return "\n\n".join(part for part in parts if part.strip())
def _docs_answer(self, items: list[dict], *, russian: bool) -> str:
return (
"Запрошенная часть проекта объяснена на основе требований и документации."
if russian
else "The requested project part is explained from requirements/docs context."
)
def _code_details(self, items: list[dict], *, russian: bool) -> str:
if not items:
return ""
symbol_items = [item for item in items if str(item.get("layer", "")) == "C1_SYMBOL_CATALOG"]
edge_items = [item for item in items if str(item.get("layer", "")) == "C2_DEPENDENCY_GRAPH"]
source_items = [item for item in items if str(item.get("layer", "")) == "C0_SOURCE_CHUNKS"]
lines = ["### Что видно по коду" if russian else "### What the code shows"]
alias = self._find_alias_symbol(symbol_items)
if alias:
imported_from = str(alias.get("metadata", {}).get("lang_payload", {}).get("imported_from", "")).strip()
if russian:
lines.append(f"- `ConfigManager` в проекте доступен как alias в `{alias.get('source', '')}` и указывает на `{imported_from}`.")
else:
lines.append(f"- `ConfigManager` is exposed as an alias in `{alias.get('source', '')}` and points to `{imported_from}`.")
management_hint = self._management_summary(symbol_items, edge_items, source_items, russian=russian)
if management_hint:
lines.extend(management_hint)
symbol_lines = 0
for item in symbol_items[:4]:
title = str(item.get("title", "") or "")
source = str(item.get("source", "") or "")
content = str(item.get("content", "") or "").strip()
summary = content.splitlines()[-1].strip() if content else ""
if not title:
continue
if self._is_test_path(source):
continue
if self._is_control_symbol(title):
continue
if russian:
lines.append(f"- Символ `{title}` из `{source}`: {summary}")
else:
lines.append(f"- Symbol `{title}` from `{source}`: {summary}")
symbol_lines += 1
if symbol_lines >= 2:
break
edge_map: dict[str, list[str]] = {}
for item in edge_items:
meta = item.get("metadata", {}) or {}
src_qname = str(meta.get("src_qname", "") or "").strip()
dst_ref = str(meta.get("dst_ref", "") or "").strip()
if not src_qname or not dst_ref:
continue
if self._is_test_path(str(item.get("source", "") or "")):
continue
edge_map.setdefault(src_qname, [])
if dst_ref not in edge_map[src_qname]:
edge_map[src_qname].append(dst_ref)
for src_qname, targets in list(edge_map.items())[:3]:
joined = ", ".join(targets[:4])
if russian:
lines.append(f"- `{src_qname}` вызывает или использует: {joined}.")
else:
lines.append(f"- `{src_qname}` calls or uses: {joined}.")
for item in source_items[:2]:
source = str(item.get("source", "") or "")
content = str(item.get("content", "") or "")
if self._is_test_path(source):
continue
if "management" in content.lower() or "control" in content.lower():
snippet = " ".join(content.splitlines()[:4]).strip()
if russian:
lines.append(f"- В `{source}` есть прямое указание на управление через конфиг/API: `{snippet[:220]}`")
else:
lines.append(f"- `{source}` directly mentions config/API control: `{snippet[:220]}`")
return "\n".join(lines)
def _code_references(self, items: list[dict], *, russian: bool) -> str:
paths = [str(item.get("source", "") or "") for item in items if item.get("source") and not self._is_test_path(str(item.get("source", "") or ""))]
if not paths:
return ""
lines = ["### Где смотреть в проекте" if russian else "### Where to look in the project"]
for path, _count in Counter(paths).most_common(3):
lines.append(f"- `{path}`")
return "\n".join(lines)
def _find_alias_symbol(self, items: list[dict]) -> dict | None:
for item in items:
meta = item.get("metadata", {}) or {}
payload = meta.get("lang_payload", {}) or {}
qname = str(meta.get("qname", "") or "")
if qname == "ConfigManager" and payload.get("import_alias"):
return item
return None
def _is_test_path(self, path: str) -> bool:
lowered = path.lower()
return lowered.startswith("tests/") or "/tests/" in lowered or lowered.startswith("test_") or "/test_" in lowered
def _is_control_symbol(self, title: str) -> bool:
lowered = title.lower()
return any(token in lowered for token in ("controlchannel", "controlchannelbridge", "on_start", "on_stop", "on_status"))
def _management_summary(
self,
symbol_items: list[dict],
edge_items: list[dict],
source_items: list[dict],
*,
russian: bool,
) -> list[str]:
qnames = {str((item.get("metadata", {}) or {}).get("qname", "") or ""): item for item in symbol_items if not self._is_test_path(str(item.get("source", "") or ""))}
source_texts = [str(item.get("content", "") or "") for item in source_items if not self._is_test_path(str(item.get("source", "") or ""))]
result: list[str] = []
if any("управление через api" in text.lower() or "section management" in text.lower() or "секция management" in text.lower() for text in source_texts):
result.append(
"- Для `ConfigManager` в коде предусмотрен отдельный интерфейс управления через API/конфиг: это прямо указано в публичной точке входа модуля."
if russian
else "- `ConfigManager` has a dedicated API/config-based management interface; this is stated in the module's public entrypoint."
)
has_control_channel = "ControlChannel" in qnames
has_bridge = "ControlChannelBridge" in qnames
if has_control_channel:
result.append(
"- Базовый контракт управления задает `ControlChannel`: он определяет команды `start` и `stop` для внешнего канала управления."
if russian
else "- The base management contract is `ControlChannel`, which defines external `start` and `stop` commands."
)
if has_bridge:
result.append(
"- `ControlChannelBridge` связывает внешний канал управления с lifecycle-методами менеджера: `on_start`, `on_stop`, `on_status`."
if russian
else "- `ControlChannelBridge` maps the external control channel to manager lifecycle methods: `on_start`, `on_stop`, `on_status`."
)
edge_refs = []
for item in edge_items:
if self._is_test_path(str(item.get("source", "") or "")):
continue
meta = item.get("metadata", {}) or {}
src = str(meta.get("src_qname", "") or "")
dst = str(meta.get("dst_ref", "") or "")
if src.startswith("ControlChannelBridge.") and dst in {"self._start_runtime", "self._stop_runtime", "self._get_status"}:
edge_refs.append((src, dst))
if edge_refs:
mappings = ", ".join(f"{src} -> {dst}" for src, dst in edge_refs[:3])
result.append(
f"- По связям в коде видно, что команды управления маршрутизируются так: {mappings}."
if russian
else f"- The code relationships show the management command routing: {mappings}."
)
return result
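The edge-handling logic above groups call/use relations by `src_qname`, deduplicating targets while preserving first-seen order. A minimal standalone sketch of that grouping, using the same `metadata` field names as the method above (the sample edge items are hypothetical):

```python
def group_edges(edge_items: list[dict]) -> dict[str, list[str]]:
    # Group call/use edges by source symbol, keeping first-seen order
    # and skipping duplicate or incomplete edges, as in the summary code.
    edge_map: dict[str, list[str]] = {}
    for item in edge_items:
        meta = item.get("metadata", {}) or {}
        src = str(meta.get("src_qname", "") or "").strip()
        dst = str(meta.get("dst_ref", "") or "").strip()
        if not src or not dst:
            continue
        edge_map.setdefault(src, [])
        if dst not in edge_map[src]:
            edge_map[src].append(dst)
    return edge_map

edges = [
    {"metadata": {"src_qname": "A.run", "dst_ref": "B.start"}},
    {"metadata": {"src_qname": "A.run", "dst_ref": "B.start"}},  # duplicate, dropped
    {"metadata": {"src_qname": "A.run", "dst_ref": "B.stop"}},
]
```

The method then renders at most three source symbols with at most four targets each, so the map itself is the only state it needs.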


@@ -1,76 +0,0 @@
from __future__ import annotations
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
class GherkinActions(ActionSupport):
def extract_increment_scope(self, ctx: ExecutionContext) -> list[str]:
text = str(self.get(ctx, "source_doc_text", "") or ctx.task.user_message)
scope = {
"title": "Increment scope",
"summary": text[:220],
"entities": ["User", "System"],
}
return [self.put(ctx, "increment_scope", ArtifactType.STRUCTURED_JSON, scope)]
def partition_features(self, ctx: ExecutionContext) -> list[str]:
scope = self.get(ctx, "increment_scope", {}) or {}
groups = [
{"feature": "Main flow", "goal": scope.get("summary", "")},
{"feature": "Validation", "goal": "Input validation and error behavior"},
]
return [self.put(ctx, "feature_groups", ArtifactType.STRUCTURED_JSON, groups)]
def generate_gherkin_bundle(self, ctx: ExecutionContext) -> list[str]:
groups = self.get(ctx, "feature_groups", []) or []
files = []
for idx, group in enumerate(groups, start=1):
feature_name = str(group.get("feature", f"Feature {idx}"))
content = "\n".join(
[
f"Feature: {feature_name}",
" Scenario: Happy path",
" Given system is available",
" When user performs increment action",
" Then system applies expected increment behavior",
]
)
files.append({"path": f"tests/gherkin/feature_{idx}.feature", "content": content})
return [self.put(ctx, "gherkin_bundle", ArtifactType.GHERKIN_BUNDLE, files)]
def lint_gherkin(self, ctx: ExecutionContext) -> list[str]:
bundle = self.get(ctx, "gherkin_bundle", []) or []
invalid = []
for item in bundle:
content = str(item.get("content", "")) if isinstance(item, dict) else ""
if "Feature:" not in content or "Scenario:" not in content:
invalid.append(str(item.get("path", "unknown")))
report = {"valid": len(invalid) == 0, "invalid_files": invalid}
return [self.put(ctx, "gherkin_lint_report", ArtifactType.STRUCTURED_JSON, report)]
def validate_coverage(self, ctx: ExecutionContext) -> list[str]:
bundle = self.get(ctx, "gherkin_bundle", []) or []
report = {"covered": len(bundle) > 0, "feature_files": len(bundle)}
return [self.put(ctx, "coverage_report", ArtifactType.STRUCTURED_JSON, report)]
def compose_test_model_summary(self, ctx: ExecutionContext) -> list[str]:
bundle = self.get(ctx, "gherkin_bundle", []) or []
summary = f"Prepared gherkin model with {len(bundle)} feature file(s)."
changeset = [
{
"op": "create",
"path": str(item.get("path", "")),
"base_hash": None,
"proposed_content": str(item.get("content", "")),
"reason": "generated gherkin feature",
"hunks": [],
}
for item in bundle
if isinstance(item, dict)
]
return [
self.put(ctx, "final_answer", ArtifactType.TEXT, summary),
self.put(ctx, "final_changeset", ArtifactType.CHANGESET, changeset),
]
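`lint_gherkin` validates each generated feature file with a simple containment check: a file passes only if it has both a `Feature:` and a `Scenario:` header. A standalone sketch of the same check (the sample bundle paths are hypothetical):

```python
def lint_gherkin_bundle(bundle: list[dict]) -> dict:
    # Mirrors GherkinActions.lint_gherkin: a feature file is valid
    # only when it contains both "Feature:" and "Scenario:" markers.
    invalid = []
    for item in bundle:
        content = str(item.get("content", "")) if isinstance(item, dict) else ""
        if "Feature:" not in content or "Scenario:" not in content:
            invalid.append(str(item.get("path", "unknown")))
    return {"valid": len(invalid) == 0, "invalid_files": invalid}

bundle = [
    {"path": "tests/gherkin/feature_1.feature",
     "content": "Feature: Main flow\n  Scenario: Happy path"},
    {"path": "tests/gherkin/feature_2.feature",
     "content": "Feature: no scenarios"},
]
report = lint_gherkin_bundle(bundle)
```

Note the check is case-sensitive and purely textual; it does not parse Gherkin grammar, so malformed steps inside a scenario would still pass.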


@@ -1,117 +0,0 @@
from __future__ import annotations
from app.modules.agent.engine.orchestrator.actions.project_qa_analyzer import ProjectQaAnalyzer
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.actions.project_qa_support import ProjectQaSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
class ProjectQaActions(ActionSupport):
def __init__(self) -> None:
self._support = ProjectQaSupport()
self._analyzer = ProjectQaAnalyzer()
def classify_project_question(self, ctx: ExecutionContext) -> list[str]:
message = str(ctx.task.user_message or "")
profile = self._support.build_profile(message)
return [self.put(ctx, "question_profile", ArtifactType.STRUCTURED_JSON, profile)]
def collect_project_sources(self, ctx: ExecutionContext) -> list[str]:
profile = self.get(ctx, "question_profile", {}) or {}
terms = list(profile.get("terms", []) or [])
entities = list(profile.get("entities", []) or [])
rag_items = list(ctx.task.metadata.get("rag_items", []) or [])
files_map = dict(ctx.task.metadata.get("files_map", {}) or {})
explicit_test = any(term in {"test", "tests", "тест", "тесты"} for term in terms)
ranked_rag = []
for item in rag_items:
score = self._support.rag_score(item, terms, entities)
source = str(item.get("source", "") or "")
if not explicit_test and self._support.is_test_path(source):
score -= 3
if score > 0:
ranked_rag.append((score, item))
ranked_rag.sort(key=lambda pair: pair[0], reverse=True)
ranked_files = []
for path, payload in files_map.items():
score = self._support.file_score(path, payload, terms, entities)
if not explicit_test and self._support.is_test_path(path):
score -= 3
if score > 0:
ranked_files.append(
(
score,
{
"path": path,
"content": str(payload.get("content", "")),
"content_hash": str(payload.get("content_hash", "")),
},
)
)
ranked_files.sort(key=lambda pair: pair[0], reverse=True)
bundle = {
"profile": profile,
"rag_items": [item for _, item in ranked_rag[:12]],
"file_candidates": [item for _, item in ranked_files[:10]],
"rag_total": len(ranked_rag),
"files_total": len(ranked_files),
}
return [self.put(ctx, "source_bundle", ArtifactType.STRUCTURED_JSON, bundle)]
def analyze_project_sources(self, ctx: ExecutionContext) -> list[str]:
bundle = self.get(ctx, "source_bundle", {}) or {}
profile = bundle.get("profile", {}) or {}
rag_items = list(bundle.get("rag_items", []) or [])
file_candidates = list(bundle.get("file_candidates", []) or [])
if str(profile.get("domain")) == "code":
analysis = self._analyzer.analyze_code(profile, rag_items, file_candidates)
else:
analysis = self._analyzer.analyze_docs(profile, rag_items)
return [self.put(ctx, "analysis_brief", ArtifactType.STRUCTURED_JSON, analysis)]
def build_project_answer_brief(self, ctx: ExecutionContext) -> list[str]:
profile = self.get(ctx, "question_profile", {}) or {}
analysis = self.get(ctx, "analysis_brief", {}) or {}
brief = {
"question_profile": profile,
"resolved_subject": analysis.get("subject"),
"key_findings": analysis.get("findings", []),
"supporting_evidence": analysis.get("evidence", []),
"missing_evidence": analysis.get("gaps", []),
"answer_mode": analysis.get("answer_mode", "summary"),
}
return [self.put(ctx, "answer_brief", ArtifactType.STRUCTURED_JSON, brief)]
def compose_project_answer(self, ctx: ExecutionContext) -> list[str]:
brief = self.get(ctx, "answer_brief", {}) or {}
profile = brief.get("question_profile", {}) or {}
russian = bool(profile.get("russian"))
answer_mode = str(brief.get("answer_mode") or "summary")
findings = list(brief.get("key_findings", []) or [])
evidence = list(brief.get("supporting_evidence", []) or [])
gaps = list(brief.get("missing_evidence", []) or [])
title = "## Кратко" if russian else "## Summary"
lines = [title]
if answer_mode == "inventory":
lines.append("### Что реализовано" if russian else "### Implemented items")
else:
lines.append("### Что видно по проекту" if russian else "### What the project shows")
if findings:
lines.extend(f"- {item}" for item in findings)
else:
lines.append("Не удалось собрать подтвержденные выводы по доступным данным." if russian else "No supported findings could be assembled from the available data.")
if evidence:
lines.append("")
lines.append("### Где смотреть в проекте" if russian else "### Where to look in the project")
lines.extend(f"- `{item}`" for item in evidence[:5])
if gaps:
lines.append("")
lines.append("### Что пока не подтверждено кодом" if russian else "### What is not yet confirmed in code")
lines.extend(f"- {item}" for item in gaps[:3])
return [self.put(ctx, "final_answer", ArtifactType.TEXT, "\n".join(lines))]
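`collect_project_sources` ranks candidates by relevance score, penalizes test files by 3 points unless the question explicitly mentions tests, keeps only positive scores, and sorts descending. A minimal sketch of that pipeline, with the assumption that each candidate carries a precomputed `score` (the real code derives it via `rag_score`/`file_score`):

```python
def is_test_path(path: str) -> bool:
    lowered = path.lower()
    return lowered.startswith("tests/") or "/tests/" in lowered or lowered.startswith("test_") or "/test_" in lowered

def rank_sources(items: list[dict], *, explicit_test: bool, top_n: int = 10) -> list[str]:
    # Positive-score candidates only; test files lose 3 points unless
    # the user explicitly asked about tests, as in collect_project_sources.
    ranked: list[tuple[int, str]] = []
    for item in items:
        score = int(item.get("score", 0))
        if not explicit_test and is_test_path(str(item.get("path", ""))):
            score -= 3
        if score > 0:
            ranked.append((score, str(item["path"])))
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [path for _, path in ranked[:top_n]]

candidates = [
    {"path": "src/config_manager/core.py", "score": 5},
    {"path": "tests/test_core.py", "score": 4},
    {"path": "src/api/http.py", "score": 2},
]
```

The penalty demotes rather than excludes: a strongly matching test file can still surface, just lower in the ranking.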


@@ -1,154 +0,0 @@
from __future__ import annotations
class ProjectQaAnalyzer:
def analyze_code(self, profile: dict, rag_items: list[dict], file_candidates: list[dict]) -> dict:
terms = list(profile.get("terms", []) or [])
intent = str(profile.get("intent") or "lookup")
russian = bool(profile.get("russian"))
findings: list[str] = []
evidence: list[str] = []
gaps: list[str] = []
symbol_titles = [str(item.get("title", "") or "") for item in rag_items if str(item.get("layer", "")).startswith("C1")]
symbol_set = set(symbol_titles)
file_paths = [str(item.get("path", "") or item.get("source", "") or "") for item in rag_items]
file_paths.extend(str(item.get("path", "") or "") for item in file_candidates)
if "ConfigManager" in profile.get("entities", []) or "configmanager" in terms or "config_manager" in terms:
alias_file = self.find_path(file_paths, "src/config_manager/__init__.py")
if alias_file:
findings.append(
"Публичный `ConfigManager` экспортируется из `src/config_manager/__init__.py` как alias на `ConfigManagerV2`."
if russian
else "Public `ConfigManager` is exported from `src/config_manager/__init__.py` as an alias to `ConfigManagerV2`."
)
evidence.append("src/config_manager/__init__.py")
if "controlchannel" in {name.lower() for name in symbol_set}:
findings.append(
"Базовый контракт управления задает `ControlChannel`: он определяет команды `start` и `stop` для внешнего канала управления."
if russian
else "`ControlChannel` defines the base management contract with `start` and `stop` commands."
)
evidence.append("src/config_manager/v2/control/base.py")
if "ControlChannelBridge" in symbol_set:
findings.append(
"`ControlChannelBridge` связывает внешний канал управления с lifecycle-методами менеджера: `on_start`, `on_stop`, `on_status`."
if russian
else "`ControlChannelBridge` connects the external control channel to manager lifecycle methods: `on_start`, `on_stop`, `on_status`."
)
evidence.append("src/config_manager/v2/core/control_bridge.py")
implementation_files = self.find_management_implementations(file_candidates)
if implementation_files:
labels = ", ".join(f"`{path}`" for path in implementation_files)
channel_names = self.implementation_names(implementation_files)
findings.append(
f"В коде найдены конкретные реализации каналов управления: {', '.join(channel_names)} ({labels})."
if russian
else f"Concrete management channel implementations were found in code: {', '.join(channel_names)} ({labels})."
)
evidence.extend(implementation_files)
elif intent == "inventory":
gaps.append(
"В текущем контексте не удалось уверенно подтвердить конкретные файлы-реализации каналов, кроме базового контракта и bridge-слоя."
if russian
else "The current context does not yet confirm concrete channel implementation files beyond the base contract and bridge layer."
)
package_doc = self.find_management_doc(file_candidates)
if package_doc:
findings.append(
f"Пакет управления прямо описывает внешние каналы через `{package_doc}`."
if russian
else f"The control package directly describes external channels in `{package_doc}`."
)
evidence.append(package_doc)
subject = "management channels"
if profile.get("entities"):
subject = ", ".join(profile["entities"])
return {
"subject": subject,
"findings": self.dedupe(findings),
"evidence": self.dedupe(evidence),
"gaps": gaps,
"answer_mode": "inventory" if intent == "inventory" else "summary",
}
def analyze_docs(self, profile: dict, rag_items: list[dict]) -> dict:
findings: list[str] = []
evidence: list[str] = []
for item in rag_items[:5]:
title = str(item.get("title", "") or "")
source = str(item.get("source", "") or "")
content = str(item.get("content", "") or "").strip()
if content:
findings.append(content.splitlines()[0][:220])
if source:
evidence.append(source)
elif title:
evidence.append(title)
return {
"subject": "docs",
"findings": self.dedupe(findings),
"evidence": self.dedupe(evidence),
"gaps": [] if findings else ["Недостаточно данных в документации." if profile.get("russian") else "Not enough data in documentation."],
"answer_mode": "summary",
}
def find_management_implementations(self, file_candidates: list[dict]) -> list[str]:
found: list[str] = []
for item in file_candidates:
path = str(item.get("path", "") or "")
lowered = path.lower()
if self.is_test_path(path):
continue
if any(token in lowered for token in ("http_channel.py", "telegram.py", "telegram_channel.py", "http.py")):
found.append(path)
continue
content = str(item.get("content", "") or "").lower()
if "controlchannel" in content and "class " in content:
found.append(path)
continue
if ("channel" in lowered or "control" in lowered) and any(token in content for token in ("http", "telegram", "bot")):
found.append(path)
return self.dedupe(found)[:4]
def implementation_names(self, paths: list[str]) -> list[str]:
names: list[str] = []
for path in paths:
stem = path.rsplit("/", 1)[-1].rsplit(".", 1)[0]
label = stem.replace("_", " ").strip()
if label and label not in names:
names.append(label)
return names
def find_management_doc(self, file_candidates: list[dict]) -> str | None:
for item in file_candidates:
path = str(item.get("path", "") or "")
if self.is_test_path(path):
continue
content = str(item.get("content", "") or "").lower()
if any(token in content for token in ("каналы внешнего управления", "external control channels", "http api", "telegram")):
return path
return None
def find_path(self, paths: list[str], target: str) -> str | None:
for path in paths:
if path == target:
return path
return None
def dedupe(self, items: list[str]) -> list[str]:
seen: list[str] = []
for item in items:
if item and item not in seen:
seen.append(item)
return seen
def is_test_path(self, path: str) -> bool:
lowered = path.lower()
return lowered.startswith("tests/") or "/tests/" in lowered or lowered.startswith("test_") or "/test_" in lowered
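Both `findings` and `evidence` are passed through `dedupe`, which preserves insertion order and also silently drops empty strings. A standalone copy of that helper:

```python
def dedupe(items: list[str]) -> list[str]:
    # Order-preserving dedupe identical to ProjectQaAnalyzer.dedupe;
    # falsy entries (empty strings) are dropped along with duplicates.
    seen: list[str] = []
    for item in items:
        if item and item not in seen:
            seen.append(item)
    return seen
```

The `item not in seen` membership test is O(n) per element, which is fine here because findings lists are short; for large inputs an auxiliary `set` (or `dict.fromkeys`, which does not drop empties) would be the usual choice.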


@@ -1,166 +0,0 @@
from __future__ import annotations
import re
from app.modules.rag.retrieval.query_terms import extract_query_terms
class ProjectQaSupport:
def resolve_request(self, message: str) -> dict:
profile = self.build_profile(message)
subject = profile["entities"][0] if profile.get("entities") else ""
return {
"original_message": message,
"normalized_message": " ".join((message or "").split()),
"subject_hint": subject,
"source_hint": profile["domain"],
"russian": profile["russian"],
}
def build_profile(self, message: str) -> dict:
lowered = message.lower()
return {
"domain": "code" if self.looks_like_code_question(lowered) else "docs",
"intent": self.detect_intent(lowered),
"terms": extract_query_terms(message),
"entities": self.extract_entities(message),
"russian": self.is_russian(message),
}
def build_retrieval_query(self, resolved_request: dict, profile: dict) -> str:
normalized = str(resolved_request.get("normalized_message") or resolved_request.get("original_message") or "").strip()
if profile.get("domain") == "code" and "по коду" not in normalized.lower():
return f"по коду {normalized}".strip()
return normalized
def build_source_bundle(self, profile: dict, rag_items: list[dict], files_map: dict[str, dict]) -> dict:
terms = list(profile.get("terms", []) or [])
entities = list(profile.get("entities", []) or [])
explicit_test = any(term in {"test", "tests", "тест", "тесты"} for term in terms)
ranked_rag: list[tuple[int, dict]] = []
for item in rag_items:
score = self.rag_score(item, terms, entities)
source = str(item.get("source", "") or "")
if not explicit_test and self.is_test_path(source):
score -= 3
if score > 0:
ranked_rag.append((score, item))
ranked_rag.sort(key=lambda pair: pair[0], reverse=True)
ranked_files: list[tuple[int, dict]] = []
for path, payload in files_map.items():
score = self.file_score(path, payload, terms, entities)
if not explicit_test and self.is_test_path(path):
score -= 3
if score > 0:
ranked_files.append(
(
score,
{
"path": path,
"content": str(payload.get("content", "")),
"content_hash": str(payload.get("content_hash", "")),
},
)
)
ranked_files.sort(key=lambda pair: pair[0], reverse=True)
return {
"profile": profile,
"rag_items": [item for _, item in ranked_rag[:12]],
"file_candidates": [item for _, item in ranked_files[:10]],
"rag_total": len(ranked_rag),
"files_total": len(ranked_files),
}
def build_answer_brief(self, profile: dict, analysis: dict) -> dict:
return {
"question_profile": profile,
"resolved_subject": analysis.get("subject"),
"key_findings": analysis.get("findings", []),
"supporting_evidence": analysis.get("evidence", []),
"missing_evidence": analysis.get("gaps", []),
"answer_mode": analysis.get("answer_mode", "summary"),
}
def compose_answer(self, brief: dict) -> str:
profile = brief.get("question_profile", {}) or {}
russian = bool(profile.get("russian"))
answer_mode = str(brief.get("answer_mode") or "summary")
findings = list(brief.get("key_findings", []) or [])
evidence = list(brief.get("supporting_evidence", []) or [])
gaps = list(brief.get("missing_evidence", []) or [])
title = "## Кратко" if russian else "## Summary"
lines = [title]
        if answer_mode == "inventory":
            lines.append("### Что реализовано" if russian else "### Implemented items")
        else:
            lines.append("### Что видно по проекту" if russian else "### What the project shows")
if findings:
lines.extend(f"- {item}" for item in findings)
else:
lines.append("Не удалось собрать подтвержденные выводы по доступным данным." if russian else "No supported findings could be assembled from the available data.")
if evidence:
lines.append("")
lines.append("### Где смотреть в проекте" if russian else "### Where to look in the project")
lines.extend(f"- `{item}`" for item in evidence[:5])
if gaps:
lines.append("")
lines.append("### Что пока не подтверждено кодом" if russian else "### What is not yet confirmed in code")
lines.extend(f"- {item}" for item in gaps[:3])
return "\n".join(lines)
def detect_intent(self, lowered: str) -> str:
if any(token in lowered for token in ("какие", "что уже реализ", "список", "перечень", "какие есть")):
return "inventory"
if any(token in lowered for token in ("где", "find", "where")):
return "lookup"
if any(token in lowered for token in ("сравни", "compare")):
return "compare"
return "explain"
    def looks_like_code_question(self, message: str) -> bool:
        code_markers = ("по коду", "код", "реализ", "имплементац", "класс", "метод", "модул", "файл", "канал", "handler", "endpoint")
        lowered = message.lower()
        # The CamelCase check must run against the original text: a lowercased
        # string can never match [A-Z], so searching the lowered copy was dead code.
        return any(marker in lowered for marker in code_markers) or bool(re.search(r"\b[A-Z][A-Za-z0-9_]{2,}\b", message))
def extract_entities(self, message: str) -> list[str]:
return re.findall(r"\b[A-Z][A-Za-z0-9_]{2,}\b", message)[:5]
def rag_score(self, item: dict, terms: list[str], entities: list[str]) -> int:
haystacks = [
str(item.get("source", "") or "").lower(),
str(item.get("title", "") or "").lower(),
str(item.get("content", "") or "").lower(),
str((item.get("metadata", {}) or {}).get("qname", "") or "").lower(),
]
score = 0
for term in terms:
if any(term in hay for hay in haystacks):
score += 3
for entity in entities:
if any(entity.lower() in hay for hay in haystacks):
score += 5
return score
def file_score(self, path: str, payload: dict, terms: list[str], entities: list[str]) -> int:
content = str(payload.get("content", "") or "").lower()
path_lower = path.lower()
score = 0
for term in terms:
if term in path_lower:
score += 4
elif term in content:
score += 2
for entity in entities:
entity_lower = entity.lower()
if entity_lower in path_lower:
score += 5
elif entity_lower in content:
score += 3
return score
def is_test_path(self, path: str) -> bool:
lowered = path.lower()
return lowered.startswith("tests/") or "/tests/" in lowered or lowered.startswith("test_") or "/test_" in lowered
def is_russian(self, text: str) -> bool:
return any("а" <= ch.lower() <= "я" or ch.lower() == "ё" for ch in text)
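`file_score` weights path hits above content hits, and CamelCase entities above plain terms. A simplified standalone sketch with the same weights; the signature takes raw `content` instead of the payload dict used above, which is an assumption for brevity:

```python
def file_score(path: str, content: str, terms: list[str], entities: list[str]) -> int:
    # Same weighting as ProjectQaSupport.file_score:
    # path match > content match, entity match > term match.
    path_lower = path.lower()
    content_lower = content.lower()
    score = 0
    for term in terms:
        if term in path_lower:
            score += 4
        elif term in content_lower:
            score += 2
    for entity in entities:
        entity_lower = entity.lower()
        if entity_lower in path_lower:
            score += 5
        elif entity_lower in content_lower:
            score += 3
    return score
```

The `elif` matters: a term found in the path does not also collect the content bonus, so the maximum contribution per term is capped.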


@@ -1,102 +0,0 @@
from __future__ import annotations
from urllib.parse import urlparse
from app.modules.agent.engine.orchestrator.actions.common import ActionSupport
from app.modules.agent.engine.orchestrator.execution_context import ExecutionContext
from app.modules.agent.engine.orchestrator.models import ArtifactType
class ReviewActions(ActionSupport):
def fetch_source_doc(self, ctx: ExecutionContext) -> list[str]:
attachment = next((a for a in ctx.task.attachments if a.value), None)
if attachment is None:
text = ctx.task.user_message
source_ref = "inline:message"
else:
parsed = urlparse(attachment.value)
source_ref = attachment.value
text = f"Source: {parsed.netloc}\nPath: {parsed.path}\nRequest: {ctx.task.user_message}"
evidence_id = self.add_evidence(
ctx,
source_type="external_doc",
source_ref=source_ref,
snippet=text,
score=0.75,
)
return [
self.put(
ctx,
"source_doc_raw",
ArtifactType.TEXT,
text,
meta={"source_ref": source_ref, "evidence_ids": [evidence_id]},
)
]
def normalize_document(self, ctx: ExecutionContext) -> list[str]:
raw = str(self.get(ctx, "source_doc_raw", "") or "")
normalized = "\n".join(line.rstrip() for line in raw.splitlines()).strip()
return [self.put(ctx, "source_doc_text", ArtifactType.TEXT, normalized)]
def structural_check(self, ctx: ExecutionContext) -> list[str]:
text = str(self.get(ctx, "source_doc_text", "") or "")
required = ["цель", "границ", "риски", "api", "данные"]
found = [token for token in required if token in text.lower()]
findings = {
"required_sections": required,
"found_markers": found,
"missing_markers": [token for token in required if token not in found],
}
return [self.put(ctx, "structural_findings", ArtifactType.STRUCTURED_JSON, findings)]
def semantic_consistency_check(self, ctx: ExecutionContext) -> list[str]:
text = str(self.get(ctx, "source_doc_text", "") or "")
contradictions = []
if "без изменений" in text.lower() and "новый" in text.lower():
contradictions.append("Contains both 'no changes' and 'new behavior' markers.")
payload = {"contradictions": contradictions, "status": "ok" if not contradictions else "needs_attention"}
return [self.put(ctx, "semantic_findings", ArtifactType.STRUCTURED_JSON, payload)]
def architecture_fit_check(self, ctx: ExecutionContext) -> list[str]:
text = str(self.get(ctx, "source_doc_text", "") or "")
files_count = len(dict(ctx.task.metadata.get("files_map", {}) or {}))
payload = {
"architecture_fit": "medium" if files_count == 0 else "high",
"notes": "Evaluate fit against existing docs and interfaces.",
"markers": ["integration"] if "integr" in text.lower() else [],
}
return [self.put(ctx, "architecture_findings", ArtifactType.STRUCTURED_JSON, payload)]
def optimization_check(self, ctx: ExecutionContext) -> list[str]:
text = str(self.get(ctx, "source_doc_text", "") or "")
has_perf = any(token in text.lower() for token in ("latency", "performance", "оптим"))
payload = {
"optimization_considered": has_perf,
"recommendation": "Add explicit non-functional targets." if not has_perf else "Optimization criteria present.",
}
return [self.put(ctx, "optimization_findings", ArtifactType.STRUCTURED_JSON, payload)]
def compose_review_report(self, ctx: ExecutionContext) -> list[str]:
structural = self.get(ctx, "structural_findings", {}) or {}
semantic = self.get(ctx, "semantic_findings", {}) or {}
architecture = self.get(ctx, "architecture_findings", {}) or {}
optimization = self.get(ctx, "optimization_findings", {}) or {}
report = "\n".join(
[
"## Findings",
f"- Missing structure markers: {', '.join(structural.get('missing_markers', [])) or 'none'}",
f"- Contradictions: {len(semantic.get('contradictions', []))}",
f"- Architecture fit: {architecture.get('architecture_fit', 'unknown')}",
f"- Optimization: {optimization.get('recommendation', 'n/a')}",
"",
"## Recommendations",
"- Clarify boundaries and data contracts.",
"- Add explicit error and rollback behavior.",
"- Add measurable non-functional requirements.",
]
)
return [
self.put(ctx, "review_report", ArtifactType.REVIEW_REPORT, report),
self.put(ctx, "final_answer", ArtifactType.TEXT, report),
]
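`structural_check` reduces the structural review to a presence test for five required Russian section markers. A standalone sketch of the same check (the sample document text is hypothetical):

```python
def structural_check(text: str) -> dict:
    # Mirrors ReviewActions.structural_check: looks for required
    # section markers (goal, boundaries, risks, api, data) in the text.
    required = ["цель", "границ", "риски", "api", "данные"]
    lowered = text.lower()
    found = [token for token in required if token in lowered]
    return {
        "required_sections": required,
        "found_markers": found,
        "missing_markers": [token for token in required if token not in found],
    }

report = structural_check("Цель: ускорить. Риски: нет. API: /v1")
```

Marker "границ" is a deliberate stem so it matches "границы", "границах", and similar inflections; the trade-off is that any word containing the stem also counts as a hit.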


@@ -1,50 +0,0 @@
from __future__ import annotations
from uuid import uuid4
from app.modules.agent.engine.orchestrator.models import ArtifactItem, ArtifactType
class ArtifactStore:
def __init__(self) -> None:
self._by_id: dict[str, ArtifactItem] = {}
self._by_key: dict[str, ArtifactItem] = {}
def put(self, *, key: str, artifact_type: ArtifactType, content=None, meta: dict | None = None) -> ArtifactItem:
item_meta = dict(meta or {})
if content is not None and not isinstance(content, str):
item_meta.setdefault("value", content)
item = ArtifactItem(
artifact_id=f"artifact_{uuid4().hex}",
key=key,
type=artifact_type,
content=self._as_content(content),
meta=item_meta,
)
self._by_id[item.artifact_id] = item
self._by_key[key] = item
return item
def get(self, key: str) -> ArtifactItem | None:
return self._by_key.get(key)
def get_content(self, key: str, default=None):
item = self.get(key)
if item is None:
return default
if item.content is not None:
return item.content
return item.meta.get("value", default)
def has(self, key: str) -> bool:
return key in self._by_key
def all_items(self) -> list[ArtifactItem]:
return list(self._by_id.values())
def _as_content(self, value):
if value is None:
return None
if isinstance(value, str):
return value
return None
