diff --git a/_process.zip b/_process.zip new file mode 100644 index 0000000..8577803 Binary files /dev/null and b/_process.zip differ diff --git a/src/app/main.py b/src/app/main.py index 5155da3..30819d9 100644 --- a/src/app/main.py +++ b/src/app/main.py @@ -27,7 +27,7 @@ def create_app() -> FastAPI: allow_headers=["*"], ) - app.include_router(modules.agent_api.public_router()) + app.include_router(modules.api.public_router()) app.include_router(modules.rag.public_router()) app.include_router(modules.rag.internal_router()) app.include_router(modules.rag_repo.internal_router()) diff --git a/src/app/modules/agent/llm/service.py b/src/app/modules/agent/llm/service.py index deabf69..82e83f5 100644 --- a/src/app/modules/agent/llm/service.py +++ b/src/app/modules/agent/llm/service.py @@ -1,5 +1,6 @@ import logging +from app.modules.agent.observability.module_trace import ModuleTrace from app.modules.agent.llm.prompt_loader import PromptLoader from app.modules.shared.gigachat.client import GigaChatClient @@ -18,10 +19,26 @@ class AgentLlmService: self._client = client self._prompts = prompts - def generate(self, prompt_name: str, user_input: str, *, log_context: str | None = None) -> str: - system_prompt = self._prompts.load(prompt_name) - if not system_prompt: - system_prompt = "You are a helpful assistant." + def build_request(self, prompt_name: str, user_input: str, *, log_context: str | None = None) -> dict[str, str]: + system_prompt = self._prompts.load(prompt_name) or "You are a helpful assistant." + return { + "prompt_name": prompt_name, + "system_prompt": system_prompt, + "user_prompt": user_input, + "log_context": log_context or "", + } + + def generate( + self, + prompt_name: str, + user_input: str, + *, + log_context: str | None = None, + trace: ModuleTrace | None = None, + ) -> str: + request = self.build_request(prompt_name, user_input, log_context=log_context) + if trace is not None: + trace.log("request", request) if log_context: LOGGER.warning( "graph llm input: context=%s prompt=%s user_input=%s", @@ -29,7 +46,12 @@ class AgentLlmService: prompt_name, _truncate_for_log(user_input), ) - output = self._client.complete(system_prompt=system_prompt, user_prompt=user_input) + output = self._client.complete( + system_prompt=request["system_prompt"], + user_prompt=request["user_prompt"], + ) + if trace is not None: + trace.log("response", {"text": output}) if log_context: LOGGER.warning( "graph llm output: context=%s prompt=%s output=%s", diff --git a/src/app/modules/agent/observability/module_trace.py b/src/app/modules/agent/observability/module_trace.py new file mode 100644 index 0000000..35eb2c5 --- /dev/null +++ b/src/app/modules/agent/observability/module_trace.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol + + +class ModuleTraceLogger(Protocol): + def log_module(self, request_id: str, module: str, title: str, payload: dict | None = None) -> None: ... + + +@dataclass(frozen=True, slots=True) +class ModuleTrace: + request_id: str + module: str + logger: ModuleTraceLogger + + def log(self, title: str, payload: dict | None = None) -> None: + self.logger.log_module(self.request_id, self.module, title, payload or {}) + + +@dataclass(frozen=True, slots=True) +class RequestTraceContext: + request_id: str + logger: ModuleTraceLogger + + def module(self, name: str) -> ModuleTrace: + return ModuleTrace(request_id=self.request_id, module=name, logger=self.logger) diff --git a/src/app/modules/agent/orchestration/__init__.py b/src/app/modules/agent/orchestration/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app/modules/agent/orchestration/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app/modules/agent/orchestration/adapters/intent_router_adapter.py b/src/app/modules/agent/orchestration/adapters/intent_router_adapter.py new file mode 100644 index 0000000..f75cf7a --- /dev/null +++ b/src/app/modules/agent/orchestration/adapters/intent_router_adapter.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from app.modules.agent.observability.module_trace import ModuleTrace +from app.modules.agent.intent_router_v2 import IntentRouterV2 + + +class IntentRouterAdapter: + def __init__(self, router: IntentRouterV2) -> None: + self._router = router + + def route(self, user_query: str, conversation_state, repo_context, trace: ModuleTrace | None = None): + if trace is not None: + trace.log("started", {"question": user_query}) + result = self._router.route(user_query, conversation_state, repo_context) + if trace is not None: + trace.log( + "completed", + { + "intent": result.intent, + "sub_intent": result.query_plan.sub_intent, + "matched_intent_source": result.matched_intent_source, + }, + ) + return result diff --git a/src/app/modules/agent/orchestration/adapters/llm_chat_adapter.py b/src/app/modules/agent/orchestration/adapters/llm_chat_adapter.py new file mode 100644 index 0000000..32de6ab --- /dev/null +++ b/src/app/modules/agent/orchestration/adapters/llm_chat_adapter.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import asyncio + +from app.modules.agent.observability.module_trace import ModuleTrace +from app.modules.agent.llm.service import AgentLlmService + + +class LlmChatAdapter: + def __init__(self, llm: AgentLlmService, prompt_name: str = "simple_llm_answer") -> None: + self._llm = llm + self._prompt_name = prompt_name + + @property + def prompt_name(self) -> str: + return self._prompt_name + + def build_request(self, message: str, request_id: str) -> dict[str, str]: + return self._llm.build_request( + self._prompt_name, + message, + log_context=f"agent:{request_id}", + ) + + async def generate(self, message: str, request_id: str, trace: ModuleTrace | None = None) -> str: + return await asyncio.to_thread( + self._llm.generate, + self._prompt_name, + message, + log_context=f"agent:{request_id}", + trace=trace, + ) diff --git a/src/app/modules/orchestration/adapters/task_runtime_adapter.py b/src/app/modules/agent/orchestration/adapters/task_runtime_adapter.py similarity index 90% rename from src/app/modules/orchestration/adapters/task_runtime_adapter.py rename to src/app/modules/agent/orchestration/adapters/task_runtime_adapter.py index 0437b5d..42ac762 100644 --- a/src/app/modules/orchestration/adapters/task_runtime_adapter.py +++ b/src/app/modules/agent/orchestration/adapters/task_runtime_adapter.py @@ -4,8 +4,8 @@ from types import SimpleNamespace from app.core.exceptions import AppError from app.modules.agent.task_runtime.facade import AgentTaskRuntimeFacade -from app.modules.agent_api.domain.models.agent_session import AgentSession -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.api.domain.models.agent_session import AgentSession +from app.modules.agent.orchestration.context.execution_context import ExecutionContext from app.schemas.common import ModuleName diff --git a/src/app/modules/agent/orchestration/context/execution_context.py b/src/app/modules/agent/orchestration/context/execution_context.py new file mode 100644 index 0000000..9db381f --- /dev/null +++ b/src/app/modules/agent/orchestration/context/execution_context.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from app.modules.api.domain.models.agent_request import AgentRequest +from app.modules.api.domain.models.agent_session import AgentSession +from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger +from app.modules.agent.observability.module_trace import RequestTraceContext +from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher + + +@dataclass(slots=True) +class ExecutionContext: + request: AgentRequest + session: AgentSession + publisher: ClientMessagePublisher + trace_logger: RequestTraceLogger + trace: RequestTraceContext + task_context: Any = None + route_result: Any = None + workflow_result: Any = None diff --git a/src/app/modules/orchestration/facade.py b/src/app/modules/agent/orchestration/facade.py similarity index 74% rename from src/app/modules/orchestration/facade.py rename to src/app/modules/agent/orchestration/facade.py index 85b807c..25bf6d8 100644 --- a/src/app/modules/orchestration/facade.py +++ b/src/app/modules/agent/orchestration/facade.py @@ -3,14 +3,15 @@ from __future__ import annotations from datetime import datetime, timezone from app.core.exceptions import AppError -from app.modules.agent_api.domain.models.agent_request import AgentRequest -from app.modules.agent_api.domain.models.agent_session import AgentSession -from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger -from app.modules.agent_api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore -from app.modules.orchestration.context.execution_context import ExecutionContext -from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher -from app.modules.orchestration.processes.registry import ProcessRegistry -from app.modules.orchestration.runtime.process_runner import ProcessRunner +from app.modules.api.domain.models.agent_request import AgentRequest +from app.modules.api.domain.models.agent_session import AgentSession +from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger +from app.modules.agent.observability.module_trace import RequestTraceContext +from app.modules.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore +from app.modules.agent.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher +from app.modules.agent.orchestration.processes.registry import ProcessRegistry +from app.modules.agent.orchestration.runtime.process_runner import ProcessRunner from app.schemas.common import ErrorPayload, ModuleName from app.schemas.orchestration import RequestExecutionStatus @@ -43,6 +44,7 @@ class OrchestrationFacade: session=session, publisher=self._publisher, trace_logger=self._trace_logger, + trace=RequestTraceContext(request_id=request.request_id, logger=self._trace_logger), ) await self._process_runner.run(context, process.steps()) request.status = RequestExecutionStatus.DONE @@ -56,7 +58,7 @@ class OrchestrationFacade: request.error = ErrorPayload(code=exc.code, desc=exc.desc, module=exc.module) else: request.error = ErrorPayload( - code="agent_api_runtime_error", + code="api_runtime_error", desc="Agent request failed unexpectedly.", module=ModuleName.AGENT, ) diff --git a/src/app/modules/orchestration/messaging/client_message_publisher.py b/src/app/modules/agent/orchestration/messaging/client_message_publisher.py similarity index 70% rename from src/app/modules/orchestration/messaging/client_message_publisher.py rename to src/app/modules/agent/orchestration/messaging/client_message_publisher.py index 5c88a57..cdfb20f 100644 --- a/src/app/modules/orchestration/messaging/client_message_publisher.py +++ b/src/app/modules/agent/orchestration/messaging/client_message_publisher.py @@ -1,9 +1,9 @@ from __future__ import annotations -from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger -from app.modules.agent_api.infrastructure.streaming.sse_event_channel import SseEventChannel -from app.modules.orchestration.messaging.status_message_factory import StatusMessageFactory -from app.modules.orchestration.messaging.user_message_factory import UserMessageFactory +from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger +from app.modules.api.infrastructure.streaming.sse_event_channel import SseEventChannel +from app.modules.agent.orchestration.messaging.status_message_factory import StatusMessageFactory +from app.modules.agent.orchestration.messaging.user_message_factory import UserMessageFactory class ClientMessagePublisher: diff --git a/src/app/modules/orchestration/messaging/status_message_factory.py b/src/app/modules/agent/orchestration/messaging/status_message_factory.py similarity index 84% rename from src/app/modules/orchestration/messaging/status_message_factory.py rename to src/app/modules/agent/orchestration/messaging/status_message_factory.py index 0d6aa9e..b11befd 100644 --- a/src/app/modules/orchestration/messaging/status_message_factory.py +++ b/src/app/modules/agent/orchestration/messaging/status_message_factory.py @@ -1,6 +1,6 @@ from __future__ import annotations -from app.modules.agent_api.domain.events.client_event import ClientEventRecord +from app.modules.api.domain.events.client_event import ClientEventRecord from app.schemas.client_events import ClientEventType diff --git a/src/app/modules/orchestration/messaging/user_message_factory.py b/src/app/modules/agent/orchestration/messaging/user_message_factory.py similarity index 84% rename from src/app/modules/orchestration/messaging/user_message_factory.py rename to src/app/modules/agent/orchestration/messaging/user_message_factory.py index 2ded13e..aa24fb5 100644 --- a/src/app/modules/orchestration/messaging/user_message_factory.py +++ b/src/app/modules/agent/orchestration/messaging/user_message_factory.py @@ -1,6 +1,6 @@ from __future__ import annotations -from app.modules.agent_api.domain.events.client_event import ClientEventRecord +from app.modules.api.domain.events.client_event import ClientEventRecord from app.schemas.client_events import ClientEventType diff --git a/src/app/modules/orchestration/processes/registry.py b/src/app/modules/agent/orchestration/processes/registry.py similarity index 69% rename from src/app/modules/orchestration/processes/registry.py rename to src/app/modules/agent/orchestration/processes/registry.py index df55209..6da53ba 100644 --- a/src/app/modules/orchestration/processes/registry.py +++ b/src/app/modules/agent/orchestration/processes/registry.py @@ -1,7 +1,7 @@ from __future__ import annotations -from app.modules.orchestration.processes.v1.process import V1Process -from app.modules.orchestration.processes.v2.process import V2Process +from app.modules.agent.orchestration.processes.v1.process import V1Process +from app.modules.agent.orchestration.processes.v2.process import V2Process class ProcessRegistry: diff --git a/src/app/modules/orchestration/processes/v1/process.py b/src/app/modules/agent/orchestration/processes/v1/process.py similarity index 100% rename from src/app/modules/orchestration/processes/v1/process.py rename to src/app/modules/agent/orchestration/processes/v1/process.py diff --git a/src/app/modules/agent/orchestration/processes/v1/prompt_payload_builder.py b/src/app/modules/agent/orchestration/processes/v1/prompt_payload_builder.py new file mode 100644 index 0000000..9faf40e --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v1/prompt_payload_builder.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +import json + +from app.modules.agent.orchestration.context.execution_context import ExecutionContext + + +class V1PromptPayloadBuilder: + def build(self, context: ExecutionContext) -> str: + payload = {"question": context.request.message} + return json.dumps(payload, ensure_ascii=False, indent=2) diff --git a/src/app/modules/agent/orchestration/processes/v1/prompts.yml b/src/app/modules/agent/orchestration/processes/v1/prompts.yml new file mode 100644 index 0000000..243f86c --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v1/prompts.yml @@ -0,0 +1,12 @@ +prompts: + simple_llm_answer: | + Ты полезный AI-ассистент проекта. + + На вход приходит JSON с полем: + - question + + Правила: + - Отвечай как персонаж мемов из дагестана + - Если вопрос неясный, аккуратно укажи, чего не хватает + - Не выдумывай несуществующие факты о проекте + - Формулируй ответ как обычное сообщение пользователю diff --git a/src/app/modules/agent/orchestration/processes/v1/simple_llm_workflow.py b/src/app/modules/agent/orchestration/processes/v1/simple_llm_workflow.py new file mode 100644 index 0000000..62ef2bf --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v1/simple_llm_workflow.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from app.modules.agent.orchestration.adapters.llm_chat_adapter import LlmChatAdapter +from app.modules.agent.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.processes.v1.prompt_payload_builder import V1PromptPayloadBuilder + + +class SimpleLlmWorkflow: + workflow_id = "simple_llm" + + def __init__( + self, + llm: LlmChatAdapter, + prompt_payload_builder: V1PromptPayloadBuilder | None = None, + ) -> None: + self._llm = llm + self._payload_builder = prompt_payload_builder or V1PromptPayloadBuilder() + + async def run(self, context: ExecutionContext) -> dict: + workflow_trace = context.trace.module("task_workflow") + workflow_trace.log("started", {"workflow_id": self.workflow_id}) + prompt_payload = self._payload_builder.build(context) + answer = await self._llm.generate( + prompt_payload, + context.request.request_id, + trace=context.trace.module("llm"), + ) + result = { + "workflow_id": self.workflow_id, + "prompt_name": self._llm.prompt_name, + "prompt_payload": prompt_payload, + "answer": answer, + } + workflow_trace.log( + "completed", + { + "workflow_id": self.workflow_id, + "prompt_name": self._llm.prompt_name, + "answer_length": len(answer), + }, + ) + return result diff --git a/src/app/modules/orchestration/processes/v1/steps/bootstrap_step.py b/src/app/modules/agent/orchestration/processes/v1/steps/bootstrap_step.py similarity index 63% rename from src/app/modules/orchestration/processes/v1/steps/bootstrap_step.py rename to src/app/modules/agent/orchestration/processes/v1/steps/bootstrap_step.py index 5813686..34e344d 100644 --- a/src/app/modules/orchestration/processes/v1/steps/bootstrap_step.py +++ b/src/app/modules/agent/orchestration/processes/v1/steps/bootstrap_step.py @@ -1,11 +1,14 @@ from __future__ import annotations -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.context.execution_context import ExecutionContext class BootstrapStep: async def run(self, context: ExecutionContext) -> None: - context.trace_logger.log_step(context.request.request_id, "bootstrap", "started") + context.trace.module("orchestrator").log( + "bootstrap", + {"status": "started", "process_version": context.request.process_version}, + ) await context.publisher.publish_status( context.request.request_id, "orchestrator", @@ -17,4 +20,4 @@ class BootstrapStep: "Запускаю процесс обработки v1.", {"process_version": context.request.process_version}, ) - context.trace_logger.log_step(context.request.request_id, "bootstrap", "completed") + context.trace.module("orchestrator").log("bootstrap", {"status": "completed"}) diff --git a/src/app/modules/agent/orchestration/processes/v1/steps/execute_llm_workflow_step.py b/src/app/modules/agent/orchestration/processes/v1/steps/execute_llm_workflow_step.py new file mode 100644 index 0000000..61d7d1a --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v1/steps/execute_llm_workflow_step.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from app.modules.agent.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.processes.v1.simple_llm_workflow import SimpleLlmWorkflow + + +class ExecuteLlmWorkflowStep: + def __init__(self, workflow: SimpleLlmWorkflow) -> None: + self._workflow = workflow + + async def run(self, context: ExecutionContext) -> None: + request = context.request + await context.publisher.publish_status( + request.request_id, + "task_workflow", + f"Запускаю workflow {self._workflow.workflow_id}.", + ) + await context.publisher.publish_status( + request.request_id, + "prompt_builder", + "Формирую prompt payload для LLM.", + ) + result = await self._workflow.run(context) + request.answer = str(result.get("answer") or "") + context.workflow_result = result + await context.publisher.publish_status( + request.request_id, + "llm_process", + "Ответ от LLM получен.", + { + "workflow_id": result.get("workflow_id"), + "prompt_name": result.get("prompt_name"), + "answer_length": len(request.answer), + }, + ) diff --git a/src/app/modules/orchestration/processes/v1/steps/finalize_step.py b/src/app/modules/agent/orchestration/processes/v1/steps/finalize_step.py similarity index 65% rename from src/app/modules/orchestration/processes/v1/steps/finalize_step.py rename to src/app/modules/agent/orchestration/processes/v1/steps/finalize_step.py index 789d973..68afc79 100644 --- a/src/app/modules/orchestration/processes/v1/steps/finalize_step.py +++ b/src/app/modules/agent/orchestration/processes/v1/steps/finalize_step.py @@ -1,12 +1,12 @@ from __future__ import annotations -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.context.execution_context import ExecutionContext class FinalizeStep: async def run(self, context: ExecutionContext) -> None: request = context.request - context.trace_logger.log_step(request.request_id, "finalize", "started") + context.trace.module("orchestrator").log("finalize", {"status": "started"}) await context.publisher.publish_user( request.request_id, "agent", @@ -17,4 +17,4 @@ class FinalizeStep: "orchestrator", "Обработка запроса завершена.", ) - context.trace_logger.log_step(request.request_id, "finalize", "completed") + context.trace.module("orchestrator").log("finalize", {"status": "completed"}) diff --git a/src/app/modules/orchestration/processes/v1/steps/run_llm_step.py b/src/app/modules/agent/orchestration/processes/v1/steps/run_llm_step.py similarity index 83% rename from src/app/modules/orchestration/processes/v1/steps/run_llm_step.py rename to src/app/modules/agent/orchestration/processes/v1/steps/run_llm_step.py index ef362cb..42c3c74 100644 --- a/src/app/modules/orchestration/processes/v1/steps/run_llm_step.py +++ b/src/app/modules/agent/orchestration/processes/v1/steps/run_llm_step.py @@ -1,7 +1,7 @@ from __future__ import annotations -from app.modules.orchestration.adapters.llm_chat_adapter import LlmChatAdapter -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.adapters.llm_chat_adapter import LlmChatAdapter +from app.modules.agent.orchestration.context.execution_context import ExecutionContext class RunLlmStep: diff --git a/src/app/modules/orchestration/processes/v2/README.md b/src/app/modules/agent/orchestration/processes/v2/README.md similarity index 98% rename from src/app/modules/orchestration/processes/v2/README.md rename to src/app/modules/agent/orchestration/processes/v2/README.md index 970a895..0a7d516 100644 --- a/src/app/modules/orchestration/processes/v2/README.md +++ b/src/app/modules/agent/orchestration/processes/v2/README.md @@ -163,7 +163,7 @@ Typical sequence: ## Trace Logging Per-request trace files are written by: -[request_trace_logger.py](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/src/app/modules/agent_api/infrastructure/logging/request_trace_logger.py) +[request_trace_logger.py](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/src/app/modules/api/infrastructure/logging/request_trace_logger.py) Location: [runtime_traces/agent_requests](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/runtime_traces/agent_requests) diff --git a/src/app/modules/orchestration/processes/v2/process.py b/src/app/modules/agent/orchestration/processes/v2/process.py similarity index 100% rename from src/app/modules/orchestration/processes/v2/process.py rename to src/app/modules/agent/orchestration/processes/v2/process.py diff --git a/src/app/modules/agent/orchestration/processes/v2/prompt_payload_builder.py b/src/app/modules/agent/orchestration/processes/v2/prompt_payload_builder.py new file mode 100644 index 0000000..223e3b8 --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v2/prompt_payload_builder.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import json + +from app.modules.agent.runtime.docs_qa_pipeline.models import DocsEvidenceBundle, OpenAPIResult + + +class V2PromptPayloadBuilder: + def build( + self, + *, + question: str, + intent: str, + sub_intent: str, + evidence_bundle: DocsEvidenceBundle, + api_contract: OpenAPIResult | None = None, + ) -> str: + payload = { + "question": question, + "documents": list(evidence_bundle.documents), + "facts": list(evidence_bundle.facts), + "entities": list(evidence_bundle.entities), + "workflows": list(evidence_bundle.workflows), + "relations": list(evidence_bundle.relations), + "chunks": list(evidence_bundle.chunks), + } + if api_contract is not None: + payload["api_contract"] = { + "path": api_contract.path, + "method": api_contract.method, + "request_schema": api_contract.request_schema, + "response_schema": api_contract.response_schema, + "diagnostics": dict(api_contract.diagnostics), + } + return json.dumps(payload, ensure_ascii=False, indent=2) diff --git a/src/app/modules/agent/orchestration/processes/v2/prompt_selector.py b/src/app/modules/agent/orchestration/processes/v2/prompt_selector.py new file mode 100644 index 0000000..16714f6 --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v2/prompt_selector.py @@ -0,0 +1,18 @@ +"""Выбор prompt для process-local workflow v2.""" + +from __future__ import annotations + + +class V2PromptSelector: + _DOCS_INTENT_PROMPTS = { + "DOCUMENTATION_EXPLAIN": "documentation_explain_answer", + "GENERAL_QA": "general_qa_answer", + } + + def select(self, *, intent: str = "DOCUMENTATION_EXPLAIN", sub_intent: str, answer_mode: str) -> str: + intent_key = (intent or "DOCUMENTATION_EXPLAIN").upper() + if intent_key in {"OPENAPI_GENERATION", "OPENAPI_FROM_DOCUMENTATION"}: + if sub_intent.upper() == "OPENAPI_FRAGMENT_GENERATE": + return "openapi_fragment_answer" + return "openapi_answer" + return self._DOCS_INTENT_PROMPTS.get(intent_key, "documentation_explain_answer") diff --git a/src/app/modules/agent/orchestration/processes/v2/prompts.yml b/src/app/modules/agent/orchestration/processes/v2/prompts.yml new file mode 100644 index 0000000..5142637 --- /dev/null +++ b/src/app/modules/agent/orchestration/processes/v2/prompts.yml @@ -0,0 +1,96 @@ +prompts: + documentation_explain_answer: | + Ты объясняешь документацию системы. + + На вход приходит JSON с полями: + - question + - documents + - facts + - entities + - workflows + - relations + - chunks + + Правила: + - Используй только предоставленные факты + - Не додумывай + - Если данных недостаточно, скажи это явно + - Объясняй структурировано + + Формат ответа: + 1. Краткое описание + 2. Основные элементы + 3. Как это работает + 4. Связи с другими частями системы (если есть) + general_qa_answer: | + Ты отвечаешь на общий вопрос по документации проекта. + + На вход приходит JSON с полями: + - question + - documents + - facts + - entities + - workflows + - relations + - chunks + + Правила: + - Используй только предоставленные документы и факты + - Не додумывай отсутствующие детали + - Если данных недостаточно, скажи это прямо + - Дай короткий понятный ответ без лишней структуры + openapi_answer: | + Ты генерируешь OpenAPI спецификацию по документации API. + + На вход приходит JSON с полями: + - question + - documents + - facts + - entities + - workflows + - relations + - chunks + - api_contract + + Правила: + - Используй только данные из документации + - Не придумывай поля + - Если данных нет, не заполняй + - Верни ТОЛЬКО YAML без пояснений + + Формат: + paths: + /path: + method: + summary: ... + requestBody: + responses: + openapi_fragment_answer: | + Ты генерируешь часть OpenAPI schema по документации API. + + На вход приходит JSON с полями: + - question + - documents + - facts + - entities + - workflows + - relations + - chunks + - api_contract + + Правила: + - Используй только данные из документации + - Не придумывай отсутствующие поля + - Верни только содержимое нужного фрагмента + fallback_answer: | + Ты формируешь безопасный fallback-ответ. + + На вход приходит JSON с полями: + - question + - attachments + - confluence_urls + + Правила: + - Если специализированный workflow не выбран, честно скажи об ограничении + - Используй только данные из payload + - Не выдумывай детали проекта diff --git a/src/app/modules/orchestration/processes/v2/steps/execute_documentation_workflow_step.py b/src/app/modules/agent/orchestration/processes/v2/steps/execute_documentation_workflow_step.py similarity index 58% rename from src/app/modules/orchestration/processes/v2/steps/execute_documentation_workflow_step.py rename to src/app/modules/agent/orchestration/processes/v2/steps/execute_documentation_workflow_step.py index 4ccab36..1268038 100644 --- a/src/app/modules/orchestration/processes/v2/steps/execute_documentation_workflow_step.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/execute_documentation_workflow_step.py @@ -1,6 +1,6 @@ from __future__ import annotations -from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase +from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase class ExecuteDocumentationWorkflowStep(WorkflowStepBase): diff --git a/src/app/modules/orchestration/processes/v2/steps/execute_fallback_workflow_step.py b/src/app/modules/agent/orchestration/processes/v2/steps/execute_fallback_workflow_step.py similarity index 70% rename from src/app/modules/orchestration/processes/v2/steps/execute_fallback_workflow_step.py rename to src/app/modules/agent/orchestration/processes/v2/steps/execute_fallback_workflow_step.py index 14baeaa..6bf3c1f 100644 --- a/src/app/modules/orchestration/processes/v2/steps/execute_fallback_workflow_step.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/execute_fallback_workflow_step.py @@ -1,7 +1,7 @@ from __future__ import annotations -from app.modules.orchestration.context.execution_context import ExecutionContext -from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase +from app.modules.agent.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase class ExecuteFallbackWorkflowStep(WorkflowStepBase): diff --git a/src/app/modules/orchestration/processes/v2/steps/execute_general_qa_workflow_step.py b/src/app/modules/agent/orchestration/processes/v2/steps/execute_general_qa_workflow_step.py similarity index 55% rename from src/app/modules/orchestration/processes/v2/steps/execute_general_qa_workflow_step.py rename to src/app/modules/agent/orchestration/processes/v2/steps/execute_general_qa_workflow_step.py index 4f42118..f7893fd 100644 --- a/src/app/modules/orchestration/processes/v2/steps/execute_general_qa_workflow_step.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/execute_general_qa_workflow_step.py @@ -1,6 +1,6 @@ from __future__ import annotations -from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase +from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase class ExecuteGeneralQaWorkflowStep(WorkflowStepBase): diff --git a/src/app/modules/orchestration/processes/v2/steps/execute_openapi_workflow_step.py b/src/app/modules/agent/orchestration/processes/v2/steps/execute_openapi_workflow_step.py similarity index 56% rename from src/app/modules/orchestration/processes/v2/steps/execute_openapi_workflow_step.py rename to src/app/modules/agent/orchestration/processes/v2/steps/execute_openapi_workflow_step.py index 0b35a01..755f6c1 100644 --- a/src/app/modules/orchestration/processes/v2/steps/execute_openapi_workflow_step.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/execute_openapi_workflow_step.py @@ -1,6 +1,6 @@ from __future__ import annotations -from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase +from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase class ExecuteOpenApiWorkflowStep(WorkflowStepBase): diff --git a/src/app/modules/orchestration/processes/v2/steps/route_intent_step.py b/src/app/modules/agent/orchestration/processes/v2/steps/route_intent_step.py similarity index 78% rename from src/app/modules/orchestration/processes/v2/steps/route_intent_step.py rename to src/app/modules/agent/orchestration/processes/v2/steps/route_intent_step.py index cfc282b..e02990b 100644 --- a/src/app/modules/orchestration/processes/v2/steps/route_intent_step.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/route_intent_step.py @@ -5,9 +5,9 @@ import asyncio from app.core.exceptions import AppError from app.modules.agent.task_runtime.context import TaskRuntimeContextBuilder from app.modules.agent.task_runtime.enrichment import ContextEnrichmentService -from app.modules.orchestration.adapters.intent_router_adapter import IntentRouterAdapter -from app.modules.orchestration.context.execution_context import ExecutionContext -from app.modules.orchestration.v2_progress import build_progress_callback +from app.modules.agent.orchestration.adapters.intent_router_adapter import IntentRouterAdapter +from app.modules.agent.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.v2_progress import build_progress_callback from app.schemas.common import ModuleName @@ -41,10 +41,10 @@ class RouteIntentStep: attachments=[], files=[], progress_cb=build_progress_callback(loop, context.publisher, request.request_id), + trace=context.trace, ) task_context.enriched_context = self._enrichment.enrich(task_context) context.task_context = task_context - context.trace_logger.log_step(request.request_id, "intent_router", "started") await context.publisher.publish_status( request.request_id, "intent_router", @@ -55,6 +55,7 @@ class RouteIntentStep: request.message, task_context.conversation_state, task_context.repo_context, + context.trace.module("intent_router"), ) task_context.route_result = route_result context.route_result = route_result @@ -68,13 +69,3 @@ class RouteIntentStep: "matched_intent_source": route_result.matched_intent_source, }, ) - context.trace_logger.log_step( - request.request_id, - "intent_router", - "completed", - { - "intent": route_result.intent, - "sub_intent": route_result.query_plan.sub_intent, - "matched_intent_source": route_result.matched_intent_source, - }, - ) diff --git a/src/app/modules/orchestration/processes/v2/steps/run_task_workflow_step.py b/src/app/modules/agent/orchestration/processes/v2/steps/run_task_workflow_step.py similarity index 84% rename from src/app/modules/orchestration/processes/v2/steps/run_task_workflow_step.py rename to src/app/modules/agent/orchestration/processes/v2/steps/run_task_workflow_step.py index 2b47b0e..f8d0a1b 100644 --- a/src/app/modules/orchestration/processes/v2/steps/run_task_workflow_step.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/run_task_workflow_step.py @@ -1,7 +1,7 @@ from __future__ import annotations -from app.modules.orchestration.adapters.task_runtime_adapter import TaskRuntimeAdapter -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.adapters.task_runtime_adapter import TaskRuntimeAdapter +from app.modules.agent.orchestration.context.execution_context import ExecutionContext class RunTaskWorkflowStep: diff --git a/src/app/modules/orchestration/processes/v2/steps/workflow_step_base.py b/src/app/modules/agent/orchestration/processes/v2/steps/workflow_step_base.py similarity index 88% rename from src/app/modules/orchestration/processes/v2/steps/workflow_step_base.py rename to src/app/modules/agent/orchestration/processes/v2/steps/workflow_step_base.py index ab24088..bbd26ce 100644 --- a/src/app/modules/orchestration/processes/v2/steps/workflow_step_base.py +++ b/src/app/modules/agent/orchestration/processes/v2/steps/workflow_step_base.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio from typing import Any -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.context.execution_context import ExecutionContext class WorkflowStepBase: @@ -20,7 +20,6 @@ class WorkflowStepBase: async def run(self, context: ExecutionContext) -> None: request = context.request task_context = context.task_context - context.trace_logger.log_step(request.request_id, self._step_name, "started") await context.publisher.publish_status( request.request_id, "task_workflow", @@ -32,12 +31,6 @@ class WorkflowStepBase: request.answer = result.answer or "" diagnostics = dict(result.meta.get("diagnostics") or {}) await self._publish_diagnostics(context, diagnostics, result) - context.trace_logger.log_step( - request.request_id, - self._step_name, - "completed", - {"workflow_id": self._workflow.workflow_id, "meta": result.meta}, - ) async def _publish_diagnostics(self, context: ExecutionContext, diagnostics: dict[str, Any], result: Any) -> None: request_id = context.request.request_id diff --git a/src/app/modules/orchestration/runtime/process_runner.py b/src/app/modules/agent/orchestration/runtime/process_runner.py similarity index 79% rename from src/app/modules/orchestration/runtime/process_runner.py rename to src/app/modules/agent/orchestration/runtime/process_runner.py index 92cebe4..2bed04a 100644 --- a/src/app/modules/orchestration/runtime/process_runner.py +++ b/src/app/modules/agent/orchestration/runtime/process_runner.py @@ -1,6 +1,6 @@ from __future__ import annotations -from app.modules.orchestration.context.execution_context import ExecutionContext +from app.modules.agent.orchestration.context.execution_context import ExecutionContext class ProcessRunner: diff --git a/src/app/modules/orchestration/v2_progress.py b/src/app/modules/agent/orchestration/v2_progress.py similarity index 83% rename from src/app/modules/orchestration/v2_progress.py rename to src/app/modules/agent/orchestration/v2_progress.py index 2b8393e..7d1ade4 100644 --- a/src/app/modules/orchestration/v2_progress.py +++ b/src/app/modules/agent/orchestration/v2_progress.py @@ -2,7 +2,7 @@ from __future__ import annotations import asyncio -from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher +from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher def build_progress_callback(loop: asyncio.AbstractEventLoop, publisher: ClientMessagePublisher, request_id: str): diff --git a/src/app/modules/agent/runtime/docs_qa_pipeline/pipeline.py b/src/app/modules/agent/runtime/docs_qa_pipeline/pipeline.py index af42996..7755a3f 100644 --- a/src/app/modules/agent/runtime/docs_qa_pipeline/pipeline.py +++ b/src/app/modules/agent/runtime/docs_qa_pipeline/pipeline.py @@ -1,11 +1,9 @@ from __future__ import annotations -import math from time import perf_counter from typing import Any from app.modules.agent.llm import AgentLlmService -from app.modules.agent.llm.prompt_loader import PromptLoader from app.modules.agent.intent_router_v2.analysis.docs_query_signals import DocsQuerySignals from app.modules.agent.runtime.docs_qa_pipeline.answer_synthesizer import DocsAnswerSynthesizer from app.modules.agent.runtime.docs_qa_pipeline.anchor_selector import DocsAnchorSelector @@ -19,6 +17,7 @@ from app.modules.agent.runtime.docs_qa_pipeline.prompt_payload_builder import Do from app.modules.agent.runtime.legacy_pipeline import RetrievalAdapter from app.modules.agent.runtime.steps.context import build_retrieval_request from app.modules.agent.runtime.steps.generation import RuntimePromptSelector +from app.modules.agent.observability.module_trace import RequestTraceContext class DocsQAPipelineRunner: @@ -60,6 +59,7 @@ class DocsQAPipelineRunner: *, conversation_state: Any = None, mode: str = "full", + trace: RequestTraceContext | None = None, ) -> DocsQAPipelineResult: timings: dict[str, int] = {} t0 = perf_counter() @@ -88,6 +88,16 @@ class DocsQAPipelineRunner: ) if request.sub_intent == "RELATED_DOCS_EXPLAIN" and not raw_rows and self._has_relation_hits(unfiltered_rows): raw_rows = unfiltered_rows + if trace is not None: + trace.module("rag_retrieval").log( + "completed", + { + "planned_layers": list(request.requested_layers), + "executed_layers": list(retrieval_report.get("executed_layers") or request.requested_layers), + "layer_diagnostics": dict(retrieval_report.get("layer_diagnostics") or {}), + "rows": len(raw_rows), + }, + ) timings["retrieval"] = _ms(t1) t2 = perf_counter() @@ -140,7 +150,14 @@ class DocsQAPipelineRunner: ) answer = openapi_result.raw_yaml if answer_mode != "degraded" else "Недостаточно contract evidence для OpenAPI." else: - answer = self._generate_openapi_answer(user_query, router_result.intent, request.sub_intent, evidence_bundle, openapi_result) + answer = self._generate_openapi_answer( + user_query, + router_result.intent, + request.sub_intent, + evidence_bundle, + openapi_result, + trace=trace, + ) output_valid, llm_details = self._openapi_postprocessor.validate( answer, require_paths=request.sub_intent != "OPENAPI_FRAGMENT_GENERATE", @@ -176,7 +193,13 @@ class DocsQAPipelineRunner: ) output_valid = answer_mode != "degraded" else: - answer = self._generate_docs_answer(user_query, router_result.intent, request.sub_intent, evidence_bundle) + answer = self._generate_docs_answer( + user_query, + router_result.intent, + request.sub_intent, + evidence_bundle, + trace=trace, + ) answer_mode, degraded_reason, answer = self._finalize_docs_answer( answer=answer, raw_rows=raw_rows, @@ -197,6 +220,16 @@ class DocsQAPipelineRunner: openapi_result=openapi_result, router_result=router_result, ) + if trace is not None: + trace.module("evidence_gate").log( + "evaluated", + { + "decision": gate_decision, + "reason": gate_decision_reason, + "missing": gate_missing_requirements, + "satisfied": gate_satisfied_requirements, + }, + ) diagnostics = self._diagnostics_builder.build( intent=router_result.intent, sub_intent=request.sub_intent, @@ -255,7 +288,7 @@ class DocsQAPipelineRunner: mode=mode, ) - def _generate_docs_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle) -> str: + def _generate_docs_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle, trace=None) -> str: if self._llm is None: return self._answer_synthesizer.synthesize(question, evidence_bundle) payload = self._prompt_payload_builder.build( @@ -265,9 +298,14 @@ class DocsQAPipelineRunner: evidence_bundle=evidence_bundle, ) prompt_name = self._prompt_selector.select(intent=intent, sub_intent=sub_intent, answer_mode="normal") - return self._llm.generate(prompt_name, payload, log_context="graph.project_qa.docs.answer").strip() + return self._llm.generate( + prompt_name, + payload, + log_context="graph.project_qa.docs.answer", + trace=trace.module("llm") if trace is not None else None, + ).strip() - def _generate_openapi_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle, api_contract) -> str: + def _generate_openapi_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle, api_contract, trace=None) -> str: if self._llm is None: return api_contract.raw_yaml payload = self._prompt_payload_builder.build( @@ -278,7 +316,12 @@ class DocsQAPipelineRunner: api_contract=api_contract, ) prompt_name = self._prompt_selector.select(intent=intent, sub_intent=sub_intent, answer_mode="normal") - return self._llm.generate(prompt_name, payload, log_context="graph.project_qa.docs.openapi").strip() + return self._llm.generate( + prompt_name, + payload, + log_context="graph.project_qa.docs.openapi", + trace=trace.module("llm") if trace is not None else None, + ).strip() def _llm_mode(self, intent: str, sub_intent: str) -> str: if sub_intent == "RELATED_DOCS_EXPLAIN": @@ -307,19 +350,14 @@ class DocsQAPipelineRunner: evidence_bundle=evidence_bundle, api_contract=api_contract, ) - system_prompt = PromptLoader().load(prompt_name) or "You are a helpful assistant." - tokens_in_estimate = max(1, int(math.ceil((len(system_prompt) + len(user_prompt)) / 4))) - return { - "prompt_name": prompt_name, - "system_prompt": system_prompt, - "user_prompt": user_prompt, - "log_context": log_context, - "prompt_stats": { - "system_chars": len(system_prompt), - "user_chars": len(user_prompt), - "tokens_in_estimate": tokens_in_estimate, - }, - } + if self._llm is None: + return { + "prompt_name": prompt_name, + "system_prompt": "You are a helpful assistant.", + "user_prompt": user_prompt, + "log_context": log_context, + } + return self._llm.build_request(prompt_name, user_prompt, log_context=log_context) def _finalize_docs_answer( self, diff --git a/src/app/modules/agent/runtime/docs_qa_pipeline/prompt_payload_builder.py b/src/app/modules/agent/runtime/docs_qa_pipeline/prompt_payload_builder.py index fc35ad7..2b85fda 100644 --- a/src/app/modules/agent/runtime/docs_qa_pipeline/prompt_payload_builder.py +++ b/src/app/modules/agent/runtime/docs_qa_pipeline/prompt_payload_builder.py @@ -17,8 +17,6 @@ class DocsPromptPayloadBuilder: ) -> str: payload = { "question": question, - "intent": intent, - "sub_intent": sub_intent, "documents": list(evidence_bundle.documents), "facts": list(evidence_bundle.facts), "entities": list(evidence_bundle.entities), diff --git a/src/app/modules/agent/task_runtime/context.py b/src/app/modules/agent/task_runtime/context.py index 6404fd5..082fe2e 100644 --- a/src/app/modules/agent/task_runtime/context.py +++ b/src/app/modules/agent/task_runtime/context.py @@ -21,6 +21,7 @@ class TaskRuntimeContextBuilder: attachments: list[dict], files: list[dict], progress_cb, + trace=None, ) -> TaskRuntimeContext: files_map = self._files_to_map(files) return TaskRuntimeContext( @@ -33,6 +34,7 @@ class TaskRuntimeContextBuilder: files=list(files or []), files_map=files_map, progress_cb=progress_cb, + trace=trace, repo_context=self._repo_context_factory.build(files_map), conversation_state=ConversationState(), ) diff --git a/src/app/modules/agent/task_runtime/models.py b/src/app/modules/agent/task_runtime/models.py index fee0ab6..e2ad651 100644 --- a/src/app/modules/agent/task_runtime/models.py +++ b/src/app/modules/agent/task_runtime/models.py @@ -20,6 +20,7 @@ class TaskRuntimeContext: files: list[dict[str, Any]] = field(default_factory=list) files_map: dict[str, dict[str, Any]] = field(default_factory=dict) progress_cb: ProgressCallback | None = None + trace: Any = None repo_context: Any = None conversation_state: Any = None route_result: Any = None diff --git a/src/app/modules/agent/task_runtime/workflows/docs_qa.py b/src/app/modules/agent/task_runtime/workflows/docs_qa.py index 14da8e8..2f55323 100644 --- a/src/app/modules/agent/task_runtime/workflows/docs_qa.py +++ b/src/app/modules/agent/task_runtime/workflows/docs_qa.py @@ -13,11 +13,17 @@ class DocsQaWorkflow: self._runner = runner def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult: + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "started", + {"workflow_id": self.workflow_id, "message": ctx.message}, + ) result = self._runner.run( ctx.message, ctx.rag_session_id, conversation_state=ctx.conversation_state, mode="full", + trace=ctx.trace, ) diagnostics = result.diagnostics.model_dump(mode="json") emit_status_block( @@ -42,15 +48,22 @@ class DocsQaWorkflow: title="Evidence Gate", lines=_gate_lines(diagnostics), ) - return WorkflowExecutionResult( + result_payload = WorkflowExecutionResult( result_type=TaskResultType.ANSWER, answer=result.answer, meta={ "workflow_id": self.workflow_id, "intent": result.router_result.intent, "diagnostics": diagnostics, + "llm_request": result.llm_request, }, ) + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "completed", + {"workflow_id": self.workflow_id, "answer_length": len(result.answer or "")}, + ) + return result_payload def _retrieval_lines(diagnostics: dict) -> list[str]: diff --git a/src/app/modules/agent/task_runtime/workflows/fallback.py b/src/app/modules/agent/task_runtime/workflows/fallback.py index a96d01c..cb5c3b7 100644 --- a/src/app/modules/agent/task_runtime/workflows/fallback.py +++ b/src/app/modules/agent/task_runtime/workflows/fallback.py @@ -15,6 +15,11 @@ class FallbackWorkflow: self._llm = llm def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult: + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "started", + {"workflow_id": self.workflow_id, "message": ctx.message}, + ) emit_status_block( ctx, block_id="rag_retrieval", @@ -27,14 +32,18 @@ class FallbackWorkflow: payload = json.dumps( { "question": ctx.message, - "intent": getattr(ctx.route_result, "intent", ""), "attachments": list(ctx.attachments), "confluence_urls": list(ctx.enriched_context.get("confluence_urls") or []), }, ensure_ascii=False, indent=2, ) - answer = self._llm.generate("docs_fallback_answer", payload, log_context="agent.workflow.fallback").strip() + answer = self._llm.generate( + "fallback_answer", + payload, + log_context="agent.workflow.fallback", + trace=ctx.trace.module("llm") if ctx.trace is not None else None, + ).strip() emit_status_block( ctx, block_id="workflow", @@ -47,8 +56,14 @@ class FallbackWorkflow: title="Evidence Gate", lines=["not applied in fallback workflow"], ) - return WorkflowExecutionResult( + result = WorkflowExecutionResult( result_type=TaskResultType.ANSWER, answer=answer, meta={"workflow_id": self.workflow_id, "intent": getattr(ctx.route_result, "intent", "")}, ) + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "completed", + {"workflow_id": self.workflow_id, "answer_length": len(answer)}, + ) + return result diff --git a/src/app/modules/agent/task_runtime/workflows/general_qa.py b/src/app/modules/agent/task_runtime/workflows/general_qa.py index aa9ba41..9991eb3 100644 --- a/src/app/modules/agent/task_runtime/workflows/general_qa.py +++ b/src/app/modules/agent/task_runtime/workflows/general_qa.py @@ -14,11 +14,17 @@ class GeneralQaWorkflow: self._runner = runner def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult: + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "started", + {"workflow_id": self.workflow_id, "message": ctx.message}, + ) result = self._runner.run( ctx.message, ctx.rag_session_id, conversation_state=ctx.conversation_state, mode="full", + trace=ctx.trace, ) diagnostics = result.diagnostics.model_dump(mode="json") emit_status_block( @@ -43,12 +49,19 @@ class GeneralQaWorkflow: title="Evidence Gate", lines=_gate_lines(diagnostics), ) - return WorkflowExecutionResult( + result_payload = WorkflowExecutionResult( result_type=TaskResultType.ANSWER, answer=result.answer, meta={ "workflow_id": self.workflow_id, "intent": result.router_result.intent, "diagnostics": diagnostics, + "llm_request": result.llm_request, }, ) + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "completed", + {"workflow_id": self.workflow_id, "answer_length": len(result.answer or "")}, + ) + return result_payload diff --git a/src/app/modules/agent/task_runtime/workflows/openapi.py b/src/app/modules/agent/task_runtime/workflows/openapi.py index f9e01f3..fb6d95b 100644 --- a/src/app/modules/agent/task_runtime/workflows/openapi.py +++ b/src/app/modules/agent/task_runtime/workflows/openapi.py @@ -14,11 +14,17 @@ class OpenApiWorkflow: self._runner = runner def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult: + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "started", + {"workflow_id": self.workflow_id, "message": ctx.message}, + ) result = self._runner.run( ctx.message, ctx.rag_session_id, conversation_state=ctx.conversation_state, mode="full", + trace=ctx.trace, ) diagnostics = result.diagnostics.model_dump(mode="json") emit_status_block( @@ -51,7 +57,7 @@ class OpenApiWorkflow: format="yaml", source_refs=list(result.diagnostics.doc_paths), ) - return WorkflowExecutionResult( + result_payload = WorkflowExecutionResult( result_type=TaskResultType.OPENAPI, answer=content, artifacts=[artifact], @@ -59,5 +65,12 @@ class OpenApiWorkflow: "workflow_id": self.workflow_id, "intent": result.router_result.intent, "diagnostics": diagnostics, + "llm_request": result.llm_request, }, ) + if ctx.trace is not None: + ctx.trace.module("task_workflow").log( + "completed", + {"workflow_id": self.workflow_id, "answer_length": len(content or "")}, + ) + return result_payload diff --git a/src/app/modules/agent_api/__init__.py b/src/app/modules/agent_api/__init__.py deleted file mode 100644 index c689232..0000000 --- a/src/app/modules/agent_api/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from app.modules.agent_api.module import AgentApiModule - -__all__ = ["AgentApiModule"] diff --git a/src/app/modules/api/__init__.py b/src/app/modules/api/__init__.py new file mode 100644 index 0000000..c9c2ef6 --- /dev/null +++ b/src/app/modules/api/__init__.py @@ -0,0 +1 @@ +__all__: list[str] = [] diff --git a/src/app/modules/agent_api/application/request_service.py b/src/app/modules/api/application/request_service.py similarity index 72% rename from src/app/modules/agent_api/application/request_service.py rename to src/app/modules/api/application/request_service.py index 98cba04..ca4da41 100644 --- a/src/app/modules/agent_api/application/request_service.py +++ b/src/app/modules/api/application/request_service.py @@ -2,11 +2,11 @@ from __future__ import annotations import asyncio -from app.modules.agent_api.domain.models.agent_request import AgentRequest -from app.modules.agent_api.infrastructure.ids.request_id_factory import RequestIdFactory -from app.modules.agent_api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore -from app.modules.agent_api.application.session_service import SessionService -from app.modules.orchestration.facade import OrchestrationFacade +from app.modules.api.domain.models.agent_request import AgentRequest +from app.modules.api.infrastructure.ids.request_id_factory import RequestIdFactory +from app.modules.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore +from app.modules.api.application.session_service import SessionService +from app.modules.agent.orchestration.facade import OrchestrationFacade class RequestService: diff --git a/src/app/modules/agent_api/application/session_service.py b/src/app/modules/api/application/session_service.py similarity index 85% rename from src/app/modules/agent_api/application/session_service.py rename to src/app/modules/api/application/session_service.py index c5ec3a3..1dfeeab 100644 --- a/src/app/modules/agent_api/application/session_service.py +++ b/src/app/modules/api/application/session_service.py @@ -3,9 +3,9 @@ from __future__ import annotations from datetime import datetime, timezone from app.core.exceptions import AppError -from app.modules.agent_api.domain.models.agent_session import AgentSession -from app.modules.agent_api.infrastructure.ids.session_id_factory import SessionIdFactory -from app.modules.agent_api.infrastructure.stores.in_memory_session_store import InMemorySessionStore +from app.modules.api.domain.models.agent_session import AgentSession +from app.modules.api.infrastructure.ids.session_id_factory import SessionIdFactory +from app.modules.api.infrastructure.stores.in_memory_session_store import InMemorySessionStore from app.schemas.common import ModuleName diff --git a/src/app/modules/agent_api/application/stream_service.py b/src/app/modules/api/application/stream_service.py similarity index 83% rename from src/app/modules/agent_api/application/stream_service.py rename to src/app/modules/api/application/stream_service.py index 1d8eba3..1986f5c 100644 --- a/src/app/modules/agent_api/application/stream_service.py +++ b/src/app/modules/api/application/stream_service.py @@ -1,8 +1,8 @@ from __future__ import annotations from app.core.exceptions import AppError -from app.modules.agent_api.infrastructure.streaming.sse_encoder import SseEncoder -from app.modules.agent_api.infrastructure.streaming.sse_event_channel import SseEventChannel +from app.modules.api.infrastructure.streaming.sse_encoder import SseEncoder +from app.modules.api.infrastructure.streaming.sse_event_channel import SseEventChannel from app.schemas.common import ModuleName diff --git a/src/app/modules/agent_api/controllers/request_controller.py b/src/app/modules/api/controllers/request_controller.py similarity index 94% rename from src/app/modules/agent_api/controllers/request_controller.py rename to src/app/modules/api/controllers/request_controller.py index 85abb95..c2f1841 100644 --- a/src/app/modules/agent_api/controllers/request_controller.py +++ b/src/app/modules/api/controllers/request_controller.py @@ -1,7 +1,7 @@ from __future__ import annotations from app.core.exceptions import AppError -from app.modules.agent_api.application.request_service import RequestService +from app.modules.api.application.request_service import RequestService from app.schemas.agent_api import AgentRequestCreateRequest, AgentRequestQueuedResponse, AgentRequestStateResponse from app.schemas.common import ModuleName diff --git a/src/app/modules/agent_api/controllers/session_controller.py b/src/app/modules/api/controllers/session_controller.py similarity index 94% rename from src/app/modules/agent_api/controllers/session_controller.py rename to src/app/modules/api/controllers/session_controller.py index c01938d..996de49 100644 --- a/src/app/modules/agent_api/controllers/session_controller.py +++ b/src/app/modules/api/controllers/session_controller.py @@ -6,7 +6,7 @@ from app.schemas.agent_api import ( CreateAgentSessionResponse, ResetAgentSessionResponse, ) -from app.modules.agent_api.application.session_service import SessionService +from app.modules.api.application.session_service import SessionService class SessionController: diff --git a/src/app/modules/agent_api/controllers/stream_controller.py b/src/app/modules/api/controllers/stream_controller.py similarity index 93% rename from src/app/modules/agent_api/controllers/stream_controller.py rename to src/app/modules/api/controllers/stream_controller.py index 9e18c58..1026e9d 100644 --- a/src/app/modules/agent_api/controllers/stream_controller.py +++ b/src/app/modules/api/controllers/stream_controller.py @@ -2,7 +2,7 @@ from __future__ import annotations from fastapi.responses import StreamingResponse -from app.modules.agent_api.application.stream_service import StreamService +from app.modules.api.application.stream_service import StreamService class StreamController: diff --git a/src/app/modules/agent_api/domain/events/client_event.py b/src/app/modules/api/domain/events/client_event.py similarity index 100% rename from src/app/modules/agent_api/domain/events/client_event.py rename to src/app/modules/api/domain/events/client_event.py diff --git a/src/app/modules/agent_api/domain/models/agent_request.py b/src/app/modules/api/domain/models/agent_request.py similarity index 100% rename from src/app/modules/agent_api/domain/models/agent_request.py rename to src/app/modules/api/domain/models/agent_request.py diff --git a/src/app/modules/agent_api/domain/models/agent_session.py b/src/app/modules/api/domain/models/agent_session.py similarity index 100% rename from src/app/modules/agent_api/domain/models/agent_session.py rename to src/app/modules/api/domain/models/agent_session.py diff --git a/src/app/modules/agent_api/infrastructure/ids/request_id_factory.py b/src/app/modules/api/infrastructure/ids/request_id_factory.py similarity index 100% rename from src/app/modules/agent_api/infrastructure/ids/request_id_factory.py rename to src/app/modules/api/infrastructure/ids/request_id_factory.py diff --git a/src/app/modules/agent_api/infrastructure/ids/session_id_factory.py b/src/app/modules/api/infrastructure/ids/session_id_factory.py similarity index 100% rename from src/app/modules/agent_api/infrastructure/ids/session_id_factory.py rename to src/app/modules/api/infrastructure/ids/session_id_factory.py diff --git a/src/app/modules/agent_api/infrastructure/logging/request_trace_logger.py b/src/app/modules/api/infrastructure/logging/request_trace_logger.py similarity index 77% rename from src/app/modules/agent_api/infrastructure/logging/request_trace_logger.py rename to src/app/modules/api/infrastructure/logging/request_trace_logger.py index 2be1fd9..11001c9 100644 --- a/src/app/modules/agent_api/infrastructure/logging/request_trace_logger.py +++ b/src/app/modules/api/infrastructure/logging/request_trace_logger.py @@ -3,11 +3,11 @@ from __future__ import annotations from pathlib import Path from threading import Lock -from app.modules.agent_api.domain.events.client_event import ClientEventRecord -from app.modules.agent_api.domain.models.agent_request import AgentRequest -from app.modules.agent_api.domain.models.agent_session import AgentSession -from app.modules.agent_api.infrastructure.logging.trace_file_path_builder import TraceFilePathBuilder -from app.modules.agent_api.infrastructure.logging.trace_markdown_writer import TraceMarkdownWriter +from app.modules.api.domain.events.client_event import ClientEventRecord +from app.modules.api.domain.models.agent_request import AgentRequest +from app.modules.api.domain.models.agent_session import AgentSession +from app.modules.api.infrastructure.logging.trace_file_path_builder import TraceFilePathBuilder +from app.modules.api.infrastructure.logging.trace_markdown_writer import TraceMarkdownWriter class RequestTraceLogger: @@ -39,11 +39,17 @@ class RequestTraceLogger: def log_step(self, request_id: str, step: str, status: str, details: dict | None = None) -> None: self._append(request_id, f"Step {step}", {"status": status, "details": details or {}}) + def log_module(self, request_id: str, module: str, title: str, payload: dict | None = None) -> None: + body = {"event": title} + body.update(payload or {}) + self._append(request_id, module, body) + def log_event(self, event: ClientEventRecord) -> None: self._append( event.request_id, - f"Event {event.type.value}", + "client_event", { + "event": event.type.value, "source": event.source, "text": event.text, "payload": event.payload, @@ -54,7 +60,7 @@ class RequestTraceLogger: def complete_request(self, request: AgentRequest) -> None: self._append( request.request_id, - "Result", + "result", { "status": request.status.value, "answer": request.answer, @@ -65,7 +71,7 @@ class RequestTraceLogger: def fail_request(self, request: AgentRequest) -> None: self._append( request.request_id, - "Error", + "result", { "status": request.status.value, "error": request.error.model_dump(mode="json") if request.error else None, diff --git a/src/app/modules/agent_api/infrastructure/logging/trace_file_path_builder.py b/src/app/modules/api/infrastructure/logging/trace_file_path_builder.py similarity index 100% rename from src/app/modules/agent_api/infrastructure/logging/trace_file_path_builder.py rename to src/app/modules/api/infrastructure/logging/trace_file_path_builder.py diff --git a/src/app/modules/agent_api/infrastructure/logging/trace_markdown_writer.py b/src/app/modules/api/infrastructure/logging/trace_markdown_writer.py similarity index 100% rename from src/app/modules/agent_api/infrastructure/logging/trace_markdown_writer.py rename to src/app/modules/api/infrastructure/logging/trace_markdown_writer.py diff --git a/src/app/modules/agent_api/infrastructure/stores/in_memory_request_store.py b/src/app/modules/api/infrastructure/stores/in_memory_request_store.py similarity index 86% rename from src/app/modules/agent_api/infrastructure/stores/in_memory_request_store.py rename to src/app/modules/api/infrastructure/stores/in_memory_request_store.py index 216ca2f..5551330 100644 --- a/src/app/modules/agent_api/infrastructure/stores/in_memory_request_store.py +++ b/src/app/modules/api/infrastructure/stores/in_memory_request_store.py @@ -2,7 +2,7 @@ from __future__ import annotations from threading import Lock -from app.modules.agent_api.domain.models.agent_request import AgentRequest +from app.modules.api.domain.models.agent_request import AgentRequest class InMemoryRequestStore: diff --git a/src/app/modules/agent_api/infrastructure/stores/in_memory_session_store.py b/src/app/modules/api/infrastructure/stores/in_memory_session_store.py similarity index 86% rename from src/app/modules/agent_api/infrastructure/stores/in_memory_session_store.py rename to src/app/modules/api/infrastructure/stores/in_memory_session_store.py index c66109e..1e13564 100644 --- a/src/app/modules/agent_api/infrastructure/stores/in_memory_session_store.py +++ b/src/app/modules/api/infrastructure/stores/in_memory_session_store.py @@ -2,7 +2,7 @@ from __future__ import annotations from threading import Lock -from app.modules.agent_api.domain.models.agent_session import AgentSession +from app.modules.api.domain.models.agent_session import AgentSession class InMemorySessionStore: diff --git a/src/app/modules/agent_api/infrastructure/streaming/replay_buffer.py b/src/app/modules/api/infrastructure/streaming/replay_buffer.py similarity index 88% rename from src/app/modules/agent_api/infrastructure/streaming/replay_buffer.py rename to src/app/modules/api/infrastructure/streaming/replay_buffer.py index b3b27d5..dad9125 100644 --- a/src/app/modules/agent_api/infrastructure/streaming/replay_buffer.py +++ b/src/app/modules/api/infrastructure/streaming/replay_buffer.py @@ -2,7 +2,7 @@ from __future__ import annotations from collections import defaultdict -from app.modules.agent_api.domain.events.client_event import ClientEventRecord +from app.modules.api.domain.events.client_event import ClientEventRecord class ReplayBuffer: diff --git a/src/app/modules/agent_api/infrastructure/streaming/sse_encoder.py b/src/app/modules/api/infrastructure/streaming/sse_encoder.py similarity index 87% rename from src/app/modules/agent_api/infrastructure/streaming/sse_encoder.py rename to src/app/modules/api/infrastructure/streaming/sse_encoder.py index 4d88d91..0817394 100644 --- a/src/app/modules/agent_api/infrastructure/streaming/sse_encoder.py +++ b/src/app/modules/api/infrastructure/streaming/sse_encoder.py @@ -2,7 +2,7 @@ from __future__ import annotations import json -from app.modules.agent_api.domain.events.client_event import ClientEventRecord +from app.modules.api.domain.events.client_event import ClientEventRecord class SseEncoder: diff --git a/src/app/modules/agent_api/infrastructure/streaming/sse_event_channel.py b/src/app/modules/api/infrastructure/streaming/sse_event_channel.py similarity index 90% rename from src/app/modules/agent_api/infrastructure/streaming/sse_event_channel.py rename to src/app/modules/api/infrastructure/streaming/sse_event_channel.py index 684f5b8..dddfd56 100644 --- a/src/app/modules/agent_api/infrastructure/streaming/sse_event_channel.py +++ b/src/app/modules/api/infrastructure/streaming/sse_event_channel.py @@ -3,8 +3,8 @@ from __future__ import annotations import asyncio from collections import defaultdict -from app.modules.agent_api.domain.events.client_event import ClientEventRecord -from app.modules.agent_api.infrastructure.streaming.replay_buffer import ReplayBuffer +from app.modules.api.domain.events.client_event import ClientEventRecord +from app.modules.api.infrastructure.streaming.replay_buffer import ReplayBuffer class SseEventChannel: diff --git a/src/app/modules/agent_api/module.py b/src/app/modules/api/module.py similarity index 50% rename from src/app/modules/agent_api/module.py rename to src/app/modules/api/module.py index 179bf0b..9085af9 100644 --- a/src/app/modules/agent_api/module.py +++ b/src/app/modules/api/module.py @@ -2,16 +2,16 @@ from __future__ import annotations from fastapi import APIRouter -from app.modules.agent_api.application.request_service import RequestService -from app.modules.agent_api.application.session_service import SessionService -from app.modules.agent_api.application.stream_service import StreamService -from app.modules.agent_api.controllers.request_controller import RequestController -from app.modules.agent_api.controllers.session_controller import SessionController -from app.modules.agent_api.controllers.stream_controller import StreamController -from app.modules.agent_api.public_router import build_public_router +from app.modules.api.application.request_service import RequestService +from app.modules.api.application.session_service import SessionService +from app.modules.api.application.stream_service import StreamService +from app.modules.api.controllers.request_controller import RequestController +from app.modules.api.controllers.session_controller import SessionController +from app.modules.api.controllers.stream_controller import StreamController +from app.modules.api.public_router import build_public_router -class AgentApiModule: +class ApiModule: def __init__( self, sessions: SessionService, diff --git a/src/app/modules/agent_api/public_router.py b/src/app/modules/api/public_router.py similarity index 87% rename from src/app/modules/agent_api/public_router.py rename to src/app/modules/api/public_router.py index 17ce058..0bfd27c 100644 --- a/src/app/modules/agent_api/public_router.py +++ b/src/app/modules/api/public_router.py @@ -2,9 +2,9 @@ from __future__ import annotations from fastapi import APIRouter -from app.modules.agent_api.controllers.request_controller import RequestController -from app.modules.agent_api.controllers.session_controller import SessionController -from app.modules.agent_api.controllers.stream_controller import StreamController +from app.modules.api.controllers.request_controller import RequestController +from app.modules.api.controllers.session_controller import SessionController +from app.modules.api.controllers.stream_controller import StreamController from app.schemas.agent_api import ( AgentRequestCreateRequest, AgentRequestQueuedResponse, diff --git a/src/app/modules/application.py b/src/app/modules/application.py index 0018fd9..4d3f035 100644 --- a/src/app/modules/application.py +++ b/src/app/modules/application.py @@ -13,32 +13,36 @@ from app.modules.agent.task_runtime.workflows import ( OpenApiWorkflow, ) from app.modules.agent.task_runtime.workflows.general_qa import GeneralQaWorkflow -from app.modules.agent_api import AgentApiModule -from app.modules.agent_api.application.request_service import RequestService -from app.modules.agent_api.application.session_service import SessionService -from app.modules.agent_api.application.stream_service import StreamService -from app.modules.agent_api.infrastructure.ids.request_id_factory import RequestIdFactory -from app.modules.agent_api.infrastructure.ids.session_id_factory import SessionIdFactory -from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger -from app.modules.agent_api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore -from app.modules.agent_api.infrastructure.stores.in_memory_session_store import InMemorySessionStore -from app.modules.agent_api.infrastructure.streaming.sse_event_channel import SseEventChannel -from app.modules.orchestration import OrchestrationFacade -from app.modules.orchestration.adapters.intent_router_adapter import IntentRouterAdapter -from app.modules.orchestration.adapters.llm_chat_adapter import LlmChatAdapter -from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher -from app.modules.orchestration.processes.registry import ProcessRegistry -from app.modules.orchestration.processes.v1.process import V1Process -from app.modules.orchestration.processes.v1.steps.bootstrap_step import BootstrapStep -from app.modules.orchestration.processes.v1.steps.finalize_step import FinalizeStep -from app.modules.orchestration.processes.v1.steps.run_llm_step import RunLlmStep -from app.modules.orchestration.processes.v2.process import V2Process -from app.modules.orchestration.processes.v2.steps.execute_documentation_workflow_step import ExecuteDocumentationWorkflowStep -from app.modules.orchestration.processes.v2.steps.execute_fallback_workflow_step import ExecuteFallbackWorkflowStep -from app.modules.orchestration.processes.v2.steps.execute_general_qa_workflow_step import ExecuteGeneralQaWorkflowStep -from app.modules.orchestration.processes.v2.steps.execute_openapi_workflow_step import ExecuteOpenApiWorkflowStep -from app.modules.orchestration.processes.v2.steps.route_intent_step import RouteIntentStep -from app.modules.orchestration.runtime.process_runner import ProcessRunner +from app.modules.api.module import ApiModule +from app.modules.api.application.request_service import RequestService +from app.modules.api.application.session_service import SessionService +from app.modules.api.application.stream_service import StreamService +from app.modules.api.infrastructure.ids.request_id_factory import RequestIdFactory +from app.modules.api.infrastructure.ids.session_id_factory import SessionIdFactory +from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger +from app.modules.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore +from app.modules.api.infrastructure.stores.in_memory_session_store import InMemorySessionStore +from app.modules.api.infrastructure.streaming.sse_event_channel import SseEventChannel +from app.modules.agent.orchestration.facade import OrchestrationFacade +from app.modules.agent.orchestration.adapters.intent_router_adapter import IntentRouterAdapter +from app.modules.agent.orchestration.adapters.llm_chat_adapter import LlmChatAdapter +from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher +from app.modules.agent.orchestration.processes.registry import ProcessRegistry +from app.modules.agent.orchestration.processes.v1.process import V1Process +from app.modules.agent.orchestration.processes.v1.prompt_payload_builder import V1PromptPayloadBuilder +from app.modules.agent.orchestration.processes.v1.simple_llm_workflow import SimpleLlmWorkflow +from app.modules.agent.orchestration.processes.v1.steps.bootstrap_step import BootstrapStep +from app.modules.agent.orchestration.processes.v1.steps.execute_llm_workflow_step import ExecuteLlmWorkflowStep +from app.modules.agent.orchestration.processes.v1.steps.finalize_step import FinalizeStep +from app.modules.agent.orchestration.processes.v2.prompt_payload_builder import V2PromptPayloadBuilder +from app.modules.agent.orchestration.processes.v2.prompt_selector import V2PromptSelector +from app.modules.agent.orchestration.processes.v2.process import V2Process +from app.modules.agent.orchestration.processes.v2.steps.execute_documentation_workflow_step import ExecuteDocumentationWorkflowStep +from app.modules.agent.orchestration.processes.v2.steps.execute_fallback_workflow_step import ExecuteFallbackWorkflowStep +from app.modules.agent.orchestration.processes.v2.steps.execute_general_qa_workflow_step import ExecuteGeneralQaWorkflowStep +from app.modules.agent.orchestration.processes.v2.steps.execute_openapi_workflow_step import ExecuteOpenApiWorkflowStep +from app.modules.agent.orchestration.processes.v2.steps.route_intent_step import RouteIntentStep +from app.modules.agent.orchestration.runtime.process_runner import ProcessRunner from app.modules.rag.persistence.repository import RagRepository from app.modules.agent.runtime.story_context_repository import StoryContextRepository, StoryContextSchemaRepository from app.modules.rag.module import RagModule, RagRepoModule @@ -66,8 +70,10 @@ class ModularApplication: _giga_settings = GigaChatSettings.from_env() _giga_client = GigaChatClient(_giga_settings, GigaChatTokenProvider(_giga_settings)) - _prompt_loader = PromptLoader() - self._agent_llm = AgentLlmService(client=_giga_client, prompts=_prompt_loader) + _v1_prompt_loader = PromptLoader(Path(__file__).resolve().parent / "agent/orchestration/processes/v1/prompts.yml") + _v2_prompt_loader = PromptLoader(Path(__file__).resolve().parent / "agent/orchestration/processes/v2/prompts.yml") + self._agent_llm_v1 = AgentLlmService(client=_giga_client, prompts=_v1_prompt_loader) + self._agent_llm_v2 = AgentLlmService(client=_giga_client, prompts=_v2_prompt_loader) _router = IntentRouterV2() _retrieval = RuntimeRetrievalAdapter(self.rag_repository) _repo_context_factory = RuntimeRepoContextFactory() @@ -75,15 +81,17 @@ class ModularApplication: router=_router, retrieval_adapter=_retrieval, repo_context=_repo_context_factory.build(), - llm=self._agent_llm, + llm=self._agent_llm_v2, + prompt_selector=V2PromptSelector(), + prompt_payload_builder=V2PromptPayloadBuilder(), ) _task_context_builder = TaskRuntimeContextBuilder(_repo_context_factory) _context_enrichment = ContextEnrichmentService() _docs_workflow = DocsQaWorkflow(_docs_runner) _openapi_workflow = OpenApiWorkflow(_docs_runner) _general_qa_workflow = GeneralQaWorkflow(_docs_runner) - _fallback_workflow = FallbackWorkflow(self._agent_llm) - _docs_generation_workflow = DocumentationGenerationWorkflow(self._agent_llm, DocumentationTemplateRegistry()) + _fallback_workflow = FallbackWorkflow(self._agent_llm_v2) + _docs_generation_workflow = DocumentationGenerationWorkflow(self._agent_llm_v2, DocumentationTemplateRegistry()) self._docs_generation_workflow = _docs_generation_workflow self.agent_sessions = InMemorySessionStore() @@ -91,8 +99,12 @@ class ModularApplication: self.agent_events = SseEventChannel() self.agent_trace_logger = RequestTraceLogger(Path("runtime_traces/agent_requests")) _publisher = ClientMessagePublisher(self.agent_events, self.agent_trace_logger) + _v1_workflow = SimpleLlmWorkflow( + LlmChatAdapter(self._agent_llm_v1, prompt_name="simple_llm_answer"), + V1PromptPayloadBuilder(), + ) _process_registry = ProcessRegistry( - V1Process([BootstrapStep(), RunLlmStep(LlmChatAdapter(self._agent_llm)), FinalizeStep()]), + V1Process([BootstrapStep(), ExecuteLlmWorkflowStep(_v1_workflow), FinalizeStep()]), V2Process( [ BootstrapStep(), @@ -123,7 +135,7 @@ class ModularApplication: sessions=_session_service, orchestration=_orchestration, ) - self.agent_api = AgentApiModule( + self.api = ApiModule( sessions=_session_service, requests=_request_service, streams=StreamService(self.agent_events, request_exists=lambda request_id: self.agent_requests.get(request_id) is not None), diff --git a/src/app/modules/chat/README.md b/src/app/modules/chat/README.md deleted file mode 100644 index c6edc18..0000000 --- a/src/app/modules/chat/README.md +++ /dev/null @@ -1,98 +0,0 @@ -# Модуль chat - -## 1. Функции модуля -- Внешний API чата: создание диалога, отправка сообщения, получение статуса задачи. -- Асинхронная оркестрация выполнения через `ChatOrchestrator`. -- Idempotency и стриминг событий по SSE. - -## 2. Диаграмма классов и взаимосвязей -```mermaid -classDiagram - class ChatModule - class ChatOrchestrator - class TaskStore - class DialogSessionStore - class IdempotencyStore - class EventBus - class AgentRunner - - ChatModule --> ChatOrchestrator - ChatModule --> TaskStore - ChatModule --> DialogSessionStore - ChatModule --> IdempotencyStore - ChatModule --> EventBus - ChatOrchestrator --> AgentRunner - ChatOrchestrator --> TaskStore - ChatOrchestrator --> DialogSessionStore - ChatOrchestrator --> EventBus -``` - -## 3. Описание классов -- `ChatModule`: фасад модуля и регистрация публичных chat endpoint'ов. - Методы: `__init__` — собирает stores/orchestrator; `public_router` — публикует REST и SSE маршруты чата. -- `ChatOrchestrator`: выполняет жизненный цикл user-message как фоновой задачи. - Методы: `enqueue_message` — создает задачу и запускает обработку; `_process_task` — исполняет runtime и сохраняет результат; `_resolve_sessions` — валидирует и сопоставляет dialog/rag сессии. -- `TaskStore`: in-memory store состояний задач. - Методы: `create` — создает новую `TaskState`; `get` — возвращает задачу по `task_id`; `save` — обновляет состояние задачи. -- `DialogSessionStore`: хранилище dialog-сессий поверх БД. - Методы: `create` — создает новую dialog-сессию; `get` — читает dialog-сессию по id. -- `IdempotencyStore`: предотвращает дубль задач по идемпотентному ключу. - Методы: `get_task_id` — возвращает существующий `task_id` по ключу; `put` — сохраняет ключ и `task_id`. -- `EventBus`: асинхронная публикация/подписка событий. - Методы: `subscribe` — создает подписку на канал; `unsubscribe` — снимает подписку; `publish` — отправляет событие подписчикам; `as_sse` — сериализует событие в SSE формат. -- `AgentRunner` (контракт): интерфейс выполнения агентного запроса из chat-слоя. - Методы: `run` — принимает данные задачи и возвращает итог `answer`/`changeset`. - -## 4. Сиквенс-диаграммы API - -### POST /api/chat/dialogs -Назначение: создает новый диалог, связанный с существующей `rag_session`, чтобы пользователь мог отправлять сообщения в контексте конкретного индекса. -```mermaid -sequenceDiagram - participant Router as ChatModule.APIRouter - participant RagSessions as RagSessionStore - participant Dialogs as DialogSessionStore - - Router->>RagSessions: get(rag_session_id) - RagSessions-->>Router: exists - Router->>Dialogs: create(rag_session_id) - Dialogs-->>Router: dialog_session -``` - -### POST /api/chat/messages -Назначение: ставит сообщение пользователя в асинхронную обработку и возвращает `task_id` для отслеживания результата. -```mermaid -sequenceDiagram - participant Router as ChatModule.APIRouter - participant Orchestrator as ChatOrchestrator - participant TaskStore as TaskStore - - Router->>Orchestrator: enqueue_message(request, idempotency_key) - Orchestrator->>TaskStore: create()/save() - Orchestrator-->>Router: task_id,status -``` - -### GET /api/tasks/{task_id} -Назначение: отдает текущее состояние задачи и финальный результат (answer/changeset/error), когда обработка завершена. -```mermaid -sequenceDiagram - participant Router as ChatModule.APIRouter - participant TaskStore as TaskStore - - Router->>TaskStore: get(task_id) - TaskStore-->>Router: task_state -``` - -### GET /api/events?task_id=... -Назначение: открывает SSE-поток с прогрессом выполнения задачи и промежуточными событиями. -```mermaid -sequenceDiagram - participant Router as ChatModule.APIRouter - participant Events as EventBus - - Router->>Events: subscribe(task_id) - loop until disconnect - Events-->>Router: SSE event - end - Router->>Events: unsubscribe(task_id) -``` diff --git a/src/app/modules/chat/__init__.py b/src/app/modules/chat/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/app/modules/chat/dialog_store.py b/src/app/modules/chat/dialog_store.py deleted file mode 100644 index ea8f932..0000000 --- a/src/app/modules/chat/dialog_store.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import TYPE_CHECKING -from uuid import uuid4 - -if TYPE_CHECKING: - from app.modules.chat.repository import ChatRepository - - -@dataclass -class DialogSession: - dialog_session_id: str - rag_session_id: str - - -class DialogSessionStore: - def __init__(self, repository: ChatRepository) -> None: - self._repo = repository - - def create(self, rag_session_id: str) -> DialogSession: - session = DialogSession(dialog_session_id=str(uuid4()), rag_session_id=rag_session_id) - self._repo.create_dialog(session.dialog_session_id, session.rag_session_id) - return session - - def get(self, dialog_session_id: str) -> DialogSession | None: - row = self._repo.get_dialog(dialog_session_id) - if not row: - return None - return DialogSession( - dialog_session_id=str(row["dialog_session_id"]), - rag_session_id=str(row["rag_session_id"]), - ) diff --git a/src/app/modules/chat/direct_service.py b/src/app/modules/chat/direct_service.py deleted file mode 100644 index 0ab0a2a..0000000 --- a/src/app/modules/chat/direct_service.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import annotations - -import logging -from uuid import uuid4 - -from app.modules.agent.llm import AgentLlmService -from app.modules.chat.evidence_gate import CodeExplainEvidenceGate -from app.modules.chat.session_resolver import ChatSessionResolver -from app.modules.chat.task_store import TaskState, TaskStore -from app.modules.agent.runtime.steps.explain import CodeExplainRetrieverV2, PromptBudgeter -from app.schemas.chat import ChatMessageRequest, TaskQueuedResponse, TaskResultType, TaskStatus - -LOGGER = logging.getLogger(__name__) - - -class CodeExplainChatService: - def __init__( - self, - retriever: CodeExplainRetrieverV2, - llm: AgentLlmService, - session_resolver: ChatSessionResolver, - task_store: TaskStore, - message_sink, - budgeter: PromptBudgeter | None = None, - evidence_gate: CodeExplainEvidenceGate | None = None, - ) -> None: - self._retriever = retriever - self._llm = llm - self._session_resolver = session_resolver - self._task_store = task_store - self._message_sink = message_sink - self._budgeter = budgeter or PromptBudgeter() - self._evidence_gate = evidence_gate or CodeExplainEvidenceGate() - - async def handle_message(self, request: ChatMessageRequest) -> TaskQueuedResponse: - dialog_session_id, rag_session_id = self._session_resolver.resolve(request) - task_id = str(uuid4()) - task = TaskState(task_id=task_id, status=TaskStatus.RUNNING) - self._task_store.save(task) - self._message_sink(dialog_session_id, "user", request.message, task_id=task_id) - pack = self._retriever.build_pack( - rag_session_id, - request.message, - file_candidates=[item.model_dump(mode="json") for item in request.files], - ) - decision = self._evidence_gate.evaluate(pack) - if decision.passed: - prompt_input = self._budgeter.build_prompt_input(request.message, pack) - answer = self._llm.generate( - "code_explain_answer_v2", - prompt_input, - log_context="chat.code_explain.direct", - ).strip() - else: - answer = decision.answer - self._message_sink(dialog_session_id, "assistant", answer, task_id=task_id) - task.status = TaskStatus.DONE - task.result_type = TaskResultType.ANSWER - task.answer = answer - self._task_store.save(task) - LOGGER.warning( - "direct code explain response: task_id=%s rag_session_id=%s excerpts=%s missing=%s", - task_id, - rag_session_id, - len(pack.code_excerpts), - pack.missing, - ) - return TaskQueuedResponse( - task_id=task_id, - status=TaskStatus.DONE.value, - ) diff --git a/src/app/modules/chat/evidence_gate.py b/src/app/modules/chat/evidence_gate.py deleted file mode 100644 index 3e75f6c..0000000 --- a/src/app/modules/chat/evidence_gate.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass, field - -from app.modules.agent.runtime.steps.explain.models import ExplainPack - - -@dataclass(slots=True) -class EvidenceGateDecision: - passed: bool - answer: str = "" - diagnostics: dict[str, list[str]] = field(default_factory=dict) - - -class CodeExplainEvidenceGate: - def __init__(self, min_excerpts: int = 2) -> None: - self._min_excerpts = min_excerpts - - def evaluate(self, pack: ExplainPack) -> EvidenceGateDecision: - diagnostics = self._diagnostics(pack) - if len(pack.code_excerpts) >= self._min_excerpts: - return EvidenceGateDecision(passed=True, diagnostics=diagnostics) - return EvidenceGateDecision( - passed=False, - answer=self._build_answer(pack, diagnostics), - diagnostics=diagnostics, - ) - - def _diagnostics(self, pack: ExplainPack) -> dict[str, list[str]]: - return { - "entrypoints": [item.title for item in pack.selected_entrypoints[:3] if item.title], - "symbols": [item.title for item in pack.seed_symbols[:5] if item.title], - "paths": self._paths(pack), - "missing": list(pack.missing), - } - - def _paths(self, pack: ExplainPack) -> list[str]: - values: list[str] = [] - for item in pack.selected_entrypoints + pack.seed_symbols: - path = item.source or (item.location.path if item.location else "") - if path and path not in values: - values.append(path) - for excerpt in pack.code_excerpts: - if excerpt.path and excerpt.path not in values: - values.append(excerpt.path) - return values[:6] - - def _build_answer(self, pack: ExplainPack, diagnostics: dict[str, list[str]]) -> str: - lines = [ - "Недостаточно опоры в коде, чтобы дать объяснение без догадок.", - "", - f"Найдено фрагментов кода: {len(pack.code_excerpts)} из {self._min_excerpts} минимально необходимых.", - ] - if diagnostics["paths"]: - lines.append(f"Пути: {', '.join(diagnostics['paths'])}") - if diagnostics["entrypoints"]: - lines.append(f"Entrypoints: {', '.join(diagnostics['entrypoints'])}") - if diagnostics["symbols"]: - lines.append(f"Символы: {', '.join(diagnostics['symbols'])}") - if diagnostics["missing"]: - lines.append(f"Диагностика: {', '.join(diagnostics['missing'])}") - return "\n".join(lines).strip() diff --git a/src/app/modules/chat/module.py b/src/app/modules/chat/module.py deleted file mode 100644 index e89b71f..0000000 --- a/src/app/modules/chat/module.py +++ /dev/null @@ -1,112 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from fastapi import APIRouter, Header -from fastapi.responses import StreamingResponse - -from app.core.exceptions import AppError -from app.modules.chat.dialog_store import DialogSessionStore -from app.modules.chat.service import ChatOrchestrator -from app.modules.chat.task_store import TaskStore -from app.modules.shared.event_bus import EventBus -from app.modules.shared.idempotency_store import IdempotencyStore -from app.modules.shared.retry_executor import RetryExecutor -from app.schemas.chat import ( - ChatMessageRequest, - DialogCreateRequest, - DialogCreateResponse, - TaskQueuedResponse, - TaskResultResponse, -) -from app.schemas.common import ModuleName - -if TYPE_CHECKING: - from app.modules.chat.repository import ChatRepository - from app.modules.contracts import AgentRunner - from app.modules.rag.session_store import RagSessionStore - - -class ChatModule: - def __init__( - self, - agent_runner: AgentRunner, - event_bus: EventBus, - retry: RetryExecutor, - rag_sessions: RagSessionStore, - repository: ChatRepository, - task_store: TaskStore | None = None, - ) -> None: - self._rag_sessions = rag_sessions - self.tasks = task_store or TaskStore() - self.dialogs = DialogSessionStore(repository) - self.idempotency = IdempotencyStore() - self.events = event_bus - self.chat = ChatOrchestrator( - task_store=self.tasks, - dialogs=self.dialogs, - idempotency=self.idempotency, - runtime=agent_runner, - events=self.events, - retry=retry, - rag_session_exists=lambda rag_session_id: rag_sessions.get(rag_session_id) is not None, - message_sink=repository.add_message, - ) - - def public_router(self) -> APIRouter: - router = APIRouter(tags=["chat"]) - - @router.post("/api/chat/dialogs", response_model=DialogCreateResponse) - async def create_dialog(request: DialogCreateRequest) -> DialogCreateResponse: - if not self._rag_sessions.get(request.rag_session_id): - raise AppError("rag_session_not_found", "RAG session not found", ModuleName.RAG) - dialog = self.dialogs.create(request.rag_session_id) - return DialogCreateResponse( - dialog_session_id=dialog.dialog_session_id, - rag_session_id=dialog.rag_session_id, - ) - - @router.post("/api/chat/messages", response_model=TaskQueuedResponse | TaskResultResponse) - async def send_message( - request: ChatMessageRequest, - idempotency_key: str | None = Header(default=None, alias="Idempotency-Key"), - ) -> TaskQueuedResponse | TaskResultResponse: - task = await self.chat.enqueue_message(request, idempotency_key) - return TaskQueuedResponse(task_id=task.task_id, status=task.status.value) - - @router.get("/api/tasks/{task_id}", response_model=TaskResultResponse) - async def get_task(task_id: str) -> TaskResultResponse: - task = self.tasks.get(task_id) - if not task: - raise AppError("not_found", f"Task not found: {task_id}", ModuleName.BACKEND) - return TaskResultResponse( - task_id=task.task_id, - status=task.status, - result_type=task.result_type, - answer=task.answer, - artifacts=task.artifacts, - changeset=task.changeset, - error=task.error, - ) - - @router.get("/api/events") - async def stream_events(task_id: str) -> StreamingResponse: - queue = await self.events.subscribe(task_id) - - async def event_stream(): - import asyncio - - heartbeat = 10 - try: - while True: - try: - event = await asyncio.wait_for(queue.get(), timeout=heartbeat) - yield EventBus.as_sse(event) - except asyncio.TimeoutError: - yield ": keepalive\\n\\n" - finally: - await self.events.unsubscribe(task_id, queue) - - return StreamingResponse(event_stream(), media_type="text/event-stream") - - return router diff --git a/src/app/modules/chat/repository.py b/src/app/modules/chat/repository.py deleted file mode 100644 index e78ff9e..0000000 --- a/src/app/modules/chat/repository.py +++ /dev/null @@ -1,93 +0,0 @@ -import json - -from sqlalchemy import text - -from app.modules.shared.db import get_engine - - -class ChatRepository: - def ensure_tables(self) -> None: - with get_engine().connect() as conn: - conn.execute( - text( - """ - CREATE TABLE IF NOT EXISTS dialog_sessions ( - dialog_session_id VARCHAR(64) PRIMARY KEY, - rag_session_id VARCHAR(64) NOT NULL, - created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - ) - conn.execute( - text( - """ - CREATE TABLE IF NOT EXISTS chat_messages ( - id BIGSERIAL PRIMARY KEY, - dialog_session_id VARCHAR(64) NOT NULL, - task_id VARCHAR(64), - role VARCHAR(16) NOT NULL, - content TEXT NOT NULL, - payload JSONB, - created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - ) - conn.execute(text("ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS task_id VARCHAR(64)")) - conn.execute(text("ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS payload JSONB")) - conn.commit() - - def create_dialog(self, dialog_session_id: str, rag_session_id: str) -> None: - with get_engine().connect() as conn: - conn.execute( - text( - """ - INSERT INTO dialog_sessions (dialog_session_id, rag_session_id) - VALUES (:did, :sid) - """ - ), - {"did": dialog_session_id, "sid": rag_session_id}, - ) - conn.commit() - - def get_dialog(self, dialog_session_id: str) -> dict | None: - with get_engine().connect() as conn: - row = conn.execute( - text( - """ - SELECT dialog_session_id, rag_session_id - FROM dialog_sessions - WHERE dialog_session_id = :did - """ - ), - {"did": dialog_session_id}, - ).mappings().fetchone() - return dict(row) if row else None - - def add_message( - self, - dialog_session_id: str, - role: str, - content: str, - task_id: str | None = None, - payload: dict | None = None, - ) -> None: - payload_json = json.dumps(payload, ensure_ascii=False) if payload is not None else None - with get_engine().connect() as conn: - conn.execute( - text( - """ - INSERT INTO chat_messages (dialog_session_id, task_id, role, content, payload) - VALUES (:did, :task_id, :role, :content, CAST(:payload AS JSONB)) - """ - ), - { - "did": dialog_session_id, - "task_id": task_id, - "role": role, - "content": content, - "payload": payload_json, - }, - ) - conn.commit() diff --git a/src/app/modules/chat/service.py b/src/app/modules/chat/service.py deleted file mode 100644 index 6dd87bf..0000000 --- a/src/app/modules/chat/service.py +++ /dev/null @@ -1,288 +0,0 @@ -import asyncio -import logging - -from app.core.exceptions import AppError -from app.modules.contracts import AgentRunner -from app.schemas.chat import ChatMessageRequest, TaskResultType, TaskStatus -from app.schemas.common import ErrorPayload, ModuleName -from app.modules.chat.dialog_store import DialogSessionStore -from app.modules.chat.session_resolver import ChatSessionResolver -from app.modules.chat.task_store import TaskState, TaskStore -from app.modules.shared.event_bus import EventBus -from app.modules.shared.idempotency_store import IdempotencyStore -from app.modules.shared.retry_executor import RetryExecutor - -LOGGER = logging.getLogger(__name__) - - -def _truncate_for_log(text: str, max_chars: int = 1200) -> str: - value = (text or "").replace("\n", "\\n").strip() - if len(value) <= max_chars: - return value - return value[:max_chars].rstrip() + "...[truncated]" - - -class ChatOrchestrator: - def __init__( - self, - task_store: TaskStore, - dialogs: DialogSessionStore, - idempotency: IdempotencyStore, - runtime: AgentRunner, - events: EventBus, - retry: RetryExecutor, - rag_session_exists, - message_sink, - ) -> None: - self._task_store = task_store - self._dialogs = dialogs - self._idempotency = idempotency - self._runtime = runtime - self._events = events - self._retry = retry - self._rag_session_exists = rag_session_exists - self._message_sink = message_sink - self._session_resolver = ChatSessionResolver(dialogs, rag_session_exists) - - async def enqueue_message( - self, - request: ChatMessageRequest, - idempotency_key: str | None, - ) -> TaskState: - if idempotency_key: - existing = self._idempotency.get_task_id(idempotency_key) - if existing: - task = self._task_store.get(existing) - if task: - LOGGER.info( - "enqueue_message reused task by idempotency key: task_id=%s mode=%s", - task.task_id, - request.mode.value, - ) - return task - - task = self._task_store.create() - if idempotency_key: - self._idempotency.put(idempotency_key, task.task_id) - asyncio.create_task(self._process_task(task.task_id, request)) - LOGGER.info( - "enqueue_message created task: task_id=%s mode=%s", - task.task_id, - request.mode.value, - ) - return task - - async def _process_task(self, task_id: str, request: ChatMessageRequest) -> None: - task = self._task_store.get(task_id) - if not task: - return - task.status = TaskStatus.RUNNING - self._task_store.save(task) - await self._events.publish(task_id, "task_status", {"task_id": task_id, "status": task.status.value}) - await self._publish_progress(task_id, "task.start", "Запрос принят, начинаю обработку.", progress=5) - - heartbeat_stop = asyncio.Event() - heartbeat_task = asyncio.create_task(self._run_heartbeat(task_id, heartbeat_stop)) - - try: - await self._publish_progress(task_id, "task.sessions", "Проверяю сессии диалога и проекта.", progress=10) - dialog_session_id, rag_session_id = self._resolve_sessions(request) - LOGGER.warning( - "incoming chat request: task_id=%s dialog_session_id=%s rag_session_id=%s mode=%s attachments=%s files=%s message=%s", - task_id, - dialog_session_id, - rag_session_id, - request.mode.value, - len(request.attachments), - len(request.files), - _truncate_for_log(request.message), - ) - await self._publish_progress(task_id, "task.sessions.done", "Сессии проверены, запускаю агента.", progress=15) - loop = asyncio.get_running_loop() - - def progress_cb(stage: str, message: str, kind: str = "task_progress", meta: dict | None = None): - asyncio.run_coroutine_threadsafe( - self._events.publish( - task_id, - kind, - { - "task_id": task_id, - "stage": stage, - "message": message, - "meta": meta or {}, - }, - ), - loop, - ) - - async def op(): - self._message_sink(dialog_session_id, "user", request.message, task_id=task_id) - await self._publish_progress(task_id, "task.agent.run", "Агент анализирует запрос и готовит ответ.", progress=20) - return await self._runtime.run( - task_id=task_id, - dialog_session_id=dialog_session_id, - rag_session_id=rag_session_id, - mode=request.mode.value, - message=request.message, - attachments=[a.model_dump(mode="json") for a in request.attachments], - files=[f.model_dump(mode="json") for f in request.files], - progress_cb=progress_cb, - ) - - result = await self._retry.run(op) - await self._publish_progress(task_id, "task.finalize", "Сохраняю финальный результат.", progress=95) - task.status = TaskStatus.DONE - task.result_type = TaskResultType(result.result_type) - task.answer = result.answer - task.artifacts = list(getattr(result, "artifacts", []) or []) - task.changeset = result.changeset - if task.result_type != TaskResultType.CHANGESET and (task.answer or task.artifacts): - payload = { - "result_type": task.result_type.value, - "artifacts": [item.model_dump(mode="json") for item in task.artifacts], - } - self._message_sink(dialog_session_id, "assistant", task.answer or "", task_id=task_id, payload=payload) - LOGGER.warning( - "outgoing chat response: task_id=%s dialog_session_id=%s result_type=%s answer=%s", - task_id, - dialog_session_id, - task.result_type.value, - _truncate_for_log(task.answer or ""), - ) - elif task.result_type == TaskResultType.CHANGESET: - self._message_sink( - dialog_session_id, - "assistant", - f"changeset:{len(task.changeset)}", - task_id=task_id, - payload={ - "result_type": TaskResultType.CHANGESET.value, - "changeset": [item.model_dump(mode="json") for item in task.changeset], - }, - ) - LOGGER.warning( - "outgoing chat response: task_id=%s dialog_session_id=%s result_type=%s changeset_items=%s answer=%s", - task_id, - dialog_session_id, - task.result_type.value, - len(task.changeset), - _truncate_for_log(task.answer or ""), - ) - self._task_store.save(task) - await self._events.publish( - task_id, - "task_result", - { - "task_id": task_id, - "status": task.status.value, - "result_type": task.result_type.value, - "answer": task.answer, - "artifacts": [item.model_dump(mode="json") for item in task.artifacts], - "changeset": [item.model_dump(mode="json") for item in task.changeset], - "meta": getattr(result, "meta", {}) or {}, - }, - ) - await self._publish_progress(task_id, "task.done", "Обработка завершена.", progress=100) - LOGGER.info( - "_process_task completed: task_id=%s status=%s result_type=%s changeset_items=%s", - task_id, - task.status.value, - task.result_type.value if task.result_type else "", - len(task.changeset), - ) - except (AppError, TimeoutError, ConnectionError, OSError) as exc: - task.status = TaskStatus.ERROR - if isinstance(exc, AppError): - payload = ErrorPayload(code=exc.code, desc=exc.desc, module=exc.module) - else: - payload = ErrorPayload( - code="retry_exhausted", - desc="Temporary failure after retries. Please retry request.", - module=ModuleName.BACKEND, - ) - task.error = payload - self._task_store.save(task) - await self._publish_progress(task_id, "task.error", "Не удалось завершить обработку запроса.", kind="task_thinking") - await self._events.publish(task_id, "task_error", payload.model_dump(mode="json")) - LOGGER.warning( - "_process_task handled error: task_id=%s code=%s module=%s desc=%s", - task_id, - payload.code, - payload.module.value, - payload.desc, - ) - except Exception: - task.status = TaskStatus.ERROR - payload = ErrorPayload( - code="agent_runtime_error", - desc="Agent execution failed unexpectedly. Please retry request.", - module=ModuleName.AGENT, - ) - task.error = payload - self._task_store.save(task) - await self._publish_progress( - task_id, - "task.error", - "Во время выполнения возникла внутренняя ошибка.", - kind="task_thinking", - ) - await self._events.publish(task_id, "task_error", payload.model_dump(mode="json")) - LOGGER.exception( - "_process_task unexpected error: task_id=%s code=%s", - task_id, - payload.code, - ) - finally: - heartbeat_stop.set() - await heartbeat_task - - async def _publish_progress( - self, - task_id: str, - stage: str, - message: str, - *, - progress: int | None = None, - kind: str = "task_progress", - meta: dict | None = None, - ) -> None: - payload = { - "task_id": task_id, - "stage": stage, - "message": message, - "meta": meta or {}, - } - if progress is not None: - payload["progress"] = max(0, min(100, int(progress))) - await self._events.publish(task_id, kind, payload) - LOGGER.debug( - "_publish_progress emitted: task_id=%s kind=%s stage=%s progress=%s", - task_id, - kind, - stage, - payload.get("progress"), - ) - - async def _run_heartbeat(self, task_id: str, stop_event: asyncio.Event) -> None: - messages = ( - "Собираю данные по проекту.", - "Анализирую контекст и формирую структуру ответа.", - "Проверяю согласованность промежуточного результата.", - ) - index = 0 - while not stop_event.is_set(): - try: - await asyncio.wait_for(stop_event.wait(), timeout=5.0) - except asyncio.TimeoutError: - await self._publish_progress( - task_id, - "task.heartbeat", - messages[index % len(messages)], - kind="task_thinking", - meta={"heartbeat": True}, - ) - index += 1 - LOGGER.debug("_run_heartbeat stopped: task_id=%s ticks=%s", task_id, index) - - def _resolve_sessions(self, request: ChatMessageRequest) -> tuple[str, str]: - return self._session_resolver.resolve(request) diff --git a/src/app/modules/chat/session_resolver.py b/src/app/modules/chat/session_resolver.py deleted file mode 100644 index 653523b..0000000 --- a/src/app/modules/chat/session_resolver.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from app.core.exceptions import AppError -from app.schemas.chat import ChatMessageRequest -from app.schemas.common import ModuleName - -if TYPE_CHECKING: - from app.modules.chat.dialog_store import DialogSessionStore - - -class ChatSessionResolver: - def __init__(self, dialogs: DialogSessionStore, rag_session_exists) -> None: - self._dialogs = dialogs - self._rag_session_exists = rag_session_exists - - def resolve(self, request: ChatMessageRequest) -> tuple[str, str]: - if request.dialog_session_id and request.rag_session_id: - dialog = self._dialogs.get(request.dialog_session_id) - if not dialog: - raise AppError("dialog_not_found", "Dialog session not found", ModuleName.BACKEND) - if dialog.rag_session_id != request.rag_session_id: - raise AppError("dialog_rag_mismatch", "Dialog session does not belong to rag session", ModuleName.BACKEND) - return request.dialog_session_id, request.rag_session_id - - if request.session_id and request.project_id: - if not self._rag_session_exists(request.project_id): - raise AppError("rag_session_not_found", "RAG session not found", ModuleName.RAG) - return request.session_id, request.project_id - - raise AppError( - "missing_sessions", - "dialog_session_id and rag_session_id are required", - ModuleName.BACKEND, - ) diff --git a/src/app/modules/chat/task_store.py b/src/app/modules/chat/task_store.py deleted file mode 100644 index 4b80247..0000000 --- a/src/app/modules/chat/task_store.py +++ /dev/null @@ -1,38 +0,0 @@ -from dataclasses import dataclass, field -from threading import Lock -from uuid import uuid4 - -from app.schemas.changeset import ChangeItem -from app.schemas.chat import TaskArtifact, TaskResultType, TaskStatus -from app.schemas.common import ErrorPayload - - -@dataclass -class TaskState: - task_id: str - status: TaskStatus = TaskStatus.QUEUED - result_type: TaskResultType | None = None - answer: str | None = None - artifacts: list[TaskArtifact] = field(default_factory=list) - changeset: list[ChangeItem] = field(default_factory=list) - error: ErrorPayload | None = None - - -class TaskStore: - def __init__(self) -> None: - self._items: dict[str, TaskState] = {} - self._lock = Lock() - - def create(self) -> TaskState: - task = TaskState(task_id=str(uuid4())) - with self._lock: - self._items[task.task_id] = task - return task - - def get(self, task_id: str) -> TaskState | None: - with self._lock: - return self._items.get(task_id) - - def save(self, task: TaskState) -> None: - with self._lock: - self._items[task.task_id] = task diff --git a/src/app/modules/orchestration/__init__.py b/src/app/modules/orchestration/__init__.py deleted file mode 100644 index 1c5f050..0000000 --- a/src/app/modules/orchestration/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from app.modules.orchestration.facade import OrchestrationFacade - -__all__ = ["OrchestrationFacade"] diff --git a/src/app/modules/orchestration/adapters/intent_router_adapter.py b/src/app/modules/orchestration/adapters/intent_router_adapter.py deleted file mode 100644 index c20ae83..0000000 --- a/src/app/modules/orchestration/adapters/intent_router_adapter.py +++ /dev/null @@ -1,11 +0,0 @@ -from __future__ import annotations - -from app.modules.agent.intent_router_v2 import IntentRouterV2 - - -class IntentRouterAdapter: - def __init__(self, router: IntentRouterV2) -> None: - self._router = router - - def route(self, user_query: str, conversation_state, repo_context): - return self._router.route(user_query, conversation_state, repo_context) diff --git a/src/app/modules/orchestration/adapters/llm_chat_adapter.py b/src/app/modules/orchestration/adapters/llm_chat_adapter.py deleted file mode 100644 index f80ddc9..0000000 --- a/src/app/modules/orchestration/adapters/llm_chat_adapter.py +++ /dev/null @@ -1,19 +0,0 @@ -from __future__ import annotations - -import asyncio - -from app.modules.agent.llm.service import AgentLlmService - - -class LlmChatAdapter: - def __init__(self, llm: AgentLlmService, prompt_name: str = "agent_api_v1") -> None: - self._llm = llm - self._prompt_name = prompt_name - - async def generate(self, message: str, request_id: str) -> str: - return await asyncio.to_thread( - self._llm.generate, - self._prompt_name, - message, - log_context=f"agent_api:{request_id}", - ) diff --git a/src/app/modules/orchestration/context/execution_context.py b/src/app/modules/orchestration/context/execution_context.py deleted file mode 100644 index 91ea2e7..0000000 --- a/src/app/modules/orchestration/context/execution_context.py +++ /dev/null @@ -1,20 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - -from app.modules.agent_api.domain.models.agent_request import AgentRequest -from app.modules.agent_api.domain.models.agent_session import AgentSession -from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger -from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher - - -@dataclass(slots=True) -class ExecutionContext: - request: AgentRequest - session: AgentSession - publisher: ClientMessagePublisher - trace_logger: RequestTraceLogger - task_context: Any = None - route_result: Any = None - workflow_result: Any = None diff --git a/src/app/schemas/agent_api.py b/src/app/schemas/agent_api.py index dc460af..a125282 100644 --- a/src/app/schemas/agent_api.py +++ b/src/app/schemas/agent_api.py @@ -33,7 +33,7 @@ class ResetAgentSessionResponse(BaseModel): class AgentRequestCreateRequest(BaseModel): session_id: str = Field(min_length=1) message: str = Field(min_length=1) - process_version: str = Field(default="v2", min_length=1) + process_version: str = Field(default="v1", min_length=1) class AgentRequestQueuedResponse(BaseModel): diff --git a/tests/unit_tests/chat/test_chat_api_simple_code_explain.py b/tests/unit_tests/chat/test_chat_api_simple_code_explain.py deleted file mode 100644 index bb7afad..0000000 --- a/tests/unit_tests/chat/test_chat_api_simple_code_explain.py +++ /dev/null @@ -1,70 +0,0 @@ -import asyncio - -from app.modules.chat.module import ChatModule -from app.modules.chat.task_store import TaskStore -from app.schemas.chat import ChatMessageRequest -from app.schemas.chat import TaskQueuedResponse -from app.modules.shared.event_bus import EventBus -from app.modules.shared.retry_executor import RetryExecutor - - -class _FakeRuntime: - async def run(self, **kwargs): - raise AssertionError("legacy runtime must not be called") - - -class _FakeDirectChat: - def __init__(self) -> None: - self.calls = 0 - - async def handle_message(self, request): - self.calls += 1 - return TaskQueuedResponse( - task_id="task-1", - status="done", - ) - - -class _FakeRagSessions: - def get(self, rag_session_id: str): - return {"rag_session_id": rag_session_id} - - -class _FakeRepository: - def create_dialog(self, dialog_session_id: str, rag_session_id: str) -> None: - return None - - def get_dialog(self, dialog_session_id: str): - return None - - def add_message(self, dialog_session_id: str, role: str, content: str, task_id: str | None = None, payload: dict | None = None) -> None: - return None - - -def test_chat_messages_endpoint_uses_direct_service(monkeypatch) -> None: - monkeypatch.setenv("SIMPLE_CODE_EXPLAIN_ONLY", "true") - direct_chat = _FakeDirectChat() - module = ChatModule( - agent_runner=_FakeRuntime(), - event_bus=EventBus(), - retry=RetryExecutor(), - rag_sessions=_FakeRagSessions(), - repository=_FakeRepository(), - direct_chat=direct_chat, - task_store=TaskStore(), - ) - router = module.public_router() - endpoint = next(route.endpoint for route in router.routes if getattr(route, "path", "") == "/api/chat/messages") - response = asyncio.run( - endpoint( - ChatMessageRequest( - session_id="dialog-1", - project_id="rag-1", - message="Explain get_user", - ), - None, - ) - ) - - assert response.task_id == "task-1" - assert direct_chat.calls == 1 diff --git a/tests/unit_tests/chat/test_direct_service.py b/tests/unit_tests/chat/test_direct_service.py deleted file mode 100644 index f7b9141..0000000 --- a/tests/unit_tests/chat/test_direct_service.py +++ /dev/null @@ -1,61 +0,0 @@ -import asyncio - -from app.modules.chat.direct_service import CodeExplainChatService -from app.modules.chat.session_resolver import ChatSessionResolver -from app.modules.chat.task_store import TaskStore -from app.modules.agent.runtime.steps.explain.models import ExplainIntent, ExplainPack -from app.schemas.chat import ChatFileContext, ChatMessageRequest - - -class _FakeRetriever: - def build_pack(self, rag_session_id: str, user_query: str, *, file_candidates: list[dict] | None = None) -> ExplainPack: - return ExplainPack( - intent=ExplainIntent(raw_query=user_query, normalized_query=user_query), - missing=["code_excerpts"], - ) - - -class _FakeLlm: - def __init__(self) -> None: - self.calls = 0 - - def generate(self, prompt_name: str, user_input: str, *, log_context: str | None = None) -> str: - self.calls += 1 - return "should not be called" - - -class _FakeDialogs: - def get(self, dialog_session_id: str): - return None - - -def test_direct_service_skips_llm_when_evidence_is_insufficient() -> None: - messages: list[tuple[str, str, str, str | None]] = [] - llm = _FakeLlm() - task_store = TaskStore() - service = CodeExplainChatService( - retriever=_FakeRetriever(), - llm=llm, - session_resolver=ChatSessionResolver(_FakeDialogs(), lambda rag_session_id: rag_session_id == "rag-1"), - task_store=task_store, - message_sink=lambda dialog_session_id, role, content, task_id=None: messages.append((dialog_session_id, role, content, task_id)), - ) - - result = asyncio.run( - service.handle_message( - ChatMessageRequest( - session_id="dialog-1", - project_id="rag-1", - message="Explain get_user", - files=[ChatFileContext(path="app/api/users.py", content="", content_hash="x")], - ) - ) - ) - - task = task_store.get(result.task_id) - assert task is not None - assert task.answer is not None - assert "Недостаточно опоры в коде" in task.answer - assert result.status == "done" - assert llm.calls == 0 - assert [item[1] for item in messages] == ["user", "assistant"]