Подчистил архитектуру приложения v1 работает
This commit is contained in:
Binary file not shown.
+1
-1
@@ -27,7 +27,7 @@ def create_app() -> FastAPI:
|
|||||||
allow_headers=["*"],
|
allow_headers=["*"],
|
||||||
)
|
)
|
||||||
|
|
||||||
app.include_router(modules.agent_api.public_router())
|
app.include_router(modules.api.public_router())
|
||||||
app.include_router(modules.rag.public_router())
|
app.include_router(modules.rag.public_router())
|
||||||
app.include_router(modules.rag.internal_router())
|
app.include_router(modules.rag.internal_router())
|
||||||
app.include_router(modules.rag_repo.internal_router())
|
app.include_router(modules.rag_repo.internal_router())
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from app.modules.agent.observability.module_trace import ModuleTrace
|
||||||
from app.modules.agent.llm.prompt_loader import PromptLoader
|
from app.modules.agent.llm.prompt_loader import PromptLoader
|
||||||
from app.modules.shared.gigachat.client import GigaChatClient
|
from app.modules.shared.gigachat.client import GigaChatClient
|
||||||
|
|
||||||
@@ -18,10 +19,26 @@ class AgentLlmService:
|
|||||||
self._client = client
|
self._client = client
|
||||||
self._prompts = prompts
|
self._prompts = prompts
|
||||||
|
|
||||||
def generate(self, prompt_name: str, user_input: str, *, log_context: str | None = None) -> str:
|
def build_request(self, prompt_name: str, user_input: str, *, log_context: str | None = None) -> dict[str, str]:
|
||||||
system_prompt = self._prompts.load(prompt_name)
|
system_prompt = self._prompts.load(prompt_name) or "You are a helpful assistant."
|
||||||
if not system_prompt:
|
return {
|
||||||
system_prompt = "You are a helpful assistant."
|
"prompt_name": prompt_name,
|
||||||
|
"system_prompt": system_prompt,
|
||||||
|
"user_prompt": user_input,
|
||||||
|
"log_context": log_context or "",
|
||||||
|
}
|
||||||
|
|
||||||
|
def generate(
|
||||||
|
self,
|
||||||
|
prompt_name: str,
|
||||||
|
user_input: str,
|
||||||
|
*,
|
||||||
|
log_context: str | None = None,
|
||||||
|
trace: ModuleTrace | None = None,
|
||||||
|
) -> str:
|
||||||
|
request = self.build_request(prompt_name, user_input, log_context=log_context)
|
||||||
|
if trace is not None:
|
||||||
|
trace.log("request", request)
|
||||||
if log_context:
|
if log_context:
|
||||||
LOGGER.warning(
|
LOGGER.warning(
|
||||||
"graph llm input: context=%s prompt=%s user_input=%s",
|
"graph llm input: context=%s prompt=%s user_input=%s",
|
||||||
@@ -29,7 +46,12 @@ class AgentLlmService:
|
|||||||
prompt_name,
|
prompt_name,
|
||||||
_truncate_for_log(user_input),
|
_truncate_for_log(user_input),
|
||||||
)
|
)
|
||||||
output = self._client.complete(system_prompt=system_prompt, user_prompt=user_input)
|
output = self._client.complete(
|
||||||
|
system_prompt=request["system_prompt"],
|
||||||
|
user_prompt=request["user_prompt"],
|
||||||
|
)
|
||||||
|
if trace is not None:
|
||||||
|
trace.log("response", {"text": output})
|
||||||
if log_context:
|
if log_context:
|
||||||
LOGGER.warning(
|
LOGGER.warning(
|
||||||
"graph llm output: context=%s prompt=%s output=%s",
|
"graph llm output: context=%s prompt=%s output=%s",
|
||||||
|
|||||||
@@ -0,0 +1,27 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class ModuleTraceLogger(Protocol):
|
||||||
|
def log_module(self, request_id: str, module: str, title: str, payload: dict | None = None) -> None: ...
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class ModuleTrace:
|
||||||
|
request_id: str
|
||||||
|
module: str
|
||||||
|
logger: ModuleTraceLogger
|
||||||
|
|
||||||
|
def log(self, title: str, payload: dict | None = None) -> None:
|
||||||
|
self.logger.log_module(self.request_id, self.module, title, payload or {})
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class RequestTraceContext:
|
||||||
|
request_id: str
|
||||||
|
logger: ModuleTraceLogger
|
||||||
|
|
||||||
|
def module(self, name: str) -> ModuleTrace:
|
||||||
|
return ModuleTrace(request_id=self.request_id, module=name, logger=self.logger)
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
__all__: list[str] = []
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.modules.agent.observability.module_trace import ModuleTrace
|
||||||
|
from app.modules.agent.intent_router_v2 import IntentRouterV2
|
||||||
|
|
||||||
|
|
||||||
|
class IntentRouterAdapter:
|
||||||
|
def __init__(self, router: IntentRouterV2) -> None:
|
||||||
|
self._router = router
|
||||||
|
|
||||||
|
def route(self, user_query: str, conversation_state, repo_context, trace: ModuleTrace | None = None):
|
||||||
|
if trace is not None:
|
||||||
|
trace.log("started", {"question": user_query})
|
||||||
|
result = self._router.route(user_query, conversation_state, repo_context)
|
||||||
|
if trace is not None:
|
||||||
|
trace.log(
|
||||||
|
"completed",
|
||||||
|
{
|
||||||
|
"intent": result.intent,
|
||||||
|
"sub_intent": result.query_plan.sub_intent,
|
||||||
|
"matched_intent_source": result.matched_intent_source,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return result
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from app.modules.agent.observability.module_trace import ModuleTrace
|
||||||
|
from app.modules.agent.llm.service import AgentLlmService
|
||||||
|
|
||||||
|
|
||||||
|
class LlmChatAdapter:
|
||||||
|
def __init__(self, llm: AgentLlmService, prompt_name: str = "simple_llm_answer") -> None:
|
||||||
|
self._llm = llm
|
||||||
|
self._prompt_name = prompt_name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def prompt_name(self) -> str:
|
||||||
|
return self._prompt_name
|
||||||
|
|
||||||
|
def build_request(self, message: str, request_id: str) -> dict[str, str]:
|
||||||
|
return self._llm.build_request(
|
||||||
|
self._prompt_name,
|
||||||
|
message,
|
||||||
|
log_context=f"agent:{request_id}",
|
||||||
|
)
|
||||||
|
|
||||||
|
async def generate(self, message: str, request_id: str, trace: ModuleTrace | None = None) -> str:
|
||||||
|
return await asyncio.to_thread(
|
||||||
|
self._llm.generate,
|
||||||
|
self._prompt_name,
|
||||||
|
message,
|
||||||
|
log_context=f"agent:{request_id}",
|
||||||
|
trace=trace,
|
||||||
|
)
|
||||||
+2
-2
@@ -4,8 +4,8 @@ from types import SimpleNamespace
|
|||||||
|
|
||||||
from app.core.exceptions import AppError
|
from app.core.exceptions import AppError
|
||||||
from app.modules.agent.task_runtime.facade import AgentTaskRuntimeFacade
|
from app.modules.agent.task_runtime.facade import AgentTaskRuntimeFacade
|
||||||
from app.modules.agent_api.domain.models.agent_session import AgentSession
|
from app.modules.api.domain.models.agent_session import AgentSession
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
from app.schemas.common import ModuleName
|
from app.schemas.common import ModuleName
|
||||||
|
|
||||||
|
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from app.modules.api.domain.models.agent_request import AgentRequest
|
||||||
|
from app.modules.api.domain.models.agent_session import AgentSession
|
||||||
|
from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
||||||
|
from app.modules.agent.observability.module_trace import RequestTraceContext
|
||||||
|
from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class ExecutionContext:
|
||||||
|
request: AgentRequest
|
||||||
|
session: AgentSession
|
||||||
|
publisher: ClientMessagePublisher
|
||||||
|
trace_logger: RequestTraceLogger
|
||||||
|
trace: RequestTraceContext
|
||||||
|
task_context: Any = None
|
||||||
|
route_result: Any = None
|
||||||
|
workflow_result: Any = None
|
||||||
+11
-9
@@ -3,14 +3,15 @@ from __future__ import annotations
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
from app.core.exceptions import AppError
|
||||||
from app.modules.agent_api.domain.models.agent_request import AgentRequest
|
from app.modules.api.domain.models.agent_request import AgentRequest
|
||||||
from app.modules.agent_api.domain.models.agent_session import AgentSession
|
from app.modules.api.domain.models.agent_session import AgentSession
|
||||||
from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
||||||
from app.modules.agent_api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
|
from app.modules.agent.observability.module_trace import RequestTraceContext
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
|
||||||
from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
from app.modules.orchestration.processes.registry import ProcessRegistry
|
from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
||||||
from app.modules.orchestration.runtime.process_runner import ProcessRunner
|
from app.modules.agent.orchestration.processes.registry import ProcessRegistry
|
||||||
|
from app.modules.agent.orchestration.runtime.process_runner import ProcessRunner
|
||||||
from app.schemas.common import ErrorPayload, ModuleName
|
from app.schemas.common import ErrorPayload, ModuleName
|
||||||
from app.schemas.orchestration import RequestExecutionStatus
|
from app.schemas.orchestration import RequestExecutionStatus
|
||||||
|
|
||||||
@@ -43,6 +44,7 @@ class OrchestrationFacade:
|
|||||||
session=session,
|
session=session,
|
||||||
publisher=self._publisher,
|
publisher=self._publisher,
|
||||||
trace_logger=self._trace_logger,
|
trace_logger=self._trace_logger,
|
||||||
|
trace=RequestTraceContext(request_id=request.request_id, logger=self._trace_logger),
|
||||||
)
|
)
|
||||||
await self._process_runner.run(context, process.steps())
|
await self._process_runner.run(context, process.steps())
|
||||||
request.status = RequestExecutionStatus.DONE
|
request.status = RequestExecutionStatus.DONE
|
||||||
@@ -56,7 +58,7 @@ class OrchestrationFacade:
|
|||||||
request.error = ErrorPayload(code=exc.code, desc=exc.desc, module=exc.module)
|
request.error = ErrorPayload(code=exc.code, desc=exc.desc, module=exc.module)
|
||||||
else:
|
else:
|
||||||
request.error = ErrorPayload(
|
request.error = ErrorPayload(
|
||||||
code="agent_api_runtime_error",
|
code="api_runtime_error",
|
||||||
desc="Agent request failed unexpectedly.",
|
desc="Agent request failed unexpectedly.",
|
||||||
module=ModuleName.AGENT,
|
module=ModuleName.AGENT,
|
||||||
)
|
)
|
||||||
+4
-4
@@ -1,9 +1,9 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
||||||
from app.modules.agent_api.infrastructure.streaming.sse_event_channel import SseEventChannel
|
from app.modules.api.infrastructure.streaming.sse_event_channel import SseEventChannel
|
||||||
from app.modules.orchestration.messaging.status_message_factory import StatusMessageFactory
|
from app.modules.agent.orchestration.messaging.status_message_factory import StatusMessageFactory
|
||||||
from app.modules.orchestration.messaging.user_message_factory import UserMessageFactory
|
from app.modules.agent.orchestration.messaging.user_message_factory import UserMessageFactory
|
||||||
|
|
||||||
|
|
||||||
class ClientMessagePublisher:
|
class ClientMessagePublisher:
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.agent_api.domain.events.client_event import ClientEventRecord
|
from app.modules.api.domain.events.client_event import ClientEventRecord
|
||||||
from app.schemas.client_events import ClientEventType
|
from app.schemas.client_events import ClientEventType
|
||||||
|
|
||||||
|
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.agent_api.domain.events.client_event import ClientEventRecord
|
from app.modules.api.domain.events.client_event import ClientEventRecord
|
||||||
from app.schemas.client_events import ClientEventType
|
from app.schemas.client_events import ClientEventType
|
||||||
|
|
||||||
|
|
||||||
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.processes.v1.process import V1Process
|
from app.modules.agent.orchestration.processes.v1.process import V1Process
|
||||||
from app.modules.orchestration.processes.v2.process import V2Process
|
from app.modules.agent.orchestration.processes.v2.process import V2Process
|
||||||
|
|
||||||
|
|
||||||
class ProcessRegistry:
|
class ProcessRegistry:
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
|
class V1PromptPayloadBuilder:
|
||||||
|
def build(self, context: ExecutionContext) -> str:
|
||||||
|
payload = {"question": context.request.message}
|
||||||
|
return json.dumps(payload, ensure_ascii=False, indent=2)
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
prompts:
|
||||||
|
simple_llm_answer: |
|
||||||
|
Ты полезный AI-ассистент проекта.
|
||||||
|
|
||||||
|
На вход приходит JSON с полем:
|
||||||
|
- question
|
||||||
|
|
||||||
|
Правила:
|
||||||
|
- Отвечай как персонаж мемов из дагестана
|
||||||
|
- Если вопрос неясный, аккуратно укажи, чего не хватает
|
||||||
|
- Не выдумывай несуществующие факты о проекте
|
||||||
|
- Формулируй ответ как обычное сообщение пользователю
|
||||||
@@ -0,0 +1,42 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.modules.agent.orchestration.adapters.llm_chat_adapter import LlmChatAdapter
|
||||||
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
from app.modules.agent.orchestration.processes.v1.prompt_payload_builder import V1PromptPayloadBuilder
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleLlmWorkflow:
|
||||||
|
workflow_id = "simple_llm"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
llm: LlmChatAdapter,
|
||||||
|
prompt_payload_builder: V1PromptPayloadBuilder | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._llm = llm
|
||||||
|
self._payload_builder = prompt_payload_builder or V1PromptPayloadBuilder()
|
||||||
|
|
||||||
|
async def run(self, context: ExecutionContext) -> dict:
|
||||||
|
workflow_trace = context.trace.module("task_workflow")
|
||||||
|
workflow_trace.log("started", {"workflow_id": self.workflow_id})
|
||||||
|
prompt_payload = self._payload_builder.build(context)
|
||||||
|
answer = await self._llm.generate(
|
||||||
|
prompt_payload,
|
||||||
|
context.request.request_id,
|
||||||
|
trace=context.trace.module("llm"),
|
||||||
|
)
|
||||||
|
result = {
|
||||||
|
"workflow_id": self.workflow_id,
|
||||||
|
"prompt_name": self._llm.prompt_name,
|
||||||
|
"prompt_payload": prompt_payload,
|
||||||
|
"answer": answer,
|
||||||
|
}
|
||||||
|
workflow_trace.log(
|
||||||
|
"completed",
|
||||||
|
{
|
||||||
|
"workflow_id": self.workflow_id,
|
||||||
|
"prompt_name": self._llm.prompt_name,
|
||||||
|
"answer_length": len(answer),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return result
|
||||||
+6
-3
@@ -1,11 +1,14 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
class BootstrapStep:
|
class BootstrapStep:
|
||||||
async def run(self, context: ExecutionContext) -> None:
|
async def run(self, context: ExecutionContext) -> None:
|
||||||
context.trace_logger.log_step(context.request.request_id, "bootstrap", "started")
|
context.trace.module("orchestrator").log(
|
||||||
|
"bootstrap",
|
||||||
|
{"status": "started", "process_version": context.request.process_version},
|
||||||
|
)
|
||||||
await context.publisher.publish_status(
|
await context.publisher.publish_status(
|
||||||
context.request.request_id,
|
context.request.request_id,
|
||||||
"orchestrator",
|
"orchestrator",
|
||||||
@@ -17,4 +20,4 @@ class BootstrapStep:
|
|||||||
"Запускаю процесс обработки v1.",
|
"Запускаю процесс обработки v1.",
|
||||||
{"process_version": context.request.process_version},
|
{"process_version": context.request.process_version},
|
||||||
)
|
)
|
||||||
context.trace_logger.log_step(context.request.request_id, "bootstrap", "completed")
|
context.trace.module("orchestrator").log("bootstrap", {"status": "completed"})
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
from app.modules.agent.orchestration.processes.v1.simple_llm_workflow import SimpleLlmWorkflow
|
||||||
|
|
||||||
|
|
||||||
|
class ExecuteLlmWorkflowStep:
|
||||||
|
def __init__(self, workflow: SimpleLlmWorkflow) -> None:
|
||||||
|
self._workflow = workflow
|
||||||
|
|
||||||
|
async def run(self, context: ExecutionContext) -> None:
|
||||||
|
request = context.request
|
||||||
|
await context.publisher.publish_status(
|
||||||
|
request.request_id,
|
||||||
|
"task_workflow",
|
||||||
|
f"Запускаю workflow {self._workflow.workflow_id}.",
|
||||||
|
)
|
||||||
|
await context.publisher.publish_status(
|
||||||
|
request.request_id,
|
||||||
|
"prompt_builder",
|
||||||
|
"Формирую prompt payload для LLM.",
|
||||||
|
)
|
||||||
|
result = await self._workflow.run(context)
|
||||||
|
request.answer = str(result.get("answer") or "")
|
||||||
|
context.workflow_result = result
|
||||||
|
await context.publisher.publish_status(
|
||||||
|
request.request_id,
|
||||||
|
"llm_process",
|
||||||
|
"Ответ от LLM получен.",
|
||||||
|
{
|
||||||
|
"workflow_id": result.get("workflow_id"),
|
||||||
|
"prompt_name": result.get("prompt_name"),
|
||||||
|
"answer_length": len(request.answer),
|
||||||
|
},
|
||||||
|
)
|
||||||
+3
-3
@@ -1,12 +1,12 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
class FinalizeStep:
|
class FinalizeStep:
|
||||||
async def run(self, context: ExecutionContext) -> None:
|
async def run(self, context: ExecutionContext) -> None:
|
||||||
request = context.request
|
request = context.request
|
||||||
context.trace_logger.log_step(request.request_id, "finalize", "started")
|
context.trace.module("orchestrator").log("finalize", {"status": "started"})
|
||||||
await context.publisher.publish_user(
|
await context.publisher.publish_user(
|
||||||
request.request_id,
|
request.request_id,
|
||||||
"agent",
|
"agent",
|
||||||
@@ -17,4 +17,4 @@ class FinalizeStep:
|
|||||||
"orchestrator",
|
"orchestrator",
|
||||||
"Обработка запроса завершена.",
|
"Обработка запроса завершена.",
|
||||||
)
|
)
|
||||||
context.trace_logger.log_step(request.request_id, "finalize", "completed")
|
context.trace.module("orchestrator").log("finalize", {"status": "completed"})
|
||||||
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.adapters.llm_chat_adapter import LlmChatAdapter
|
from app.modules.agent.orchestration.adapters.llm_chat_adapter import LlmChatAdapter
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
class RunLlmStep:
|
class RunLlmStep:
|
||||||
+1
-1
@@ -163,7 +163,7 @@ Typical sequence:
|
|||||||
## Trace Logging
|
## Trace Logging
|
||||||
|
|
||||||
Per-request trace files are written by:
|
Per-request trace files are written by:
|
||||||
[request_trace_logger.py](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/src/app/modules/agent_api/infrastructure/logging/request_trace_logger.py)
|
[request_trace_logger.py](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/src/app/modules/api/infrastructure/logging/request_trace_logger.py)
|
||||||
|
|
||||||
Location:
|
Location:
|
||||||
[runtime_traces/agent_requests](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/runtime_traces/agent_requests)
|
[runtime_traces/agent_requests](/Users/alex/Dev_projects_v2/ai driven app process/v2/agent/runtime_traces/agent_requests)
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
from app.modules.agent.runtime.docs_qa_pipeline.models import DocsEvidenceBundle, OpenAPIResult
|
||||||
|
|
||||||
|
|
||||||
|
class V2PromptPayloadBuilder:
|
||||||
|
def build(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
question: str,
|
||||||
|
intent: str,
|
||||||
|
sub_intent: str,
|
||||||
|
evidence_bundle: DocsEvidenceBundle,
|
||||||
|
api_contract: OpenAPIResult | None = None,
|
||||||
|
) -> str:
|
||||||
|
payload = {
|
||||||
|
"question": question,
|
||||||
|
"documents": list(evidence_bundle.documents),
|
||||||
|
"facts": list(evidence_bundle.facts),
|
||||||
|
"entities": list(evidence_bundle.entities),
|
||||||
|
"workflows": list(evidence_bundle.workflows),
|
||||||
|
"relations": list(evidence_bundle.relations),
|
||||||
|
"chunks": list(evidence_bundle.chunks),
|
||||||
|
}
|
||||||
|
if api_contract is not None:
|
||||||
|
payload["api_contract"] = {
|
||||||
|
"path": api_contract.path,
|
||||||
|
"method": api_contract.method,
|
||||||
|
"request_schema": api_contract.request_schema,
|
||||||
|
"response_schema": api_contract.response_schema,
|
||||||
|
"diagnostics": dict(api_contract.diagnostics),
|
||||||
|
}
|
||||||
|
return json.dumps(payload, ensure_ascii=False, indent=2)
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
"""Выбор prompt для process-local workflow v2."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
|
||||||
|
class V2PromptSelector:
|
||||||
|
_DOCS_INTENT_PROMPTS = {
|
||||||
|
"DOCUMENTATION_EXPLAIN": "documentation_explain_answer",
|
||||||
|
"GENERAL_QA": "general_qa_answer",
|
||||||
|
}
|
||||||
|
|
||||||
|
def select(self, *, intent: str = "DOCUMENTATION_EXPLAIN", sub_intent: str, answer_mode: str) -> str:
|
||||||
|
intent_key = (intent or "DOCUMENTATION_EXPLAIN").upper()
|
||||||
|
if intent_key in {"OPENAPI_GENERATION", "OPENAPI_FROM_DOCUMENTATION"}:
|
||||||
|
if sub_intent.upper() == "OPENAPI_FRAGMENT_GENERATE":
|
||||||
|
return "openapi_fragment_answer"
|
||||||
|
return "openapi_answer"
|
||||||
|
return self._DOCS_INTENT_PROMPTS.get(intent_key, "documentation_explain_answer")
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
prompts:
|
||||||
|
documentation_explain_answer: |
|
||||||
|
Ты объясняешь документацию системы.
|
||||||
|
|
||||||
|
На вход приходит JSON с полями:
|
||||||
|
- question
|
||||||
|
- documents
|
||||||
|
- facts
|
||||||
|
- entities
|
||||||
|
- workflows
|
||||||
|
- relations
|
||||||
|
- chunks
|
||||||
|
|
||||||
|
Правила:
|
||||||
|
- Используй только предоставленные факты
|
||||||
|
- Не додумывай
|
||||||
|
- Если данных недостаточно, скажи это явно
|
||||||
|
- Объясняй структурировано
|
||||||
|
|
||||||
|
Формат ответа:
|
||||||
|
1. Краткое описание
|
||||||
|
2. Основные элементы
|
||||||
|
3. Как это работает
|
||||||
|
4. Связи с другими частями системы (если есть)
|
||||||
|
general_qa_answer: |
|
||||||
|
Ты отвечаешь на общий вопрос по документации проекта.
|
||||||
|
|
||||||
|
На вход приходит JSON с полями:
|
||||||
|
- question
|
||||||
|
- documents
|
||||||
|
- facts
|
||||||
|
- entities
|
||||||
|
- workflows
|
||||||
|
- relations
|
||||||
|
- chunks
|
||||||
|
|
||||||
|
Правила:
|
||||||
|
- Используй только предоставленные документы и факты
|
||||||
|
- Не додумывай отсутствующие детали
|
||||||
|
- Если данных недостаточно, скажи это прямо
|
||||||
|
- Дай короткий понятный ответ без лишней структуры
|
||||||
|
openapi_answer: |
|
||||||
|
Ты генерируешь OpenAPI спецификацию по документации API.
|
||||||
|
|
||||||
|
На вход приходит JSON с полями:
|
||||||
|
- question
|
||||||
|
- documents
|
||||||
|
- facts
|
||||||
|
- entities
|
||||||
|
- workflows
|
||||||
|
- relations
|
||||||
|
- chunks
|
||||||
|
- api_contract
|
||||||
|
|
||||||
|
Правила:
|
||||||
|
- Используй только данные из документации
|
||||||
|
- Не придумывай поля
|
||||||
|
- Если данных нет, не заполняй
|
||||||
|
- Верни ТОЛЬКО YAML без пояснений
|
||||||
|
|
||||||
|
Формат:
|
||||||
|
paths:
|
||||||
|
/path:
|
||||||
|
method:
|
||||||
|
summary: ...
|
||||||
|
requestBody:
|
||||||
|
responses:
|
||||||
|
openapi_fragment_answer: |
|
||||||
|
Ты генерируешь часть OpenAPI schema по документации API.
|
||||||
|
|
||||||
|
На вход приходит JSON с полями:
|
||||||
|
- question
|
||||||
|
- documents
|
||||||
|
- facts
|
||||||
|
- entities
|
||||||
|
- workflows
|
||||||
|
- relations
|
||||||
|
- chunks
|
||||||
|
- api_contract
|
||||||
|
|
||||||
|
Правила:
|
||||||
|
- Используй только данные из документации
|
||||||
|
- Не придумывай отсутствующие поля
|
||||||
|
- Верни только содержимое нужного фрагмента
|
||||||
|
fallback_answer: |
|
||||||
|
Ты формируешь безопасный fallback-ответ.
|
||||||
|
|
||||||
|
На вход приходит JSON с полями:
|
||||||
|
- question
|
||||||
|
- attachments
|
||||||
|
- confluence_urls
|
||||||
|
|
||||||
|
Правила:
|
||||||
|
- Если специализированный workflow не выбран, честно скажи об ограничении
|
||||||
|
- Используй только данные из payload
|
||||||
|
- Не выдумывай детали проекта
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
||||||
|
|
||||||
|
|
||||||
class ExecuteDocumentationWorkflowStep(WorkflowStepBase):
|
class ExecuteDocumentationWorkflowStep(WorkflowStepBase):
|
||||||
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
||||||
|
|
||||||
|
|
||||||
class ExecuteFallbackWorkflowStep(WorkflowStepBase):
|
class ExecuteFallbackWorkflowStep(WorkflowStepBase):
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
||||||
|
|
||||||
|
|
||||||
class ExecuteGeneralQaWorkflowStep(WorkflowStepBase):
|
class ExecuteGeneralQaWorkflowStep(WorkflowStepBase):
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
from app.modules.agent.orchestration.processes.v2.steps.workflow_step_base import WorkflowStepBase
|
||||||
|
|
||||||
|
|
||||||
class ExecuteOpenApiWorkflowStep(WorkflowStepBase):
|
class ExecuteOpenApiWorkflowStep(WorkflowStepBase):
|
||||||
+5
-14
@@ -5,9 +5,9 @@ import asyncio
|
|||||||
from app.core.exceptions import AppError
|
from app.core.exceptions import AppError
|
||||||
from app.modules.agent.task_runtime.context import TaskRuntimeContextBuilder
|
from app.modules.agent.task_runtime.context import TaskRuntimeContextBuilder
|
||||||
from app.modules.agent.task_runtime.enrichment import ContextEnrichmentService
|
from app.modules.agent.task_runtime.enrichment import ContextEnrichmentService
|
||||||
from app.modules.orchestration.adapters.intent_router_adapter import IntentRouterAdapter
|
from app.modules.agent.orchestration.adapters.intent_router_adapter import IntentRouterAdapter
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
from app.modules.orchestration.v2_progress import build_progress_callback
|
from app.modules.agent.orchestration.v2_progress import build_progress_callback
|
||||||
from app.schemas.common import ModuleName
|
from app.schemas.common import ModuleName
|
||||||
|
|
||||||
|
|
||||||
@@ -41,10 +41,10 @@ class RouteIntentStep:
|
|||||||
attachments=[],
|
attachments=[],
|
||||||
files=[],
|
files=[],
|
||||||
progress_cb=build_progress_callback(loop, context.publisher, request.request_id),
|
progress_cb=build_progress_callback(loop, context.publisher, request.request_id),
|
||||||
|
trace=context.trace,
|
||||||
)
|
)
|
||||||
task_context.enriched_context = self._enrichment.enrich(task_context)
|
task_context.enriched_context = self._enrichment.enrich(task_context)
|
||||||
context.task_context = task_context
|
context.task_context = task_context
|
||||||
context.trace_logger.log_step(request.request_id, "intent_router", "started")
|
|
||||||
await context.publisher.publish_status(
|
await context.publisher.publish_status(
|
||||||
request.request_id,
|
request.request_id,
|
||||||
"intent_router",
|
"intent_router",
|
||||||
@@ -55,6 +55,7 @@ class RouteIntentStep:
|
|||||||
request.message,
|
request.message,
|
||||||
task_context.conversation_state,
|
task_context.conversation_state,
|
||||||
task_context.repo_context,
|
task_context.repo_context,
|
||||||
|
context.trace.module("intent_router"),
|
||||||
)
|
)
|
||||||
task_context.route_result = route_result
|
task_context.route_result = route_result
|
||||||
context.route_result = route_result
|
context.route_result = route_result
|
||||||
@@ -68,13 +69,3 @@ class RouteIntentStep:
|
|||||||
"matched_intent_source": route_result.matched_intent_source,
|
"matched_intent_source": route_result.matched_intent_source,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
context.trace_logger.log_step(
|
|
||||||
request.request_id,
|
|
||||||
"intent_router",
|
|
||||||
"completed",
|
|
||||||
{
|
|
||||||
"intent": route_result.intent,
|
|
||||||
"sub_intent": route_result.query_plan.sub_intent,
|
|
||||||
"matched_intent_source": route_result.matched_intent_source,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.adapters.task_runtime_adapter import TaskRuntimeAdapter
|
from app.modules.agent.orchestration.adapters.task_runtime_adapter import TaskRuntimeAdapter
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
class RunTaskWorkflowStep:
|
class RunTaskWorkflowStep:
|
||||||
+1
-8
@@ -3,7 +3,7 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
class WorkflowStepBase:
|
class WorkflowStepBase:
|
||||||
@@ -20,7 +20,6 @@ class WorkflowStepBase:
|
|||||||
async def run(self, context: ExecutionContext) -> None:
|
async def run(self, context: ExecutionContext) -> None:
|
||||||
request = context.request
|
request = context.request
|
||||||
task_context = context.task_context
|
task_context = context.task_context
|
||||||
context.trace_logger.log_step(request.request_id, self._step_name, "started")
|
|
||||||
await context.publisher.publish_status(
|
await context.publisher.publish_status(
|
||||||
request.request_id,
|
request.request_id,
|
||||||
"task_workflow",
|
"task_workflow",
|
||||||
@@ -32,12 +31,6 @@ class WorkflowStepBase:
|
|||||||
request.answer = result.answer or ""
|
request.answer = result.answer or ""
|
||||||
diagnostics = dict(result.meta.get("diagnostics") or {})
|
diagnostics = dict(result.meta.get("diagnostics") or {})
|
||||||
await self._publish_diagnostics(context, diagnostics, result)
|
await self._publish_diagnostics(context, diagnostics, result)
|
||||||
context.trace_logger.log_step(
|
|
||||||
request.request_id,
|
|
||||||
self._step_name,
|
|
||||||
"completed",
|
|
||||||
{"workflow_id": self._workflow.workflow_id, "meta": result.meta},
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _publish_diagnostics(self, context: ExecutionContext, diagnostics: dict[str, Any], result: Any) -> None:
|
async def _publish_diagnostics(self, context: ExecutionContext, diagnostics: dict[str, Any], result: Any) -> None:
|
||||||
request_id = context.request.request_id
|
request_id = context.request.request_id
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.modules.orchestration.context.execution_context import ExecutionContext
|
from app.modules.agent.orchestration.context.execution_context import ExecutionContext
|
||||||
|
|
||||||
|
|
||||||
class ProcessRunner:
|
class ProcessRunner:
|
||||||
+1
-1
@@ -2,7 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
||||||
|
|
||||||
|
|
||||||
def build_progress_callback(loop: asyncio.AbstractEventLoop, publisher: ClientMessagePublisher, request_id: str):
|
def build_progress_callback(loop: asyncio.AbstractEventLoop, publisher: ClientMessagePublisher, request_id: str):
|
||||||
@@ -1,11 +1,9 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import math
|
|
||||||
from time import perf_counter
|
from time import perf_counter
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from app.modules.agent.llm import AgentLlmService
|
from app.modules.agent.llm import AgentLlmService
|
||||||
from app.modules.agent.llm.prompt_loader import PromptLoader
|
|
||||||
from app.modules.agent.intent_router_v2.analysis.docs_query_signals import DocsQuerySignals
|
from app.modules.agent.intent_router_v2.analysis.docs_query_signals import DocsQuerySignals
|
||||||
from app.modules.agent.runtime.docs_qa_pipeline.answer_synthesizer import DocsAnswerSynthesizer
|
from app.modules.agent.runtime.docs_qa_pipeline.answer_synthesizer import DocsAnswerSynthesizer
|
||||||
from app.modules.agent.runtime.docs_qa_pipeline.anchor_selector import DocsAnchorSelector
|
from app.modules.agent.runtime.docs_qa_pipeline.anchor_selector import DocsAnchorSelector
|
||||||
@@ -19,6 +17,7 @@ from app.modules.agent.runtime.docs_qa_pipeline.prompt_payload_builder import Do
|
|||||||
from app.modules.agent.runtime.legacy_pipeline import RetrievalAdapter
|
from app.modules.agent.runtime.legacy_pipeline import RetrievalAdapter
|
||||||
from app.modules.agent.runtime.steps.context import build_retrieval_request
|
from app.modules.agent.runtime.steps.context import build_retrieval_request
|
||||||
from app.modules.agent.runtime.steps.generation import RuntimePromptSelector
|
from app.modules.agent.runtime.steps.generation import RuntimePromptSelector
|
||||||
|
from app.modules.agent.observability.module_trace import RequestTraceContext
|
||||||
|
|
||||||
|
|
||||||
class DocsQAPipelineRunner:
|
class DocsQAPipelineRunner:
|
||||||
@@ -60,6 +59,7 @@ class DocsQAPipelineRunner:
|
|||||||
*,
|
*,
|
||||||
conversation_state: Any = None,
|
conversation_state: Any = None,
|
||||||
mode: str = "full",
|
mode: str = "full",
|
||||||
|
trace: RequestTraceContext | None = None,
|
||||||
) -> DocsQAPipelineResult:
|
) -> DocsQAPipelineResult:
|
||||||
timings: dict[str, int] = {}
|
timings: dict[str, int] = {}
|
||||||
t0 = perf_counter()
|
t0 = perf_counter()
|
||||||
@@ -88,6 +88,16 @@ class DocsQAPipelineRunner:
|
|||||||
)
|
)
|
||||||
if request.sub_intent == "RELATED_DOCS_EXPLAIN" and not raw_rows and self._has_relation_hits(unfiltered_rows):
|
if request.sub_intent == "RELATED_DOCS_EXPLAIN" and not raw_rows and self._has_relation_hits(unfiltered_rows):
|
||||||
raw_rows = unfiltered_rows
|
raw_rows = unfiltered_rows
|
||||||
|
if trace is not None:
|
||||||
|
trace.module("rag_retrieval").log(
|
||||||
|
"completed",
|
||||||
|
{
|
||||||
|
"planned_layers": list(request.requested_layers),
|
||||||
|
"executed_layers": list(retrieval_report.get("executed_layers") or request.requested_layers),
|
||||||
|
"layer_diagnostics": dict(retrieval_report.get("layer_diagnostics") or {}),
|
||||||
|
"rows": len(raw_rows),
|
||||||
|
},
|
||||||
|
)
|
||||||
timings["retrieval"] = _ms(t1)
|
timings["retrieval"] = _ms(t1)
|
||||||
|
|
||||||
t2 = perf_counter()
|
t2 = perf_counter()
|
||||||
@@ -140,7 +150,14 @@ class DocsQAPipelineRunner:
|
|||||||
)
|
)
|
||||||
answer = openapi_result.raw_yaml if answer_mode != "degraded" else "Недостаточно contract evidence для OpenAPI."
|
answer = openapi_result.raw_yaml if answer_mode != "degraded" else "Недостаточно contract evidence для OpenAPI."
|
||||||
else:
|
else:
|
||||||
answer = self._generate_openapi_answer(user_query, router_result.intent, request.sub_intent, evidence_bundle, openapi_result)
|
answer = self._generate_openapi_answer(
|
||||||
|
user_query,
|
||||||
|
router_result.intent,
|
||||||
|
request.sub_intent,
|
||||||
|
evidence_bundle,
|
||||||
|
openapi_result,
|
||||||
|
trace=trace,
|
||||||
|
)
|
||||||
output_valid, llm_details = self._openapi_postprocessor.validate(
|
output_valid, llm_details = self._openapi_postprocessor.validate(
|
||||||
answer,
|
answer,
|
||||||
require_paths=request.sub_intent != "OPENAPI_FRAGMENT_GENERATE",
|
require_paths=request.sub_intent != "OPENAPI_FRAGMENT_GENERATE",
|
||||||
@@ -176,7 +193,13 @@ class DocsQAPipelineRunner:
|
|||||||
)
|
)
|
||||||
output_valid = answer_mode != "degraded"
|
output_valid = answer_mode != "degraded"
|
||||||
else:
|
else:
|
||||||
answer = self._generate_docs_answer(user_query, router_result.intent, request.sub_intent, evidence_bundle)
|
answer = self._generate_docs_answer(
|
||||||
|
user_query,
|
||||||
|
router_result.intent,
|
||||||
|
request.sub_intent,
|
||||||
|
evidence_bundle,
|
||||||
|
trace=trace,
|
||||||
|
)
|
||||||
answer_mode, degraded_reason, answer = self._finalize_docs_answer(
|
answer_mode, degraded_reason, answer = self._finalize_docs_answer(
|
||||||
answer=answer,
|
answer=answer,
|
||||||
raw_rows=raw_rows,
|
raw_rows=raw_rows,
|
||||||
@@ -197,6 +220,16 @@ class DocsQAPipelineRunner:
|
|||||||
openapi_result=openapi_result,
|
openapi_result=openapi_result,
|
||||||
router_result=router_result,
|
router_result=router_result,
|
||||||
)
|
)
|
||||||
|
if trace is not None:
|
||||||
|
trace.module("evidence_gate").log(
|
||||||
|
"evaluated",
|
||||||
|
{
|
||||||
|
"decision": gate_decision,
|
||||||
|
"reason": gate_decision_reason,
|
||||||
|
"missing": gate_missing_requirements,
|
||||||
|
"satisfied": gate_satisfied_requirements,
|
||||||
|
},
|
||||||
|
)
|
||||||
diagnostics = self._diagnostics_builder.build(
|
diagnostics = self._diagnostics_builder.build(
|
||||||
intent=router_result.intent,
|
intent=router_result.intent,
|
||||||
sub_intent=request.sub_intent,
|
sub_intent=request.sub_intent,
|
||||||
@@ -255,7 +288,7 @@ class DocsQAPipelineRunner:
|
|||||||
mode=mode,
|
mode=mode,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _generate_docs_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle) -> str:
|
def _generate_docs_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle, trace=None) -> str:
|
||||||
if self._llm is None:
|
if self._llm is None:
|
||||||
return self._answer_synthesizer.synthesize(question, evidence_bundle)
|
return self._answer_synthesizer.synthesize(question, evidence_bundle)
|
||||||
payload = self._prompt_payload_builder.build(
|
payload = self._prompt_payload_builder.build(
|
||||||
@@ -265,9 +298,14 @@ class DocsQAPipelineRunner:
|
|||||||
evidence_bundle=evidence_bundle,
|
evidence_bundle=evidence_bundle,
|
||||||
)
|
)
|
||||||
prompt_name = self._prompt_selector.select(intent=intent, sub_intent=sub_intent, answer_mode="normal")
|
prompt_name = self._prompt_selector.select(intent=intent, sub_intent=sub_intent, answer_mode="normal")
|
||||||
return self._llm.generate(prompt_name, payload, log_context="graph.project_qa.docs.answer").strip()
|
return self._llm.generate(
|
||||||
|
prompt_name,
|
||||||
|
payload,
|
||||||
|
log_context="graph.project_qa.docs.answer",
|
||||||
|
trace=trace.module("llm") if trace is not None else None,
|
||||||
|
).strip()
|
||||||
|
|
||||||
def _generate_openapi_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle, api_contract) -> str:
|
def _generate_openapi_answer(self, question: str, intent: str, sub_intent: str, evidence_bundle, api_contract, trace=None) -> str:
|
||||||
if self._llm is None:
|
if self._llm is None:
|
||||||
return api_contract.raw_yaml
|
return api_contract.raw_yaml
|
||||||
payload = self._prompt_payload_builder.build(
|
payload = self._prompt_payload_builder.build(
|
||||||
@@ -278,7 +316,12 @@ class DocsQAPipelineRunner:
|
|||||||
api_contract=api_contract,
|
api_contract=api_contract,
|
||||||
)
|
)
|
||||||
prompt_name = self._prompt_selector.select(intent=intent, sub_intent=sub_intent, answer_mode="normal")
|
prompt_name = self._prompt_selector.select(intent=intent, sub_intent=sub_intent, answer_mode="normal")
|
||||||
return self._llm.generate(prompt_name, payload, log_context="graph.project_qa.docs.openapi").strip()
|
return self._llm.generate(
|
||||||
|
prompt_name,
|
||||||
|
payload,
|
||||||
|
log_context="graph.project_qa.docs.openapi",
|
||||||
|
trace=trace.module("llm") if trace is not None else None,
|
||||||
|
).strip()
|
||||||
|
|
||||||
def _llm_mode(self, intent: str, sub_intent: str) -> str:
|
def _llm_mode(self, intent: str, sub_intent: str) -> str:
|
||||||
if sub_intent == "RELATED_DOCS_EXPLAIN":
|
if sub_intent == "RELATED_DOCS_EXPLAIN":
|
||||||
@@ -307,19 +350,14 @@ class DocsQAPipelineRunner:
|
|||||||
evidence_bundle=evidence_bundle,
|
evidence_bundle=evidence_bundle,
|
||||||
api_contract=api_contract,
|
api_contract=api_contract,
|
||||||
)
|
)
|
||||||
system_prompt = PromptLoader().load(prompt_name) or "You are a helpful assistant."
|
if self._llm is None:
|
||||||
tokens_in_estimate = max(1, int(math.ceil((len(system_prompt) + len(user_prompt)) / 4)))
|
|
||||||
return {
|
return {
|
||||||
"prompt_name": prompt_name,
|
"prompt_name": prompt_name,
|
||||||
"system_prompt": system_prompt,
|
"system_prompt": "You are a helpful assistant.",
|
||||||
"user_prompt": user_prompt,
|
"user_prompt": user_prompt,
|
||||||
"log_context": log_context,
|
"log_context": log_context,
|
||||||
"prompt_stats": {
|
|
||||||
"system_chars": len(system_prompt),
|
|
||||||
"user_chars": len(user_prompt),
|
|
||||||
"tokens_in_estimate": tokens_in_estimate,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
return self._llm.build_request(prompt_name, user_prompt, log_context=log_context)
|
||||||
|
|
||||||
def _finalize_docs_answer(
|
def _finalize_docs_answer(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -17,8 +17,6 @@ class DocsPromptPayloadBuilder:
|
|||||||
) -> str:
|
) -> str:
|
||||||
payload = {
|
payload = {
|
||||||
"question": question,
|
"question": question,
|
||||||
"intent": intent,
|
|
||||||
"sub_intent": sub_intent,
|
|
||||||
"documents": list(evidence_bundle.documents),
|
"documents": list(evidence_bundle.documents),
|
||||||
"facts": list(evidence_bundle.facts),
|
"facts": list(evidence_bundle.facts),
|
||||||
"entities": list(evidence_bundle.entities),
|
"entities": list(evidence_bundle.entities),
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ class TaskRuntimeContextBuilder:
|
|||||||
attachments: list[dict],
|
attachments: list[dict],
|
||||||
files: list[dict],
|
files: list[dict],
|
||||||
progress_cb,
|
progress_cb,
|
||||||
|
trace=None,
|
||||||
) -> TaskRuntimeContext:
|
) -> TaskRuntimeContext:
|
||||||
files_map = self._files_to_map(files)
|
files_map = self._files_to_map(files)
|
||||||
return TaskRuntimeContext(
|
return TaskRuntimeContext(
|
||||||
@@ -33,6 +34,7 @@ class TaskRuntimeContextBuilder:
|
|||||||
files=list(files or []),
|
files=list(files or []),
|
||||||
files_map=files_map,
|
files_map=files_map,
|
||||||
progress_cb=progress_cb,
|
progress_cb=progress_cb,
|
||||||
|
trace=trace,
|
||||||
repo_context=self._repo_context_factory.build(files_map),
|
repo_context=self._repo_context_factory.build(files_map),
|
||||||
conversation_state=ConversationState(),
|
conversation_state=ConversationState(),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ class TaskRuntimeContext:
|
|||||||
files: list[dict[str, Any]] = field(default_factory=list)
|
files: list[dict[str, Any]] = field(default_factory=list)
|
||||||
files_map: dict[str, dict[str, Any]] = field(default_factory=dict)
|
files_map: dict[str, dict[str, Any]] = field(default_factory=dict)
|
||||||
progress_cb: ProgressCallback | None = None
|
progress_cb: ProgressCallback | None = None
|
||||||
|
trace: Any = None
|
||||||
repo_context: Any = None
|
repo_context: Any = None
|
||||||
conversation_state: Any = None
|
conversation_state: Any = None
|
||||||
route_result: Any = None
|
route_result: Any = None
|
||||||
|
|||||||
@@ -13,11 +13,17 @@ class DocsQaWorkflow:
|
|||||||
self._runner = runner
|
self._runner = runner
|
||||||
|
|
||||||
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"started",
|
||||||
|
{"workflow_id": self.workflow_id, "message": ctx.message},
|
||||||
|
)
|
||||||
result = self._runner.run(
|
result = self._runner.run(
|
||||||
ctx.message,
|
ctx.message,
|
||||||
ctx.rag_session_id,
|
ctx.rag_session_id,
|
||||||
conversation_state=ctx.conversation_state,
|
conversation_state=ctx.conversation_state,
|
||||||
mode="full",
|
mode="full",
|
||||||
|
trace=ctx.trace,
|
||||||
)
|
)
|
||||||
diagnostics = result.diagnostics.model_dump(mode="json")
|
diagnostics = result.diagnostics.model_dump(mode="json")
|
||||||
emit_status_block(
|
emit_status_block(
|
||||||
@@ -42,15 +48,22 @@ class DocsQaWorkflow:
|
|||||||
title="Evidence Gate",
|
title="Evidence Gate",
|
||||||
lines=_gate_lines(diagnostics),
|
lines=_gate_lines(diagnostics),
|
||||||
)
|
)
|
||||||
return WorkflowExecutionResult(
|
result_payload = WorkflowExecutionResult(
|
||||||
result_type=TaskResultType.ANSWER,
|
result_type=TaskResultType.ANSWER,
|
||||||
answer=result.answer,
|
answer=result.answer,
|
||||||
meta={
|
meta={
|
||||||
"workflow_id": self.workflow_id,
|
"workflow_id": self.workflow_id,
|
||||||
"intent": result.router_result.intent,
|
"intent": result.router_result.intent,
|
||||||
"diagnostics": diagnostics,
|
"diagnostics": diagnostics,
|
||||||
|
"llm_request": result.llm_request,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"completed",
|
||||||
|
{"workflow_id": self.workflow_id, "answer_length": len(result.answer or "")},
|
||||||
|
)
|
||||||
|
return result_payload
|
||||||
|
|
||||||
|
|
||||||
def _retrieval_lines(diagnostics: dict) -> list[str]:
|
def _retrieval_lines(diagnostics: dict) -> list[str]:
|
||||||
|
|||||||
@@ -15,6 +15,11 @@ class FallbackWorkflow:
|
|||||||
self._llm = llm
|
self._llm = llm
|
||||||
|
|
||||||
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"started",
|
||||||
|
{"workflow_id": self.workflow_id, "message": ctx.message},
|
||||||
|
)
|
||||||
emit_status_block(
|
emit_status_block(
|
||||||
ctx,
|
ctx,
|
||||||
block_id="rag_retrieval",
|
block_id="rag_retrieval",
|
||||||
@@ -27,14 +32,18 @@ class FallbackWorkflow:
|
|||||||
payload = json.dumps(
|
payload = json.dumps(
|
||||||
{
|
{
|
||||||
"question": ctx.message,
|
"question": ctx.message,
|
||||||
"intent": getattr(ctx.route_result, "intent", ""),
|
|
||||||
"attachments": list(ctx.attachments),
|
"attachments": list(ctx.attachments),
|
||||||
"confluence_urls": list(ctx.enriched_context.get("confluence_urls") or []),
|
"confluence_urls": list(ctx.enriched_context.get("confluence_urls") or []),
|
||||||
},
|
},
|
||||||
ensure_ascii=False,
|
ensure_ascii=False,
|
||||||
indent=2,
|
indent=2,
|
||||||
)
|
)
|
||||||
answer = self._llm.generate("docs_fallback_answer", payload, log_context="agent.workflow.fallback").strip()
|
answer = self._llm.generate(
|
||||||
|
"fallback_answer",
|
||||||
|
payload,
|
||||||
|
log_context="agent.workflow.fallback",
|
||||||
|
trace=ctx.trace.module("llm") if ctx.trace is not None else None,
|
||||||
|
).strip()
|
||||||
emit_status_block(
|
emit_status_block(
|
||||||
ctx,
|
ctx,
|
||||||
block_id="workflow",
|
block_id="workflow",
|
||||||
@@ -47,8 +56,14 @@ class FallbackWorkflow:
|
|||||||
title="Evidence Gate",
|
title="Evidence Gate",
|
||||||
lines=["not applied in fallback workflow"],
|
lines=["not applied in fallback workflow"],
|
||||||
)
|
)
|
||||||
return WorkflowExecutionResult(
|
result = WorkflowExecutionResult(
|
||||||
result_type=TaskResultType.ANSWER,
|
result_type=TaskResultType.ANSWER,
|
||||||
answer=answer,
|
answer=answer,
|
||||||
meta={"workflow_id": self.workflow_id, "intent": getattr(ctx.route_result, "intent", "")},
|
meta={"workflow_id": self.workflow_id, "intent": getattr(ctx.route_result, "intent", "")},
|
||||||
)
|
)
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"completed",
|
||||||
|
{"workflow_id": self.workflow_id, "answer_length": len(answer)},
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|||||||
@@ -14,11 +14,17 @@ class GeneralQaWorkflow:
|
|||||||
self._runner = runner
|
self._runner = runner
|
||||||
|
|
||||||
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"started",
|
||||||
|
{"workflow_id": self.workflow_id, "message": ctx.message},
|
||||||
|
)
|
||||||
result = self._runner.run(
|
result = self._runner.run(
|
||||||
ctx.message,
|
ctx.message,
|
||||||
ctx.rag_session_id,
|
ctx.rag_session_id,
|
||||||
conversation_state=ctx.conversation_state,
|
conversation_state=ctx.conversation_state,
|
||||||
mode="full",
|
mode="full",
|
||||||
|
trace=ctx.trace,
|
||||||
)
|
)
|
||||||
diagnostics = result.diagnostics.model_dump(mode="json")
|
diagnostics = result.diagnostics.model_dump(mode="json")
|
||||||
emit_status_block(
|
emit_status_block(
|
||||||
@@ -43,12 +49,19 @@ class GeneralQaWorkflow:
|
|||||||
title="Evidence Gate",
|
title="Evidence Gate",
|
||||||
lines=_gate_lines(diagnostics),
|
lines=_gate_lines(diagnostics),
|
||||||
)
|
)
|
||||||
return WorkflowExecutionResult(
|
result_payload = WorkflowExecutionResult(
|
||||||
result_type=TaskResultType.ANSWER,
|
result_type=TaskResultType.ANSWER,
|
||||||
answer=result.answer,
|
answer=result.answer,
|
||||||
meta={
|
meta={
|
||||||
"workflow_id": self.workflow_id,
|
"workflow_id": self.workflow_id,
|
||||||
"intent": result.router_result.intent,
|
"intent": result.router_result.intent,
|
||||||
"diagnostics": diagnostics,
|
"diagnostics": diagnostics,
|
||||||
|
"llm_request": result.llm_request,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"completed",
|
||||||
|
{"workflow_id": self.workflow_id, "answer_length": len(result.answer or "")},
|
||||||
|
)
|
||||||
|
return result_payload
|
||||||
|
|||||||
@@ -14,11 +14,17 @@ class OpenApiWorkflow:
|
|||||||
self._runner = runner
|
self._runner = runner
|
||||||
|
|
||||||
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
def run(self, ctx: TaskRuntimeContext) -> WorkflowExecutionResult:
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"started",
|
||||||
|
{"workflow_id": self.workflow_id, "message": ctx.message},
|
||||||
|
)
|
||||||
result = self._runner.run(
|
result = self._runner.run(
|
||||||
ctx.message,
|
ctx.message,
|
||||||
ctx.rag_session_id,
|
ctx.rag_session_id,
|
||||||
conversation_state=ctx.conversation_state,
|
conversation_state=ctx.conversation_state,
|
||||||
mode="full",
|
mode="full",
|
||||||
|
trace=ctx.trace,
|
||||||
)
|
)
|
||||||
diagnostics = result.diagnostics.model_dump(mode="json")
|
diagnostics = result.diagnostics.model_dump(mode="json")
|
||||||
emit_status_block(
|
emit_status_block(
|
||||||
@@ -51,7 +57,7 @@ class OpenApiWorkflow:
|
|||||||
format="yaml",
|
format="yaml",
|
||||||
source_refs=list(result.diagnostics.doc_paths),
|
source_refs=list(result.diagnostics.doc_paths),
|
||||||
)
|
)
|
||||||
return WorkflowExecutionResult(
|
result_payload = WorkflowExecutionResult(
|
||||||
result_type=TaskResultType.OPENAPI,
|
result_type=TaskResultType.OPENAPI,
|
||||||
answer=content,
|
answer=content,
|
||||||
artifacts=[artifact],
|
artifacts=[artifact],
|
||||||
@@ -59,5 +65,12 @@ class OpenApiWorkflow:
|
|||||||
"workflow_id": self.workflow_id,
|
"workflow_id": self.workflow_id,
|
||||||
"intent": result.router_result.intent,
|
"intent": result.router_result.intent,
|
||||||
"diagnostics": diagnostics,
|
"diagnostics": diagnostics,
|
||||||
|
"llm_request": result.llm_request,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
if ctx.trace is not None:
|
||||||
|
ctx.trace.module("task_workflow").log(
|
||||||
|
"completed",
|
||||||
|
{"workflow_id": self.workflow_id, "answer_length": len(content or "")},
|
||||||
|
)
|
||||||
|
return result_payload
|
||||||
|
|||||||
@@ -1,3 +0,0 @@
|
|||||||
from app.modules.agent_api.module import AgentApiModule
|
|
||||||
|
|
||||||
__all__ = ["AgentApiModule"]
|
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
__all__: list[str] = []
|
||||||
+5
-5
@@ -2,11 +2,11 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from app.modules.agent_api.domain.models.agent_request import AgentRequest
|
from app.modules.api.domain.models.agent_request import AgentRequest
|
||||||
from app.modules.agent_api.infrastructure.ids.request_id_factory import RequestIdFactory
|
from app.modules.api.infrastructure.ids.request_id_factory import RequestIdFactory
|
||||||
from app.modules.agent_api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
|
from app.modules.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
|
||||||
from app.modules.agent_api.application.session_service import SessionService
|
from app.modules.api.application.session_service import SessionService
|
||||||
from app.modules.orchestration.facade import OrchestrationFacade
|
from app.modules.agent.orchestration.facade import OrchestrationFacade
|
||||||
|
|
||||||
|
|
||||||
class RequestService:
|
class RequestService:
|
||||||
+3
-3
@@ -3,9 +3,9 @@ from __future__ import annotations
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
from app.core.exceptions import AppError
|
||||||
from app.modules.agent_api.domain.models.agent_session import AgentSession
|
from app.modules.api.domain.models.agent_session import AgentSession
|
||||||
from app.modules.agent_api.infrastructure.ids.session_id_factory import SessionIdFactory
|
from app.modules.api.infrastructure.ids.session_id_factory import SessionIdFactory
|
||||||
from app.modules.agent_api.infrastructure.stores.in_memory_session_store import InMemorySessionStore
|
from app.modules.api.infrastructure.stores.in_memory_session_store import InMemorySessionStore
|
||||||
from app.schemas.common import ModuleName
|
from app.schemas.common import ModuleName
|
||||||
|
|
||||||
|
|
||||||
+2
-2
@@ -1,8 +1,8 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
from app.core.exceptions import AppError
|
||||||
from app.modules.agent_api.infrastructure.streaming.sse_encoder import SseEncoder
|
from app.modules.api.infrastructure.streaming.sse_encoder import SseEncoder
|
||||||
from app.modules.agent_api.infrastructure.streaming.sse_event_channel import SseEventChannel
|
from app.modules.api.infrastructure.streaming.sse_event_channel import SseEventChannel
|
||||||
from app.schemas.common import ModuleName
|
from app.schemas.common import ModuleName
|
||||||
|
|
||||||
|
|
||||||
+1
-1
@@ -1,7 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
from app.core.exceptions import AppError
|
||||||
from app.modules.agent_api.application.request_service import RequestService
|
from app.modules.api.application.request_service import RequestService
|
||||||
from app.schemas.agent_api import AgentRequestCreateRequest, AgentRequestQueuedResponse, AgentRequestStateResponse
|
from app.schemas.agent_api import AgentRequestCreateRequest, AgentRequestQueuedResponse, AgentRequestStateResponse
|
||||||
from app.schemas.common import ModuleName
|
from app.schemas.common import ModuleName
|
||||||
|
|
||||||
+1
-1
@@ -6,7 +6,7 @@ from app.schemas.agent_api import (
|
|||||||
CreateAgentSessionResponse,
|
CreateAgentSessionResponse,
|
||||||
ResetAgentSessionResponse,
|
ResetAgentSessionResponse,
|
||||||
)
|
)
|
||||||
from app.modules.agent_api.application.session_service import SessionService
|
from app.modules.api.application.session_service import SessionService
|
||||||
|
|
||||||
|
|
||||||
class SessionController:
|
class SessionController:
|
||||||
+1
-1
@@ -2,7 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from fastapi.responses import StreamingResponse
|
from fastapi.responses import StreamingResponse
|
||||||
|
|
||||||
from app.modules.agent_api.application.stream_service import StreamService
|
from app.modules.api.application.stream_service import StreamService
|
||||||
|
|
||||||
|
|
||||||
class StreamController:
|
class StreamController:
|
||||||
+14
-8
@@ -3,11 +3,11 @@ from __future__ import annotations
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
from app.modules.agent_api.domain.events.client_event import ClientEventRecord
|
from app.modules.api.domain.events.client_event import ClientEventRecord
|
||||||
from app.modules.agent_api.domain.models.agent_request import AgentRequest
|
from app.modules.api.domain.models.agent_request import AgentRequest
|
||||||
from app.modules.agent_api.domain.models.agent_session import AgentSession
|
from app.modules.api.domain.models.agent_session import AgentSession
|
||||||
from app.modules.agent_api.infrastructure.logging.trace_file_path_builder import TraceFilePathBuilder
|
from app.modules.api.infrastructure.logging.trace_file_path_builder import TraceFilePathBuilder
|
||||||
from app.modules.agent_api.infrastructure.logging.trace_markdown_writer import TraceMarkdownWriter
|
from app.modules.api.infrastructure.logging.trace_markdown_writer import TraceMarkdownWriter
|
||||||
|
|
||||||
|
|
||||||
class RequestTraceLogger:
|
class RequestTraceLogger:
|
||||||
@@ -39,11 +39,17 @@ class RequestTraceLogger:
|
|||||||
def log_step(self, request_id: str, step: str, status: str, details: dict | None = None) -> None:
|
def log_step(self, request_id: str, step: str, status: str, details: dict | None = None) -> None:
|
||||||
self._append(request_id, f"Step {step}", {"status": status, "details": details or {}})
|
self._append(request_id, f"Step {step}", {"status": status, "details": details or {}})
|
||||||
|
|
||||||
|
def log_module(self, request_id: str, module: str, title: str, payload: dict | None = None) -> None:
|
||||||
|
body = {"event": title}
|
||||||
|
body.update(payload or {})
|
||||||
|
self._append(request_id, module, body)
|
||||||
|
|
||||||
def log_event(self, event: ClientEventRecord) -> None:
|
def log_event(self, event: ClientEventRecord) -> None:
|
||||||
self._append(
|
self._append(
|
||||||
event.request_id,
|
event.request_id,
|
||||||
f"Event {event.type.value}",
|
"client_event",
|
||||||
{
|
{
|
||||||
|
"event": event.type.value,
|
||||||
"source": event.source,
|
"source": event.source,
|
||||||
"text": event.text,
|
"text": event.text,
|
||||||
"payload": event.payload,
|
"payload": event.payload,
|
||||||
@@ -54,7 +60,7 @@ class RequestTraceLogger:
|
|||||||
def complete_request(self, request: AgentRequest) -> None:
|
def complete_request(self, request: AgentRequest) -> None:
|
||||||
self._append(
|
self._append(
|
||||||
request.request_id,
|
request.request_id,
|
||||||
"Result",
|
"result",
|
||||||
{
|
{
|
||||||
"status": request.status.value,
|
"status": request.status.value,
|
||||||
"answer": request.answer,
|
"answer": request.answer,
|
||||||
@@ -65,7 +71,7 @@ class RequestTraceLogger:
|
|||||||
def fail_request(self, request: AgentRequest) -> None:
|
def fail_request(self, request: AgentRequest) -> None:
|
||||||
self._append(
|
self._append(
|
||||||
request.request_id,
|
request.request_id,
|
||||||
"Error",
|
"result",
|
||||||
{
|
{
|
||||||
"status": request.status.value,
|
"status": request.status.value,
|
||||||
"error": request.error.model_dump(mode="json") if request.error else None,
|
"error": request.error.model_dump(mode="json") if request.error else None,
|
||||||
+1
-1
@@ -2,7 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
from app.modules.agent_api.domain.models.agent_request import AgentRequest
|
from app.modules.api.domain.models.agent_request import AgentRequest
|
||||||
|
|
||||||
|
|
||||||
class InMemoryRequestStore:
|
class InMemoryRequestStore:
|
||||||
+1
-1
@@ -2,7 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
from app.modules.agent_api.domain.models.agent_session import AgentSession
|
from app.modules.api.domain.models.agent_session import AgentSession
|
||||||
|
|
||||||
|
|
||||||
class InMemorySessionStore:
|
class InMemorySessionStore:
|
||||||
+1
-1
@@ -2,7 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from app.modules.agent_api.domain.events.client_event import ClientEventRecord
|
from app.modules.api.domain.events.client_event import ClientEventRecord
|
||||||
|
|
||||||
|
|
||||||
class ReplayBuffer:
|
class ReplayBuffer:
|
||||||
+1
-1
@@ -2,7 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from app.modules.agent_api.domain.events.client_event import ClientEventRecord
|
from app.modules.api.domain.events.client_event import ClientEventRecord
|
||||||
|
|
||||||
|
|
||||||
class SseEncoder:
|
class SseEncoder:
|
||||||
+2
-2
@@ -3,8 +3,8 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from app.modules.agent_api.domain.events.client_event import ClientEventRecord
|
from app.modules.api.domain.events.client_event import ClientEventRecord
|
||||||
from app.modules.agent_api.infrastructure.streaming.replay_buffer import ReplayBuffer
|
from app.modules.api.infrastructure.streaming.replay_buffer import ReplayBuffer
|
||||||
|
|
||||||
|
|
||||||
class SseEventChannel:
|
class SseEventChannel:
|
||||||
@@ -2,16 +2,16 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
|
|
||||||
from app.modules.agent_api.application.request_service import RequestService
|
from app.modules.api.application.request_service import RequestService
|
||||||
from app.modules.agent_api.application.session_service import SessionService
|
from app.modules.api.application.session_service import SessionService
|
||||||
from app.modules.agent_api.application.stream_service import StreamService
|
from app.modules.api.application.stream_service import StreamService
|
||||||
from app.modules.agent_api.controllers.request_controller import RequestController
|
from app.modules.api.controllers.request_controller import RequestController
|
||||||
from app.modules.agent_api.controllers.session_controller import SessionController
|
from app.modules.api.controllers.session_controller import SessionController
|
||||||
from app.modules.agent_api.controllers.stream_controller import StreamController
|
from app.modules.api.controllers.stream_controller import StreamController
|
||||||
from app.modules.agent_api.public_router import build_public_router
|
from app.modules.api.public_router import build_public_router
|
||||||
|
|
||||||
|
|
||||||
class AgentApiModule:
|
class ApiModule:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
sessions: SessionService,
|
sessions: SessionService,
|
||||||
@@ -2,9 +2,9 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
|
|
||||||
from app.modules.agent_api.controllers.request_controller import RequestController
|
from app.modules.api.controllers.request_controller import RequestController
|
||||||
from app.modules.agent_api.controllers.session_controller import SessionController
|
from app.modules.api.controllers.session_controller import SessionController
|
||||||
from app.modules.agent_api.controllers.stream_controller import StreamController
|
from app.modules.api.controllers.stream_controller import StreamController
|
||||||
from app.schemas.agent_api import (
|
from app.schemas.agent_api import (
|
||||||
AgentRequestCreateRequest,
|
AgentRequestCreateRequest,
|
||||||
AgentRequestQueuedResponse,
|
AgentRequestQueuedResponse,
|
||||||
@@ -13,32 +13,36 @@ from app.modules.agent.task_runtime.workflows import (
|
|||||||
OpenApiWorkflow,
|
OpenApiWorkflow,
|
||||||
)
|
)
|
||||||
from app.modules.agent.task_runtime.workflows.general_qa import GeneralQaWorkflow
|
from app.modules.agent.task_runtime.workflows.general_qa import GeneralQaWorkflow
|
||||||
from app.modules.agent_api import AgentApiModule
|
from app.modules.api.module import ApiModule
|
||||||
from app.modules.agent_api.application.request_service import RequestService
|
from app.modules.api.application.request_service import RequestService
|
||||||
from app.modules.agent_api.application.session_service import SessionService
|
from app.modules.api.application.session_service import SessionService
|
||||||
from app.modules.agent_api.application.stream_service import StreamService
|
from app.modules.api.application.stream_service import StreamService
|
||||||
from app.modules.agent_api.infrastructure.ids.request_id_factory import RequestIdFactory
|
from app.modules.api.infrastructure.ids.request_id_factory import RequestIdFactory
|
||||||
from app.modules.agent_api.infrastructure.ids.session_id_factory import SessionIdFactory
|
from app.modules.api.infrastructure.ids.session_id_factory import SessionIdFactory
|
||||||
from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
from app.modules.api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
||||||
from app.modules.agent_api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
|
from app.modules.api.infrastructure.stores.in_memory_request_store import InMemoryRequestStore
|
||||||
from app.modules.agent_api.infrastructure.stores.in_memory_session_store import InMemorySessionStore
|
from app.modules.api.infrastructure.stores.in_memory_session_store import InMemorySessionStore
|
||||||
from app.modules.agent_api.infrastructure.streaming.sse_event_channel import SseEventChannel
|
from app.modules.api.infrastructure.streaming.sse_event_channel import SseEventChannel
|
||||||
from app.modules.orchestration import OrchestrationFacade
|
from app.modules.agent.orchestration.facade import OrchestrationFacade
|
||||||
from app.modules.orchestration.adapters.intent_router_adapter import IntentRouterAdapter
|
from app.modules.agent.orchestration.adapters.intent_router_adapter import IntentRouterAdapter
|
||||||
from app.modules.orchestration.adapters.llm_chat_adapter import LlmChatAdapter
|
from app.modules.agent.orchestration.adapters.llm_chat_adapter import LlmChatAdapter
|
||||||
from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
from app.modules.agent.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
||||||
from app.modules.orchestration.processes.registry import ProcessRegistry
|
from app.modules.agent.orchestration.processes.registry import ProcessRegistry
|
||||||
from app.modules.orchestration.processes.v1.process import V1Process
|
from app.modules.agent.orchestration.processes.v1.process import V1Process
|
||||||
from app.modules.orchestration.processes.v1.steps.bootstrap_step import BootstrapStep
|
from app.modules.agent.orchestration.processes.v1.prompt_payload_builder import V1PromptPayloadBuilder
|
||||||
from app.modules.orchestration.processes.v1.steps.finalize_step import FinalizeStep
|
from app.modules.agent.orchestration.processes.v1.simple_llm_workflow import SimpleLlmWorkflow
|
||||||
from app.modules.orchestration.processes.v1.steps.run_llm_step import RunLlmStep
|
from app.modules.agent.orchestration.processes.v1.steps.bootstrap_step import BootstrapStep
|
||||||
from app.modules.orchestration.processes.v2.process import V2Process
|
from app.modules.agent.orchestration.processes.v1.steps.execute_llm_workflow_step import ExecuteLlmWorkflowStep
|
||||||
from app.modules.orchestration.processes.v2.steps.execute_documentation_workflow_step import ExecuteDocumentationWorkflowStep
|
from app.modules.agent.orchestration.processes.v1.steps.finalize_step import FinalizeStep
|
||||||
from app.modules.orchestration.processes.v2.steps.execute_fallback_workflow_step import ExecuteFallbackWorkflowStep
|
from app.modules.agent.orchestration.processes.v2.prompt_payload_builder import V2PromptPayloadBuilder
|
||||||
from app.modules.orchestration.processes.v2.steps.execute_general_qa_workflow_step import ExecuteGeneralQaWorkflowStep
|
from app.modules.agent.orchestration.processes.v2.prompt_selector import V2PromptSelector
|
||||||
from app.modules.orchestration.processes.v2.steps.execute_openapi_workflow_step import ExecuteOpenApiWorkflowStep
|
from app.modules.agent.orchestration.processes.v2.process import V2Process
|
||||||
from app.modules.orchestration.processes.v2.steps.route_intent_step import RouteIntentStep
|
from app.modules.agent.orchestration.processes.v2.steps.execute_documentation_workflow_step import ExecuteDocumentationWorkflowStep
|
||||||
from app.modules.orchestration.runtime.process_runner import ProcessRunner
|
from app.modules.agent.orchestration.processes.v2.steps.execute_fallback_workflow_step import ExecuteFallbackWorkflowStep
|
||||||
|
from app.modules.agent.orchestration.processes.v2.steps.execute_general_qa_workflow_step import ExecuteGeneralQaWorkflowStep
|
||||||
|
from app.modules.agent.orchestration.processes.v2.steps.execute_openapi_workflow_step import ExecuteOpenApiWorkflowStep
|
||||||
|
from app.modules.agent.orchestration.processes.v2.steps.route_intent_step import RouteIntentStep
|
||||||
|
from app.modules.agent.orchestration.runtime.process_runner import ProcessRunner
|
||||||
from app.modules.rag.persistence.repository import RagRepository
|
from app.modules.rag.persistence.repository import RagRepository
|
||||||
from app.modules.agent.runtime.story_context_repository import StoryContextRepository, StoryContextSchemaRepository
|
from app.modules.agent.runtime.story_context_repository import StoryContextRepository, StoryContextSchemaRepository
|
||||||
from app.modules.rag.module import RagModule, RagRepoModule
|
from app.modules.rag.module import RagModule, RagRepoModule
|
||||||
@@ -66,8 +70,10 @@ class ModularApplication:
|
|||||||
|
|
||||||
_giga_settings = GigaChatSettings.from_env()
|
_giga_settings = GigaChatSettings.from_env()
|
||||||
_giga_client = GigaChatClient(_giga_settings, GigaChatTokenProvider(_giga_settings))
|
_giga_client = GigaChatClient(_giga_settings, GigaChatTokenProvider(_giga_settings))
|
||||||
_prompt_loader = PromptLoader()
|
_v1_prompt_loader = PromptLoader(Path(__file__).resolve().parent / "agent/orchestration/processes/v1/prompts.yml")
|
||||||
self._agent_llm = AgentLlmService(client=_giga_client, prompts=_prompt_loader)
|
_v2_prompt_loader = PromptLoader(Path(__file__).resolve().parent / "agent/orchestration/processes/v2/prompts.yml")
|
||||||
|
self._agent_llm_v1 = AgentLlmService(client=_giga_client, prompts=_v1_prompt_loader)
|
||||||
|
self._agent_llm_v2 = AgentLlmService(client=_giga_client, prompts=_v2_prompt_loader)
|
||||||
_router = IntentRouterV2()
|
_router = IntentRouterV2()
|
||||||
_retrieval = RuntimeRetrievalAdapter(self.rag_repository)
|
_retrieval = RuntimeRetrievalAdapter(self.rag_repository)
|
||||||
_repo_context_factory = RuntimeRepoContextFactory()
|
_repo_context_factory = RuntimeRepoContextFactory()
|
||||||
@@ -75,15 +81,17 @@ class ModularApplication:
|
|||||||
router=_router,
|
router=_router,
|
||||||
retrieval_adapter=_retrieval,
|
retrieval_adapter=_retrieval,
|
||||||
repo_context=_repo_context_factory.build(),
|
repo_context=_repo_context_factory.build(),
|
||||||
llm=self._agent_llm,
|
llm=self._agent_llm_v2,
|
||||||
|
prompt_selector=V2PromptSelector(),
|
||||||
|
prompt_payload_builder=V2PromptPayloadBuilder(),
|
||||||
)
|
)
|
||||||
_task_context_builder = TaskRuntimeContextBuilder(_repo_context_factory)
|
_task_context_builder = TaskRuntimeContextBuilder(_repo_context_factory)
|
||||||
_context_enrichment = ContextEnrichmentService()
|
_context_enrichment = ContextEnrichmentService()
|
||||||
_docs_workflow = DocsQaWorkflow(_docs_runner)
|
_docs_workflow = DocsQaWorkflow(_docs_runner)
|
||||||
_openapi_workflow = OpenApiWorkflow(_docs_runner)
|
_openapi_workflow = OpenApiWorkflow(_docs_runner)
|
||||||
_general_qa_workflow = GeneralQaWorkflow(_docs_runner)
|
_general_qa_workflow = GeneralQaWorkflow(_docs_runner)
|
||||||
_fallback_workflow = FallbackWorkflow(self._agent_llm)
|
_fallback_workflow = FallbackWorkflow(self._agent_llm_v2)
|
||||||
_docs_generation_workflow = DocumentationGenerationWorkflow(self._agent_llm, DocumentationTemplateRegistry())
|
_docs_generation_workflow = DocumentationGenerationWorkflow(self._agent_llm_v2, DocumentationTemplateRegistry())
|
||||||
self._docs_generation_workflow = _docs_generation_workflow
|
self._docs_generation_workflow = _docs_generation_workflow
|
||||||
|
|
||||||
self.agent_sessions = InMemorySessionStore()
|
self.agent_sessions = InMemorySessionStore()
|
||||||
@@ -91,8 +99,12 @@ class ModularApplication:
|
|||||||
self.agent_events = SseEventChannel()
|
self.agent_events = SseEventChannel()
|
||||||
self.agent_trace_logger = RequestTraceLogger(Path("runtime_traces/agent_requests"))
|
self.agent_trace_logger = RequestTraceLogger(Path("runtime_traces/agent_requests"))
|
||||||
_publisher = ClientMessagePublisher(self.agent_events, self.agent_trace_logger)
|
_publisher = ClientMessagePublisher(self.agent_events, self.agent_trace_logger)
|
||||||
|
_v1_workflow = SimpleLlmWorkflow(
|
||||||
|
LlmChatAdapter(self._agent_llm_v1, prompt_name="simple_llm_answer"),
|
||||||
|
V1PromptPayloadBuilder(),
|
||||||
|
)
|
||||||
_process_registry = ProcessRegistry(
|
_process_registry = ProcessRegistry(
|
||||||
V1Process([BootstrapStep(), RunLlmStep(LlmChatAdapter(self._agent_llm)), FinalizeStep()]),
|
V1Process([BootstrapStep(), ExecuteLlmWorkflowStep(_v1_workflow), FinalizeStep()]),
|
||||||
V2Process(
|
V2Process(
|
||||||
[
|
[
|
||||||
BootstrapStep(),
|
BootstrapStep(),
|
||||||
@@ -123,7 +135,7 @@ class ModularApplication:
|
|||||||
sessions=_session_service,
|
sessions=_session_service,
|
||||||
orchestration=_orchestration,
|
orchestration=_orchestration,
|
||||||
)
|
)
|
||||||
self.agent_api = AgentApiModule(
|
self.api = ApiModule(
|
||||||
sessions=_session_service,
|
sessions=_session_service,
|
||||||
requests=_request_service,
|
requests=_request_service,
|
||||||
streams=StreamService(self.agent_events, request_exists=lambda request_id: self.agent_requests.get(request_id) is not None),
|
streams=StreamService(self.agent_events, request_exists=lambda request_id: self.agent_requests.get(request_id) is not None),
|
||||||
|
|||||||
@@ -1,98 +0,0 @@
|
|||||||
# Модуль chat
|
|
||||||
|
|
||||||
## 1. Функции модуля
|
|
||||||
- Внешний API чата: создание диалога, отправка сообщения, получение статуса задачи.
|
|
||||||
- Асинхронная оркестрация выполнения через `ChatOrchestrator`.
|
|
||||||
- Идемпотентность запросов (по заголовку `Idempotency-Key`) и стриминг событий по SSE.
|
|
||||||
|
|
||||||
## 2. Диаграмма классов и взаимосвязей
|
|
||||||
```mermaid
|
|
||||||
classDiagram
|
|
||||||
class ChatModule
|
|
||||||
class ChatOrchestrator
|
|
||||||
class TaskStore
|
|
||||||
class DialogSessionStore
|
|
||||||
class IdempotencyStore
|
|
||||||
class EventBus
|
|
||||||
class AgentRunner
|
|
||||||
|
|
||||||
ChatModule --> ChatOrchestrator
|
|
||||||
ChatModule --> TaskStore
|
|
||||||
ChatModule --> DialogSessionStore
|
|
||||||
ChatModule --> IdempotencyStore
|
|
||||||
ChatModule --> EventBus
|
|
||||||
ChatOrchestrator --> AgentRunner
|
|
||||||
ChatOrchestrator --> TaskStore
|
|
||||||
ChatOrchestrator --> DialogSessionStore
|
|
||||||
ChatOrchestrator --> EventBus
|
|
||||||
```
|
|
||||||
|
|
||||||
## 3. Описание классов
|
|
||||||
- `ChatModule`: фасад модуля и регистрация публичных chat endpoint'ов.
|
|
||||||
Методы: `__init__` — собирает stores/orchestrator; `public_router` — публикует REST и SSE маршруты чата.
|
|
||||||
- `ChatOrchestrator`: ведёт жизненный цикл пользовательского сообщения как фоновой задачи.
|
|
||||||
Методы: `enqueue_message` — создает задачу и запускает обработку; `_process_task` — исполняет runtime и сохраняет результат; `_resolve_sessions` — валидирует и сопоставляет dialog/rag сессии.
|
|
||||||
- `TaskStore`: in-memory store состояний задач.
|
|
||||||
Методы: `create` — создает новую `TaskState`; `get` — возвращает задачу по `task_id`; `save` — обновляет состояние задачи.
|
|
||||||
- `DialogSessionStore`: хранилище dialog-сессий поверх БД.
|
|
||||||
Методы: `create` — создает новую dialog-сессию; `get` — читает dialog-сессию по id.
|
|
||||||
- `IdempotencyStore`: предотвращает дубль задач по идемпотентному ключу.
|
|
||||||
Методы: `get_task_id` — возвращает существующий `task_id` по ключу; `put` — сохраняет ключ и `task_id`.
|
|
||||||
- `EventBus`: асинхронная публикация/подписка событий.
|
|
||||||
Методы: `subscribe` — создает подписку на канал; `unsubscribe` — снимает подписку; `publish` — отправляет событие подписчикам; `as_sse` — сериализует событие в SSE формат.
|
|
||||||
- `AgentRunner` (контракт): интерфейс выполнения агентного запроса из chat-слоя.
|
|
||||||
Методы: `run` — принимает данные задачи и возвращает итог `answer`/`changeset`.
|
|
||||||
|
|
||||||
## 4. Сиквенс-диаграммы API
|
|
||||||
|
|
||||||
### POST /api/chat/dialogs
|
|
||||||
Назначение: создает новый диалог, связанный с существующей `rag_session`, чтобы пользователь мог отправлять сообщения в контексте конкретного индекса.
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant Router as ChatModule.APIRouter
|
|
||||||
participant RagSessions as RagSessionStore
|
|
||||||
participant Dialogs as DialogSessionStore
|
|
||||||
|
|
||||||
Router->>RagSessions: get(rag_session_id)
|
|
||||||
RagSessions-->>Router: exists
|
|
||||||
Router->>Dialogs: create(rag_session_id)
|
|
||||||
Dialogs-->>Router: dialog_session
|
|
||||||
```
|
|
||||||
|
|
||||||
### POST /api/chat/messages
|
|
||||||
Назначение: ставит сообщение пользователя в асинхронную обработку и возвращает `task_id` для отслеживания результата.
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant Router as ChatModule.APIRouter
|
|
||||||
participant Orchestrator as ChatOrchestrator
|
|
||||||
participant TaskStore as TaskStore
|
|
||||||
|
|
||||||
Router->>Orchestrator: enqueue_message(request, idempotency_key)
|
|
||||||
Orchestrator->>TaskStore: create()/save()
|
|
||||||
Orchestrator-->>Router: task_id,status
|
|
||||||
```
|
|
||||||
|
|
||||||
### GET /api/tasks/{task_id}
|
|
||||||
Назначение: отдает текущее состояние задачи и финальный результат (answer/changeset/error), когда обработка завершена.
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant Router as ChatModule.APIRouter
|
|
||||||
participant TaskStore as TaskStore
|
|
||||||
|
|
||||||
Router->>TaskStore: get(task_id)
|
|
||||||
TaskStore-->>Router: task_state
|
|
||||||
```
|
|
||||||
|
|
||||||
### GET /api/events?task_id=...
|
|
||||||
Назначение: открывает SSE-поток с прогрессом выполнения задачи и промежуточными событиями.
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant Router as ChatModule.APIRouter
|
|
||||||
participant Events as EventBus
|
|
||||||
|
|
||||||
Router->>Events: subscribe(task_id)
|
|
||||||
loop until disconnect
|
|
||||||
Events-->>Router: SSE event
|
|
||||||
end
|
|
||||||
Router->>Events: unsubscribe(task_id)
|
|
||||||
```
|
|
||||||
@@ -1,33 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from app.modules.chat.repository import ChatRepository
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class DialogSession:
    """Link between a chat dialog and the RAG session it was created for."""

    # Identifier of the dialog itself.
    dialog_session_id: str
    # Identifier of the RAG session the dialog is attached to.
    rag_session_id: str
|
|
||||||
|
|
||||||
|
|
||||||
class DialogSessionStore:
    """Creates and looks up ``DialogSession`` objects through a chat repository."""

    def __init__(self, repository: ChatRepository) -> None:
        """Remember the repository used for every read and write."""
        self._repo = repository

    def create(self, rag_session_id: str) -> DialogSession:
        """Create a dialog with a fresh UUID4 id, persist it and return it."""
        new_id = str(uuid4())
        created = DialogSession(dialog_session_id=new_id, rag_session_id=rag_session_id)
        self._repo.create_dialog(created.dialog_session_id, created.rag_session_id)
        return created

    def get(self, dialog_session_id: str) -> DialogSession | None:
        """Return the stored dialog, or ``None`` when the repository has no row for it."""
        record = self._repo.get_dialog(dialog_session_id)
        if record:
            # Both ids are coerced to ``str`` regardless of the type stored in the row.
            return DialogSession(
                dialog_session_id=str(record["dialog_session_id"]),
                rag_session_id=str(record["rag_session_id"]),
            )
        return None
|
|
||||||
@@ -1,71 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
from app.modules.agent.llm import AgentLlmService
|
|
||||||
from app.modules.chat.evidence_gate import CodeExplainEvidenceGate
|
|
||||||
from app.modules.chat.session_resolver import ChatSessionResolver
|
|
||||||
from app.modules.chat.task_store import TaskState, TaskStore
|
|
||||||
from app.modules.agent.runtime.steps.explain import CodeExplainRetrieverV2, PromptBudgeter
|
|
||||||
from app.schemas.chat import ChatMessageRequest, TaskQueuedResponse, TaskResultType, TaskStatus
|
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class CodeExplainChatService:
    """Handles a chat message by producing a code explanation in one direct pass.

    The work is done inline inside ``handle_message``: the returned response
    already carries the ``DONE`` status.
    """

    def __init__(
        self,
        retriever: CodeExplainRetrieverV2,
        llm: AgentLlmService,
        session_resolver: ChatSessionResolver,
        task_store: TaskStore,
        message_sink,
        budgeter: PromptBudgeter | None = None,
        evidence_gate: CodeExplainEvidenceGate | None = None,
    ) -> None:
        """Store collaborators; a default budgeter and evidence gate are built when omitted."""
        self._retriever = retriever
        self._llm = llm
        self._session_resolver = session_resolver
        self._task_store = task_store
        self._message_sink = message_sink
        self._budgeter = budgeter or PromptBudgeter()
        self._evidence_gate = evidence_gate or CodeExplainEvidenceGate()

    async def handle_message(self, request: ChatMessageRequest) -> TaskQueuedResponse:
        """Answer ``request`` and record the outcome as a finished task.

        Both the user message and the assistant answer are passed to the
        message sink, and the task state is saved once as ``RUNNING`` and once
        as ``DONE`` with the answer attached.
        """
        dialog_session_id, rag_session_id = self._session_resolver.resolve(request)
        task_id = str(uuid4())
        state = TaskState(task_id=task_id, status=TaskStatus.RUNNING)
        self._task_store.save(state)
        self._message_sink(dialog_session_id, "user", request.message, task_id=task_id)

        pack = self._retriever.build_pack(
            rag_session_id,
            request.message,
            file_candidates=[attachment.model_dump(mode="json") for attachment in request.files],
        )
        answer = self._answer_for(request, pack)

        self._message_sink(dialog_session_id, "assistant", answer, task_id=task_id)
        state.status = TaskStatus.DONE
        state.result_type = TaskResultType.ANSWER
        state.answer = answer
        self._task_store.save(state)
        LOGGER.warning(
            "direct code explain response: task_id=%s rag_session_id=%s excerpts=%s missing=%s",
            task_id,
            rag_session_id,
            len(pack.code_excerpts),
            pack.missing,
        )
        return TaskQueuedResponse(task_id=task_id, status=TaskStatus.DONE.value)

    def _answer_for(self, request: ChatMessageRequest, pack) -> str:
        """Return the LLM answer when the evidence gate passes, else the gate's own answer."""
        decision = self._evidence_gate.evaluate(pack)
        if not decision.passed:
            return decision.answer
        prompt_input = self._budgeter.build_prompt_input(request.message, pack)
        generated = self._llm.generate(
            "code_explain_answer_v2",
            prompt_input,
            log_context="chat.code_explain.direct",
        )
        return generated.strip()
|
|
||||||
@@ -1,62 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
|
|
||||||
from app.modules.agent.runtime.steps.explain.models import ExplainPack
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
|
||||||
class EvidenceGateDecision:
|
|
||||||
passed: bool
|
|
||||||
answer: str = ""
|
|
||||||
diagnostics: dict[str, list[str]] = field(default_factory=dict)
|
|
||||||
|
|
||||||
|
|
||||||
class CodeExplainEvidenceGate:
    """Decides whether an explain pack holds enough code excerpts to answer from."""

    def __init__(self, min_excerpts: int = 2) -> None:
        """Set the minimum number of code excerpts a pack needs in order to pass."""
        self._min_excerpts = min_excerpts

    def evaluate(self, pack: ExplainPack) -> EvidenceGateDecision:
        """Return a passing decision, or a failing one that carries a ready-made answer."""
        diagnostics = self._diagnostics(pack)
        enough = len(pack.code_excerpts) >= self._min_excerpts
        if not enough:
            return EvidenceGateDecision(
                passed=False,
                answer=self._build_answer(pack, diagnostics),
                diagnostics=diagnostics,
            )
        return EvidenceGateDecision(passed=True, diagnostics=diagnostics)

    def _diagnostics(self, pack: ExplainPack) -> dict[str, list[str]]:
        """Summarise the pack: entrypoint titles, symbol titles, paths and missing items."""
        # Only the first 3 entrypoints and the first 5 symbols are considered,
        # and items without a title are dropped.
        entrypoint_titles = [entry.title for entry in pack.selected_entrypoints[:3] if entry.title]
        symbol_titles = [symbol.title for symbol in pack.seed_symbols[:5] if symbol.title]
        return {
            "entrypoints": entrypoint_titles,
            "symbols": symbol_titles,
            "paths": self._paths(pack),
            "missing": list(pack.missing),
        }

    def _paths(self, pack: ExplainPack) -> list[str]:
        """Collect up to six distinct paths, in first-seen order."""
        # Entrypoints and symbols come first; their ``source`` wins over the location path.
        candidates = [
            entry.source or (entry.location.path if entry.location else "")
            for entry in pack.selected_entrypoints + pack.seed_symbols
        ]
        candidates.extend(excerpt.path for excerpt in pack.code_excerpts)

        unique: list[str] = []
        for candidate in candidates:
            if not candidate or candidate in unique:
                continue
            unique.append(candidate)
        return unique[:6]

    def _build_answer(self, pack: ExplainPack, diagnostics: dict[str, list[str]]) -> str:
        """Render the refusal text, listing only the diagnostic categories that are non-empty."""
        report = [
            "Недостаточно опоры в коде, чтобы дать объяснение без догадок.",
            "",
            f"Найдено фрагментов кода: {len(pack.code_excerpts)} из {self._min_excerpts} минимально необходимых.",
        ]
        paths = diagnostics["paths"]
        if paths:
            report.append(f"Пути: {', '.join(paths)}")
        entrypoints = diagnostics["entrypoints"]
        if entrypoints:
            report.append(f"Entrypoints: {', '.join(entrypoints)}")
        symbols = diagnostics["symbols"]
        if symbols:
            report.append(f"Символы: {', '.join(symbols)}")
        missing = diagnostics["missing"]
        if missing:
            report.append(f"Диагностика: {', '.join(missing)}")
        return "\n".join(report).strip()
|
|
||||||
@@ -1,112 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from fastapi import APIRouter, Header
|
|
||||||
from fastapi.responses import StreamingResponse
|
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
|
||||||
from app.modules.chat.dialog_store import DialogSessionStore
|
|
||||||
from app.modules.chat.service import ChatOrchestrator
|
|
||||||
from app.modules.chat.task_store import TaskStore
|
|
||||||
from app.modules.shared.event_bus import EventBus
|
|
||||||
from app.modules.shared.idempotency_store import IdempotencyStore
|
|
||||||
from app.modules.shared.retry_executor import RetryExecutor
|
|
||||||
from app.schemas.chat import (
|
|
||||||
ChatMessageRequest,
|
|
||||||
DialogCreateRequest,
|
|
||||||
DialogCreateResponse,
|
|
||||||
TaskQueuedResponse,
|
|
||||||
TaskResultResponse,
|
|
||||||
)
|
|
||||||
from app.schemas.common import ModuleName
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from app.modules.chat.repository import ChatRepository
|
|
||||||
from app.modules.contracts import AgentRunner
|
|
||||||
from app.modules.rag.session_store import RagSessionStore
|
|
||||||
|
|
||||||
|
|
||||||
class ChatModule:
    """Facade of the chat module: wires its stores and orchestrator and exposes the public routes."""

    def __init__(
        self,
        agent_runner: AgentRunner,
        event_bus: EventBus,
        retry: RetryExecutor,
        rag_sessions: RagSessionStore,
        repository: ChatRepository,
        task_store: TaskStore | None = None,
    ) -> None:
        """Build the stores and the ``ChatOrchestrator``.

        Args:
            agent_runner: Passed to the orchestrator as its ``runtime``.
            event_bus: Shared bus used by the orchestrator and the events route.
            retry: Retry executor handed to the orchestrator.
            rag_sessions: Store used to check that a RAG session exists.
            repository: Backs the dialog store; its ``add_message`` is the message sink.
            task_store: Optional task store; a new ``TaskStore`` is created when omitted.
        """
        self._rag_sessions = rag_sessions
        self.tasks = task_store or TaskStore()
        self.dialogs = DialogSessionStore(repository)
        self.idempotency = IdempotencyStore()
        self.events = event_bus
        self.chat = ChatOrchestrator(
            task_store=self.tasks,
            dialogs=self.dialogs,
            idempotency=self.idempotency,
            runtime=agent_runner,
            events=self.events,
            retry=retry,
            rag_session_exists=lambda rag_session_id: rag_sessions.get(rag_session_id) is not None,
            message_sink=repository.add_message,
        )

    def public_router(self) -> APIRouter:
        """Return a router with the dialog, message, task-status and SSE event routes."""
        router = APIRouter(tags=["chat"])

        @router.post("/api/chat/dialogs", response_model=DialogCreateResponse)
        async def create_dialog(request: DialogCreateRequest) -> DialogCreateResponse:
            # A dialog can only be attached to a RAG session that already exists.
            if not self._rag_sessions.get(request.rag_session_id):
                raise AppError("rag_session_not_found", "RAG session not found", ModuleName.RAG)
            dialog = self.dialogs.create(request.rag_session_id)
            return DialogCreateResponse(
                dialog_session_id=dialog.dialog_session_id,
                rag_session_id=dialog.rag_session_id,
            )

        @router.post("/api/chat/messages", response_model=TaskQueuedResponse | TaskResultResponse)
        async def send_message(
            request: ChatMessageRequest,
            idempotency_key: str | None = Header(default=None, alias="Idempotency-Key"),
        ) -> TaskQueuedResponse | TaskResultResponse:
            task = await self.chat.enqueue_message(request, idempotency_key)
            return TaskQueuedResponse(task_id=task.task_id, status=task.status.value)

        @router.get("/api/tasks/{task_id}", response_model=TaskResultResponse)
        async def get_task(task_id: str) -> TaskResultResponse:
            task = self.tasks.get(task_id)
            if not task:
                raise AppError("not_found", f"Task not found: {task_id}", ModuleName.BACKEND)
            return TaskResultResponse(
                task_id=task.task_id,
                status=task.status,
                result_type=task.result_type,
                answer=task.answer,
                artifacts=task.artifacts,
                changeset=task.changeset,
                error=task.error,
            )

        @router.get("/api/events")
        async def stream_events(task_id: str) -> StreamingResponse:
            queue = await self.events.subscribe(task_id)

            async def event_stream():
                import asyncio

                # Seconds to wait for an event before sending a keepalive comment.
                heartbeat = 10
                try:
                    while True:
                        try:
                            event = await asyncio.wait_for(queue.get(), timeout=heartbeat)
                            yield EventBus.as_sse(event)
                        except asyncio.TimeoutError:
                            # Real line breaks are required here: an SSE comment line is
                            # only terminated by an actual newline. Escaped backslashes
                            # would leave the line open and merge it with the next event.
                            yield ": keepalive\n\n"
                finally:
                    # Always release the subscription, including on client disconnect.
                    await self.events.unsubscribe(task_id, queue)

            return StreamingResponse(event_stream(), media_type="text/event-stream")

        return router
|
|
||||||
@@ -1,93 +0,0 @@
|
|||||||
import json
|
|
||||||
|
|
||||||
from sqlalchemy import text
|
|
||||||
|
|
||||||
from app.modules.shared.db import get_engine
|
|
||||||
|
|
||||||
|
|
||||||
class ChatRepository:
    """SQL access to the ``dialog_sessions`` and ``chat_messages`` tables.

    Every method opens its own connection from ``get_engine()`` and commits
    explicitly after writing.
    """

    def ensure_tables(self) -> None:
        """Create both tables if missing and add the later-introduced columns."""
        statements = (
            """
            CREATE TABLE IF NOT EXISTS dialog_sessions (
                dialog_session_id VARCHAR(64) PRIMARY KEY,
                rag_session_id VARCHAR(64) NOT NULL,
                created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS chat_messages (
                id BIGSERIAL PRIMARY KEY,
                dialog_session_id VARCHAR(64) NOT NULL,
                task_id VARCHAR(64),
                role VARCHAR(16) NOT NULL,
                content TEXT NOT NULL,
                payload JSONB,
                created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
            )
            """,
            # Idempotent column additions for tables created before these columns existed.
            "ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS task_id VARCHAR(64)",
            "ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS payload JSONB",
        )
        with get_engine().connect() as conn:
            for statement in statements:
                conn.execute(text(statement))
            conn.commit()

    def create_dialog(self, dialog_session_id: str, rag_session_id: str) -> None:
        """Insert a dialog session row bound to the given RAG session."""
        insert_sql = """
            INSERT INTO dialog_sessions (dialog_session_id, rag_session_id)
            VALUES (:did, :sid)
            """
        params = {"did": dialog_session_id, "sid": rag_session_id}
        with get_engine().connect() as conn:
            conn.execute(text(insert_sql), params)
            conn.commit()

    def get_dialog(self, dialog_session_id: str) -> dict | None:
        """Return the dialog row as a dict, or ``None`` when no row matches."""
        select_sql = """
            SELECT dialog_session_id, rag_session_id
            FROM dialog_sessions
            WHERE dialog_session_id = :did
            """
        with get_engine().connect() as conn:
            result = conn.execute(text(select_sql), {"did": dialog_session_id})
            row = result.mappings().fetchone()
            return dict(row) if row else None

    def add_message(
        self,
        dialog_session_id: str,
        role: str,
        content: str,
        task_id: str | None = None,
        payload: dict | None = None,
    ) -> None:
        """Insert one chat message; ``payload`` is stored as JSONB when given."""
        # Serialize up front so the statement receives either JSON text or NULL.
        payload_json = None if payload is None else json.dumps(payload, ensure_ascii=False)
        insert_sql = """
            INSERT INTO chat_messages (dialog_session_id, task_id, role, content, payload)
            VALUES (:did, :task_id, :role, :content, CAST(:payload AS JSONB))
            """
        params = {
            "did": dialog_session_id,
            "task_id": task_id,
            "role": role,
            "content": content,
            "payload": payload_json,
        }
        with get_engine().connect() as conn:
            conn.execute(text(insert_sql), params)
            conn.commit()
|
|
||||||
@@ -1,288 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
|
||||||
from app.modules.contracts import AgentRunner
|
|
||||||
from app.schemas.chat import ChatMessageRequest, TaskResultType, TaskStatus
|
|
||||||
from app.schemas.common import ErrorPayload, ModuleName
|
|
||||||
from app.modules.chat.dialog_store import DialogSessionStore
|
|
||||||
from app.modules.chat.session_resolver import ChatSessionResolver
|
|
||||||
from app.modules.chat.task_store import TaskState, TaskStore
|
|
||||||
from app.modules.shared.event_bus import EventBus
|
|
||||||
from app.modules.shared.idempotency_store import IdempotencyStore
|
|
||||||
from app.modules.shared.retry_executor import RetryExecutor
|
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def _truncate_for_log(text: str, max_chars: int = 1200) -> str:
    """Return a single-line, length-capped version of *text* for logging.

    Newlines are replaced with a literal ``\\n`` and surrounding whitespace
    is stripped. A result longer than ``max_chars`` is cut to that length,
    right-stripped, and suffixed with ``...[truncated]``. A falsy *text*
    yields an empty string.
    """
    flattened = "\\n".join((text or "").split("\n")).strip()
    if len(flattened) > max_chars:
        return flattened[:max_chars].rstrip() + "...[truncated]"
    return flattened
|
|
||||||
|
|
||||||
|
|
||||||
class ChatOrchestrator:
    """Queue chat requests and process each one as a background asyncio task.

    A request becomes a task in the task store, is run through the agent
    runtime under the retry executor, and its progress, result or error is
    published on the event bus. User and assistant messages are handed to
    ``message_sink``.
    """

    def __init__(
        self,
        task_store: TaskStore,
        dialogs: DialogSessionStore,
        idempotency: IdempotencyStore,
        runtime: AgentRunner,
        events: EventBus,
        retry: RetryExecutor,
        rag_session_exists,
        message_sink,
    ) -> None:
        """Store collaborators and build the session resolver.

        Args:
            task_store: Store used to create, read and save task state.
            dialogs: Dialog store passed to the session resolver.
            idempotency: Maps idempotency keys to task ids.
            runtime: Agent runner awaited via ``run(...)``.
            events: Event bus used for all task events.
            retry: Executor whose ``run`` wraps the agent call.
            rag_session_exists: Callable passed to the session resolver.
            message_sink: Callable invoked as
                ``(dialog_session_id, role, content, task_id=..., payload=...)``.
        """
        self._task_store = task_store
        self._dialogs = dialogs
        self._idempotency = idempotency
        self._runtime = runtime
        self._events = events
        self._retry = retry
        self._rag_session_exists = rag_session_exists
        self._message_sink = message_sink
        self._session_resolver = ChatSessionResolver(dialogs, rag_session_exists)
        # The event loop keeps only weak references to tasks, so in-flight
        # workers are held here until they finish.
        self._background_tasks: set[asyncio.Task] = set()

    async def enqueue_message(
        self,
        request: ChatMessageRequest,
        idempotency_key: str | None,
    ) -> TaskState:
        """Create a task for *request* and start processing it in the background.

        When ``idempotency_key`` maps to a task that still exists in the
        store, that task is returned and nothing new is started.
        """
        if idempotency_key:
            existing = self._idempotency.get_task_id(idempotency_key)
            if existing:
                task = self._task_store.get(existing)
                if task:
                    LOGGER.info(
                        "enqueue_message reused task by idempotency key: task_id=%s mode=%s",
                        task.task_id,
                        request.mode.value,
                    )
                    return task

        task = self._task_store.create()
        if idempotency_key:
            self._idempotency.put(idempotency_key, task.task_id)
        # Keep a strong reference until the worker completes; a discarded
        # create_task() result may be garbage-collected mid-execution.
        worker = asyncio.create_task(self._process_task(task.task_id, request))
        self._background_tasks.add(worker)
        worker.add_done_callback(self._background_tasks.discard)
        LOGGER.info(
            "enqueue_message created task: task_id=%s mode=%s",
            task.task_id,
            request.mode.value,
        )
        return task

    async def _process_task(self, task_id: str, request: ChatMessageRequest) -> None:
        """Run one task end to end and publish its outcome.

        Marks the task RUNNING, resolves sessions, runs the agent through the
        retry executor, stores and publishes the result, and converts
        failures into an ``ErrorPayload`` saved on the task. A heartbeat
        task runs alongside and is always stopped in ``finally``.
        """
        task = self._task_store.get(task_id)
        if not task:
            return
        task.status = TaskStatus.RUNNING
        self._task_store.save(task)
        await self._events.publish(task_id, "task_status", {"task_id": task_id, "status": task.status.value})
        await self._publish_progress(task_id, "task.start", "Запрос принят, начинаю обработку.", progress=5)

        heartbeat_stop = asyncio.Event()
        heartbeat_task = asyncio.create_task(self._run_heartbeat(task_id, heartbeat_stop))

        try:
            await self._publish_progress(task_id, "task.sessions", "Проверяю сессии диалога и проекта.", progress=10)
            dialog_session_id, rag_session_id = self._resolve_sessions(request)
            LOGGER.warning(
                "incoming chat request: task_id=%s dialog_session_id=%s rag_session_id=%s mode=%s attachments=%s files=%s message=%s",
                task_id,
                dialog_session_id,
                rag_session_id,
                request.mode.value,
                len(request.attachments),
                len(request.files),
                _truncate_for_log(request.message),
            )
            await self._publish_progress(task_id, "task.sessions.done", "Сессии проверены, запускаю агента.", progress=15)
            loop = asyncio.get_running_loop()

            def progress_cb(stage: str, message: str, kind: str = "task_progress", meta: dict | None = None):
                # Scheduled via run_coroutine_threadsafe so the callback can be
                # invoked from outside the event loop thread.
                asyncio.run_coroutine_threadsafe(
                    self._events.publish(
                        task_id,
                        kind,
                        {
                            "task_id": task_id,
                            "stage": stage,
                            "message": message,
                            "meta": meta or {},
                        },
                    ),
                    loop,
                )

            async def op():
                # NOTE(review): this runs once per retry attempt, so the user
                # message may be persisted more than once — confirm intended.
                self._message_sink(dialog_session_id, "user", request.message, task_id=task_id)
                await self._publish_progress(task_id, "task.agent.run", "Агент анализирует запрос и готовит ответ.", progress=20)
                return await self._runtime.run(
                    task_id=task_id,
                    dialog_session_id=dialog_session_id,
                    rag_session_id=rag_session_id,
                    mode=request.mode.value,
                    message=request.message,
                    attachments=[a.model_dump(mode="json") for a in request.attachments],
                    files=[f.model_dump(mode="json") for f in request.files],
                    progress_cb=progress_cb,
                )

            result = await self._retry.run(op)
            await self._publish_progress(task_id, "task.finalize", "Сохраняю финальный результат.", progress=95)
            task.status = TaskStatus.DONE
            task.result_type = TaskResultType(result.result_type)
            task.answer = result.answer
            task.artifacts = list(getattr(result, "artifacts", []) or [])
            task.changeset = result.changeset
            if task.result_type != TaskResultType.CHANGESET and (task.answer or task.artifacts):
                payload = {
                    "result_type": task.result_type.value,
                    "artifacts": [item.model_dump(mode="json") for item in task.artifacts],
                }
                self._message_sink(dialog_session_id, "assistant", task.answer or "", task_id=task_id, payload=payload)
                LOGGER.warning(
                    "outgoing chat response: task_id=%s dialog_session_id=%s result_type=%s answer=%s",
                    task_id,
                    dialog_session_id,
                    task.result_type.value,
                    _truncate_for_log(task.answer or ""),
                )
            elif task.result_type == TaskResultType.CHANGESET:
                self._message_sink(
                    dialog_session_id,
                    "assistant",
                    f"changeset:{len(task.changeset)}",
                    task_id=task_id,
                    payload={
                        "result_type": TaskResultType.CHANGESET.value,
                        "changeset": [item.model_dump(mode="json") for item in task.changeset],
                    },
                )
                LOGGER.warning(
                    "outgoing chat response: task_id=%s dialog_session_id=%s result_type=%s changeset_items=%s answer=%s",
                    task_id,
                    dialog_session_id,
                    task.result_type.value,
                    len(task.changeset),
                    _truncate_for_log(task.answer or ""),
                )
            self._task_store.save(task)
            await self._events.publish(
                task_id,
                "task_result",
                {
                    "task_id": task_id,
                    "status": task.status.value,
                    "result_type": task.result_type.value,
                    "answer": task.answer,
                    "artifacts": [item.model_dump(mode="json") for item in task.artifacts],
                    "changeset": [item.model_dump(mode="json") for item in task.changeset],
                    "meta": getattr(result, "meta", {}) or {},
                },
            )
            await self._publish_progress(task_id, "task.done", "Обработка завершена.", progress=100)
            LOGGER.info(
                "_process_task completed: task_id=%s status=%s result_type=%s changeset_items=%s",
                task_id,
                task.status.value,
                task.result_type.value if task.result_type else "",
                len(task.changeset),
            )
        except (AppError, TimeoutError, ConnectionError, OSError) as exc:
            task.status = TaskStatus.ERROR
            if isinstance(exc, AppError):
                payload = ErrorPayload(code=exc.code, desc=exc.desc, module=exc.module)
            else:
                # Non-AppError failures here are reported as exhausted retries.
                payload = ErrorPayload(
                    code="retry_exhausted",
                    desc="Temporary failure after retries. Please retry request.",
                    module=ModuleName.BACKEND,
                )
            task.error = payload
            self._task_store.save(task)
            await self._publish_progress(task_id, "task.error", "Не удалось завершить обработку запроса.", kind="task_thinking")
            await self._events.publish(task_id, "task_error", payload.model_dump(mode="json"))
            LOGGER.warning(
                "_process_task handled error: task_id=%s code=%s module=%s desc=%s",
                task_id,
                payload.code,
                payload.module.value,
                payload.desc,
            )
        except Exception:
            # Top-level boundary of the background task: record a generic
            # error on the task and log the traceback.
            task.status = TaskStatus.ERROR
            payload = ErrorPayload(
                code="agent_runtime_error",
                desc="Agent execution failed unexpectedly. Please retry request.",
                module=ModuleName.AGENT,
            )
            task.error = payload
            self._task_store.save(task)
            await self._publish_progress(
                task_id,
                "task.error",
                "Во время выполнения возникла внутренняя ошибка.",
                kind="task_thinking",
            )
            await self._events.publish(task_id, "task_error", payload.model_dump(mode="json"))
            LOGGER.exception(
                "_process_task unexpected error: task_id=%s code=%s",
                task_id,
                payload.code,
            )
        finally:
            heartbeat_stop.set()
            await heartbeat_task

    async def _publish_progress(
        self,
        task_id: str,
        stage: str,
        message: str,
        *,
        progress: int | None = None,
        kind: str = "task_progress",
        meta: dict | None = None,
    ) -> None:
        """Publish a progress-style event for the task.

        ``progress`` is included only when given, clamped to the range 0..100.
        """
        payload = {
            "task_id": task_id,
            "stage": stage,
            "message": message,
            "meta": meta or {},
        }
        if progress is not None:
            payload["progress"] = max(0, min(100, int(progress)))
        await self._events.publish(task_id, kind, payload)
        LOGGER.debug(
            "_publish_progress emitted: task_id=%s kind=%s stage=%s progress=%s",
            task_id,
            kind,
            stage,
            payload.get("progress"),
        )

    async def _run_heartbeat(self, task_id: str, stop_event: asyncio.Event) -> None:
        """Publish a rotating ``task_thinking`` message every 5 seconds until stopped."""
        messages = (
            "Собираю данные по проекту.",
            "Анализирую контекст и формирую структуру ответа.",
            "Проверяю согласованность промежуточного результата.",
        )
        index = 0
        while not stop_event.is_set():
            try:
                # Doubles as the sleep: returns early once the stop event is set.
                await asyncio.wait_for(stop_event.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                await self._publish_progress(
                    task_id,
                    "task.heartbeat",
                    messages[index % len(messages)],
                    kind="task_thinking",
                    meta={"heartbeat": True},
                )
                index += 1
        LOGGER.debug("_run_heartbeat stopped: task_id=%s ticks=%s", task_id, index)

    def _resolve_sessions(self, request: ChatMessageRequest) -> tuple[str, str]:
        """Return ``(dialog_session_id, rag_session_id)`` from the session resolver."""
        return self._session_resolver.resolve(request)
|
|
||||||
@@ -1,36 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from app.core.exceptions import AppError
|
|
||||||
from app.schemas.chat import ChatMessageRequest
|
|
||||||
from app.schemas.common import ModuleName
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from app.modules.chat.dialog_store import DialogSessionStore
|
|
||||||
|
|
||||||
|
|
||||||
class ChatSessionResolver:
    """Derive the ``(dialog, rag)`` session id pair for a chat request."""

    def __init__(self, dialogs: DialogSessionStore, rag_session_exists) -> None:
        self._dialogs = dialogs
        self._rag_session_exists = rag_session_exists

    def resolve(self, request: ChatMessageRequest) -> tuple[str, str]:
        """Return the session id pair for *request*.

        ``dialog_session_id`` + ``rag_session_id`` are tried first and
        checked against the dialog store; otherwise ``session_id`` +
        ``project_id`` are used after checking ``rag_session_exists``.

        Raises:
            AppError: ``dialog_not_found``, ``dialog_rag_mismatch``,
                ``rag_session_not_found`` or ``missing_sessions``.
        """
        if request.dialog_session_id and request.rag_session_id:
            stored = self._dialogs.get(request.dialog_session_id)
            if not stored:
                raise AppError("dialog_not_found", "Dialog session not found", ModuleName.BACKEND)
            if stored.rag_session_id == request.rag_session_id:
                return request.dialog_session_id, request.rag_session_id
            raise AppError("dialog_rag_mismatch", "Dialog session does not belong to rag session", ModuleName.BACKEND)

        if not (request.session_id and request.project_id):
            raise AppError(
                "missing_sessions",
                "dialog_session_id and rag_session_id are required",
                ModuleName.BACKEND,
            )
        if self._rag_session_exists(request.project_id):
            return request.session_id, request.project_id
        raise AppError("rag_session_not_found", "RAG session not found", ModuleName.RAG)
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
from dataclasses import dataclass, field
|
|
||||||
from threading import Lock
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
from app.schemas.changeset import ChangeItem
|
|
||||||
from app.schemas.chat import TaskArtifact, TaskResultType, TaskStatus
|
|
||||||
from app.schemas.common import ErrorPayload
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class TaskState:
    """State record of one chat task; only ``task_id`` is required."""

    task_id: str
    # A new task starts out queued.
    status: TaskStatus = TaskStatus.QUEUED
    # Result fields default to "nothing yet".
    result_type: TaskResultType | None = None
    answer: str | None = None
    # default_factory gives each instance its own list.
    artifacts: list[TaskArtifact] = field(default_factory=list)
    changeset: list[ChangeItem] = field(default_factory=list)
    error: ErrorPayload | None = None
|
|
||||||
|
|
||||||
|
|
||||||
class TaskStore:
    """In-memory, lock-guarded mapping from task id to ``TaskState``."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._items: dict[str, TaskState] = {}

    def create(self) -> TaskState:
        """Register and return a new task with a fresh UUID4 id."""
        created = TaskState(task_id=str(uuid4()))
        self.save(created)
        return created

    def get(self, task_id: str) -> TaskState | None:
        """Return the task stored under *task_id*, or ``None`` if absent."""
        with self._lock:
            found = self._items.get(task_id)
        return found

    def save(self, task: TaskState) -> None:
        """Store *task* under its ``task_id``, replacing any previous entry."""
        key = task.task_id
        with self._lock:
            self._items[key] = task
|
|
||||||
@@ -1,3 +0,0 @@
|
|||||||
from app.modules.orchestration.facade import OrchestrationFacade
|
|
||||||
|
|
||||||
__all__ = ["OrchestrationFacade"]
|
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from app.modules.agent.intent_router_v2 import IntentRouterV2
|
|
||||||
|
|
||||||
|
|
||||||
class IntentRouterAdapter:
    """Thin wrapper that forwards ``route`` calls to the wrapped router."""

    def __init__(self, router: IntentRouterV2) -> None:
        self._router = router

    def route(self, user_query: str, conversation_state, repo_context):
        """Delegate to the wrapped router's ``route`` and return its result."""
        delegate = self._router.route
        return delegate(user_query, conversation_state, repo_context)
|
|
||||||
@@ -1,19 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
from app.modules.agent.llm.service import AgentLlmService
|
|
||||||
|
|
||||||
|
|
||||||
class LlmChatAdapter:
    """Async wrapper around the synchronous ``AgentLlmService.generate``."""

    def __init__(self, llm: AgentLlmService, prompt_name: str = "agent_api_v1") -> None:
        self._llm = llm
        self._prompt_name = prompt_name

    async def generate(self, message: str, request_id: str) -> str:
        """Run the LLM call in a worker thread and return its text.

        The call uses the configured prompt name and is tagged with the log
        context ``agent_api:<request_id>``.
        """
        context_tag = f"agent_api:{request_id}"
        reply = await asyncio.to_thread(
            self._llm.generate,
            self._prompt_name,
            message,
            log_context=context_tag,
        )
        return reply
|
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from app.modules.agent_api.domain.models.agent_request import AgentRequest
|
|
||||||
from app.modules.agent_api.domain.models.agent_session import AgentSession
|
|
||||||
from app.modules.agent_api.infrastructure.logging.request_trace_logger import RequestTraceLogger
|
|
||||||
from app.modules.orchestration.messaging.client_message_publisher import ClientMessagePublisher
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
|
||||||
class ExecutionContext:
|
|
||||||
request: AgentRequest
|
|
||||||
session: AgentSession
|
|
||||||
publisher: ClientMessagePublisher
|
|
||||||
trace_logger: RequestTraceLogger
|
|
||||||
task_context: Any = None
|
|
||||||
route_result: Any = None
|
|
||||||
workflow_result: Any = None
|
|
||||||
@@ -33,7 +33,7 @@ class ResetAgentSessionResponse(BaseModel):
|
|||||||
class AgentRequestCreateRequest(BaseModel):
|
class AgentRequestCreateRequest(BaseModel):
|
||||||
session_id: str = Field(min_length=1)
|
session_id: str = Field(min_length=1)
|
||||||
message: str = Field(min_length=1)
|
message: str = Field(min_length=1)
|
||||||
process_version: str = Field(default="v2", min_length=1)
|
process_version: str = Field(default="v1", min_length=1)
|
||||||
|
|
||||||
|
|
||||||
class AgentRequestQueuedResponse(BaseModel):
|
class AgentRequestQueuedResponse(BaseModel):
|
||||||
|
|||||||
@@ -1,70 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
|
|
||||||
from app.modules.chat.module import ChatModule
|
|
||||||
from app.modules.chat.task_store import TaskStore
|
|
||||||
from app.schemas.chat import ChatMessageRequest
|
|
||||||
from app.schemas.chat import TaskQueuedResponse
|
|
||||||
from app.modules.shared.event_bus import EventBus
|
|
||||||
from app.modules.shared.retry_executor import RetryExecutor
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeRuntime:
    """Agent runner stub that fails the test if it is ever invoked."""

    async def run(self, **kwargs):
        failure = AssertionError("legacy runtime must not be called")
        raise failure
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeDirectChat:
    """Direct chat stub that counts calls and returns a fixed response."""

    def __init__(self) -> None:
        self.calls = 0

    async def handle_message(self, request):
        """Record the call and return a canned ``task-1`` / ``done`` response."""
        self.calls = self.calls + 1
        canned = TaskQueuedResponse(task_id="task-1", status="done")
        return canned
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeRagSessions:
    """RAG session lookup stub that reports every id as existing."""

    def get(self, rag_session_id: str):
        """Return a one-key dict echoing the requested id."""
        return dict(rag_session_id=rag_session_id)
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeRepository:
|
|
||||||
def create_dialog(self, dialog_session_id: str, rag_session_id: str) -> None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def get_dialog(self, dialog_session_id: str):
|
|
||||||
return None
|
|
||||||
|
|
||||||
def add_message(self, dialog_session_id: str, role: str, content: str, task_id: str | None = None, payload: dict | None = None) -> None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def test_chat_messages_endpoint_uses_direct_service(monkeypatch) -> None:
    """The ``/api/chat/messages`` endpoint must call the direct chat service once."""
    monkeypatch.setenv("SIMPLE_CODE_EXPLAIN_ONLY", "true")
    direct_chat = _FakeDirectChat()
    router = ChatModule(
        agent_runner=_FakeRuntime(),
        event_bus=EventBus(),
        retry=RetryExecutor(),
        rag_sessions=_FakeRagSessions(),
        repository=_FakeRepository(),
        direct_chat=direct_chat,
        task_store=TaskStore(),
    ).public_router()

    # Take the first registered route whose path matches the chat endpoint.
    endpoints = (route.endpoint for route in router.routes if getattr(route, "path", "") == "/api/chat/messages")
    endpoint = next(endpoints)

    request = ChatMessageRequest(
        session_id="dialog-1",
        project_id="rag-1",
        message="Explain get_user",
    )
    # The handler is called directly, so the header argument is passed as None.
    response = asyncio.run(endpoint(request, None))

    assert response.task_id == "task-1"
    assert direct_chat.calls == 1
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
|
|
||||||
from app.modules.chat.direct_service import CodeExplainChatService
|
|
||||||
from app.modules.chat.session_resolver import ChatSessionResolver
|
|
||||||
from app.modules.chat.task_store import TaskStore
|
|
||||||
from app.modules.agent.runtime.steps.explain.models import ExplainIntent, ExplainPack
|
|
||||||
from app.schemas.chat import ChatFileContext, ChatMessageRequest
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeRetriever:
    """Retriever stub that always returns a pack missing ``code_excerpts``."""

    def build_pack(self, rag_session_id: str, user_query: str, *, file_candidates: list[dict] | None = None) -> ExplainPack:
        """Build a pack whose intent echoes the query in both query fields."""
        intent = ExplainIntent(raw_query=user_query, normalized_query=user_query)
        return ExplainPack(intent=intent, missing=["code_excerpts"])
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeLlm:
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self.calls = 0
|
|
||||||
|
|
||||||
def generate(self, prompt_name: str, user_input: str, *, log_context: str | None = None) -> str:
|
|
||||||
self.calls += 1
|
|
||||||
return "should not be called"
|
|
||||||
|
|
||||||
|
|
||||||
class _FakeDialogs:
    """Dialog store stub that never finds a dialog."""

    def get(self, dialog_session_id: str):
        """Return ``None`` for every id."""
        return
|
|
||||||
|
|
||||||
|
|
||||||
def test_direct_service_skips_llm_when_evidence_is_insufficient() -> None:
    """With a pack that reports missing code excerpts, the LLM must not be called."""
    messages: list[tuple[str, str, str, str | None]] = []

    def record_message(dialog_session_id, role, content, task_id=None):
        messages.append((dialog_session_id, role, content, task_id))

    llm = _FakeLlm()
    task_store = TaskStore()
    resolver = ChatSessionResolver(_FakeDialogs(), lambda rag_session_id: rag_session_id == "rag-1")
    service = CodeExplainChatService(
        retriever=_FakeRetriever(),
        llm=llm,
        session_resolver=resolver,
        task_store=task_store,
        message_sink=record_message,
    )

    request = ChatMessageRequest(
        session_id="dialog-1",
        project_id="rag-1",
        message="Explain get_user",
        files=[ChatFileContext(path="app/api/users.py", content="", content_hash="x")],
    )
    result = asyncio.run(service.handle_message(request))

    task = task_store.get(result.task_id)
    assert task is not None
    assert task.answer is not None
    assert "Недостаточно опоры в коде" in task.answer
    assert result.status == "done"
    assert llm.calls == 0
    # Exactly one user message followed by one assistant message is recorded.
    assert [role for _, role, _, _ in messages] == ["user", "assistant"]
|
|
||||||
Reference in New Issue
Block a user