diff --git a/README.md b/README.md index 65b216e..19a794a 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,21 @@ Scripts: `scripts/create_db.py` (Python, uses `ensure_schema` and `RAG_*` env), If `GIGACHAT_CREDENTIALS` is set (e.g. in `.env` for local runs), embeddings use GigaChat API; otherwise the stub client is used. Optional env: `GIGACHAT_EMBEDDINGS_MODEL` (default `Embeddings`), `GIGACHAT_VERIFY_SSL` (`true`/`false`). Ensure `RAG_EMBEDDINGS_DIM` matches the model output (see GigaChat docs). +## Agent (GigaChat) + +Ответы на вопросы формирует агент на базе GigaChat: поиск по базе знаний (RAG) + генерация текста. Если задана переменная `GIGACHAT_CREDENTIALS`, используется `GigaChatLLMClient` в `src/rag_agent/agent/pipeline.py`; иначе — заглушка. Модель чата задаётся через `RAG_LLM_MODEL` (по умолчанию `GigaChat`). + +## Telegram-бот + +Общение с пользователем через бота в Telegram: бот отвечает на текстовые сообщения, используя знания из базы (RAG + GigaChat). + +1. Создайте бота через [@BotFather](https://t.me/BotFather) и получите токен. +2. Добавьте в `.env`: `TELEGRAM_BOT_TOKEN=<токен>`. +3. Запуск: `rag-agent bot` (или `python -m rag_agent.telegram_bot`). +4. Через Docker: `docker compose up -d` поднимает БД, вебхук-сервер и бота в отдельных контейнерах; в `.env` должен быть задан `TELEGRAM_BOT_TOKEN`. + +Требуются: `RAG_DB_DSN`, `RAG_REPO_PATH`, `GIGACHAT_CREDENTIALS`, `TELEGRAM_BOT_TOKEN`. Расширенное логирование (входящие сообщения, число эмбеддингов, число чанков из БД, ответ LLM): `RAG_BOT_VERBOSE_LOGGING=true|false` (по умолчанию `true` для отладки). + ## Notes -- LLM client is still a stub; replace it in `src/rag_agent/agent/pipeline.py` for real answers. - This project requires Postgres with the `pgvector` extension. diff --git a/docker-compose.yml b/docker-compose.yml index 91ab669..d8e2336 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -58,6 +58,31 @@ services: networks: - rag_net + bot: + build: + context: . 
+ dockerfile: Dockerfile + image: rag-agent:latest + container_name: rag-bot + restart: unless-stopped + depends_on: + postgres: + condition: service_healthy + environment: + RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}" + RAG_REPO_PATH: "/data" + RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024} + GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-} + GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings} + TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN:-} + RAG_BOT_VERBOSE_LOGGING: ${RAG_BOT_VERBOSE_LOGGING:-true} + volumes: + - ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data + entrypoint: ["rag-agent"] + command: ["bot"] + networks: + - rag_net + networks: rag_net: driver: bridge diff --git a/pyproject.toml b/pyproject.toml index 9e466d7..21aa367 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "gigachat>=0.2.0", "fastapi>=0.115.0", "uvicorn[standard]>=0.32.0", + "python-telegram-bot>=21.0", ] [project.scripts] diff --git a/src/rag_agent/__pycache__/cli.cpython-312.pyc b/src/rag_agent/__pycache__/cli.cpython-312.pyc index be3e16d..8e0f885 100644 Binary files a/src/rag_agent/__pycache__/cli.cpython-312.pyc and b/src/rag_agent/__pycache__/cli.cpython-312.pyc differ diff --git a/src/rag_agent/__pycache__/config.cpython-312.pyc b/src/rag_agent/__pycache__/config.cpython-312.pyc index 4b5a21b..188d5a6 100644 Binary files a/src/rag_agent/__pycache__/config.cpython-312.pyc and b/src/rag_agent/__pycache__/config.cpython-312.pyc differ diff --git a/src/rag_agent/agent/pipeline.py b/src/rag_agent/agent/pipeline.py index 8a890c7..8a15469 100644 --- a/src/rag_agent/agent/pipeline.py +++ b/src/rag_agent/agent/pipeline.py @@ -1,14 +1,21 @@ from __future__ import annotations +import os from dataclasses import dataclass +from pathlib import Path from typing import Protocol import psycopg +from dotenv import load_dotenv + from rag_agent.config import AppConfig from 
class GigaChatLLMClient:
    """LLM generation via the GigaChat API.

    The credentials string is passed in by the caller (in this module it is
    read from the ``GIGACHAT_CREDENTIALS`` environment variable).

    Args:
        credentials: GigaChat authorization string; surrounding whitespace
            is stripped.
        model: Model name used when ``generate`` is called with an empty
            ``model`` argument.
        verify_ssl_certs: Forwarded to the GigaChat client. NOTE: the default
            is ``False``, i.e. TLS certificate verification is off unless the
            caller turns it on.
    """

    def __init__(
        self,
        credentials: str,
        model: str = "GigaChat",
        verify_ssl_certs: bool = False,
    ) -> None:
        self._credentials = credentials.strip()
        self._model = model
        self._verify_ssl_certs = verify_ssl_certs

    @staticmethod
    def _extract_text(response) -> str:
        """Return the stripped text of the first choice, or ``""`` if there is none.

        Guards against a response whose ``choices`` list is missing or empty,
        and against a first choice whose ``content`` is ``None``.
        """
        choices = getattr(response, "choices", None)
        if not choices:
            return ""
        return (choices[0].message.content or "").strip()

    def generate(self, prompt: str, model: str) -> str:
        """Send ``prompt`` to GigaChat and return the generated text.

        Args:
            prompt: Full prompt text passed to ``GigaChat.chat``.
            model: Model name for this call; when empty, the model given to
                the constructor is used instead.

        Returns:
            The stripped answer text, or an empty string when the response
            carries no choices or no content.
        """
        # Imported here so the module can be loaded (and the stub client used)
        # without importing the gigachat package at module import time.
        from gigachat import GigaChat

        use_model = model or self._model
        with GigaChat(
            credentials=self._credentials,
            model=use_model,
            verify_ssl_certs=self._verify_ssl_certs,
        ) as giga:
            response = giga.chat(prompt)
        return self._extract_text(response)
def answer_query_with_stats(
    conn: psycopg.Connection,
    config: AppConfig,
    embedding_client: EmbeddingClient,
    llm_client: LLMClient,
    question: str,
    top_k: int = 5,
    story_id: int | None = None,
) -> tuple[str, dict]:
    """Answer ``question`` and also report counters useful for logging.

    The question is embedded with ``embedding_client``, the first embedding
    is passed to ``search_similar`` together with ``top_k`` and ``story_id``,
    the results are formatted into a prompt by ``build_prompt``, and the
    prompt is sent to ``llm_client`` using ``config.llm_model``.

    Returns:
        ``(answer, stats)`` where ``stats`` has the keys:
        ``query_embeddings`` (number of embeddings returned for the question),
        ``chunks_found`` (number of search results) and
        ``answer`` (the same answer text).
    """
    vectors = embedding_client.embed_texts([question])
    hits = search_similar(conn, vectors[0], top_k=top_k, story_id=story_id)

    prompt = build_prompt(
        question,
        [f"Source: {hit.path}\n{hit.content}" for hit in hits],
    )
    reply = llm_client.generate(prompt, model=config.llm_model)

    return reply, {
        "query_embeddings": len(vectors),
        "chunks_found": len(hits),
        "answer": reply,
    }
def cmd_bot(args: argparse.Namespace) -> None:
    """CLI handler for the ``bot`` subcommand: start the Telegram bot.

    ``args`` is part of the handler signature used with ``set_defaults(func=...)``
    and is not read here. The bot module is imported inside the function
    rather than at module top level.
    """
    from rag_agent import telegram_bot

    telegram_bot.main()
), diff --git a/src/rag_agent/index/postgres.py b/src/rag_agent/index/postgres.py index 596b9f3..b52d901 100644 --- a/src/rag_agent/index/postgres.py +++ b/src/rag_agent/index/postgres.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone from typing import Iterable, Sequence import psycopg +from pgvector import Vector from pgvector.psycopg import register_vector from rag_agent.ingest.chunker import TextChunk @@ -113,16 +114,20 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None: except psycopg.ProgrammingError: conn.rollback() pass - try: - cur.execute( - """ + cur.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = 'chunks'::regclass AND conname = 'chunks_change_type_check' + ) THEN ALTER TABLE chunks ADD CONSTRAINT chunks_change_type_check CHECK (change_type IN ('added', 'modified', 'unchanged')); - """ - ) - except psycopg.ProgrammingError: - conn.rollback() - pass # constraint may already exist + END IF; + END $$; + """ + ) cur.execute( """ CREATE INDEX IF NOT EXISTS idx_documents_story_id @@ -343,6 +348,7 @@ def fetch_similar( top_k: int, story_id: int | None = None, ) -> list[tuple[str, str, float]]: + vec = Vector(query_embedding) with conn.cursor() as cur: if story_id is not None: cur.execute( @@ -354,7 +360,7 @@ def fetch_similar( ORDER BY c.embedding <=> %s LIMIT %s; """, - (query_embedding, story_id, query_embedding, top_k), + (vec, story_id, vec, top_k), ) else: cur.execute( @@ -365,7 +371,7 @@ def fetch_similar( ORDER BY c.embedding <=> %s LIMIT %s; """, - (query_embedding, query_embedding, top_k), + (vec, vec, top_k), ) rows = cur.fetchall() return [(row[0], row[1], row[2]) for row in rows] diff --git a/src/rag_agent/telegram_bot.py b/src/rag_agent/telegram_bot.py new file mode 100644 index 0000000..6bc1ff2 --- /dev/null +++ b/src/rag_agent/telegram_bot.py @@ -0,0 +1,156 @@ +"""Telegram bot: answers user questions using RAG (retrieval + GigaChat).""" + +from __future__ import 
def run_bot() -> None:
    """Run the Telegram bot in long-polling mode; blocks until polling stops.

    The token is read from the ``TELEGRAM_BOT_TOKEN`` environment variable.
    When it is missing, an error is logged and the function sleeps forever
    instead of returning (the log message explains this keeps the container
    up until it is restarted with a token).

    Every non-command text message is answered by calling ``_run_rag`` in a
    worker thread. With verbose logging enabled, the incoming message, the
    retrieval counters and a preview of the LLM answer are logged.
    """
    token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
    if not token:
        logger.error(
            "TELEGRAM_BOT_TOKEN is required. Set it in .env or environment. "
            "Container will stay up; restart after setting the token."
        )
        import time

        # Deliberate idle loop: never returns, so the process stays alive.
        while True:
            time.sleep(3600)

    from telegram import Update
    from telegram.ext import Application, ContextTypes, MessageHandler, filters

    # Evaluated once at startup; changing the env var later has no effect.
    verbose = _verbose_logging_enabled()

    # Telegram limits a message text to 4096 characters; longer answers are
    # cut to 4090 characters plus a newline and an ellipsis.
    max_reply_chars = 4096
    truncated_reply_chars = 4090

    async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Answer one incoming text message with the RAG pipeline."""
        message = update.message
        if not message or not message.text:
            return
        question = message.text.strip()
        if not question:
            await message.reply_text("Напишите вопрос текстом.")
            return
        user_id = update.effective_user.id if update.effective_user else None
        chat_id = update.effective_chat.id if update.effective_chat else None
        if verbose:
            logger.info(
                "received message user_id=%s chat_id=%s text=%s",
                user_id,
                chat_id,
                repr(question[:200] + ("…" if len(question) > 200 else "")),
            )
        try:
            # _run_rag is synchronous (DB + HTTP calls); run it in a worker
            # thread so the event loop keeps serving other updates.
            result = await asyncio.to_thread(
                _run_rag,
                question,
                top_k=5,
                story_id=None,
                with_stats=verbose,
            )
            if verbose:
                # with_stats=True makes _run_rag return (answer, stats).
                answer, stats = result
                logger.info(
                    "query_embeddings=%s chunks_found=%s",
                    stats["query_embeddings"],
                    stats["chunks_found"],
                )
                answer_preview = stats["answer"]
                if len(answer_preview) > VERBOSE_LOGGING_MAX_ANSWER_CHARS:
                    answer_preview = (
                        answer_preview[:VERBOSE_LOGGING_MAX_ANSWER_CHARS] + "…"
                    )
                logger.info("llm_response=%s", repr(answer_preview))
            else:
                answer = result
            if not answer.strip():
                # Telegram rejects empty message text; send a fallback instead.
                answer = "Не удалось сформировать ответ."
            if len(answer) > max_reply_chars:
                answer = answer[:truncated_reply_chars] + "\n…"
            await message.reply_text(answer)
        except Exception:
            # Full details (including the traceback) go to the log only; the
            # exception text is not sent to the chat because it can expose
            # internal connection details to any Telegram user.
            logger.exception("RAG error")
            await message.reply_text(
                "Не удалось получить ответ. "
                "Проверьте RAG_DB_DSN и GIGACHAT_CREDENTIALS."
            )

    app = Application.builder().token(token).build()
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    logger.info("Telegram bot started (polling)")
    app.run_polling(drop_pending_updates=True)
if payload_before and payload_after and payload_before != payload_after: + # Update working tree to new commits (fetch only fetches refs; index reads files from disk) + origin_ref = f"origin/{branch}" + merge_proc = subprocess.run( + ["git", "-C", repo_path, "merge", "--ff-only", origin_ref], + capture_output=True, + text=True, + timeout=60, + ) + if merge_proc.returncode != 0: + logger.warning( + "webhook: git merge --ff-only failed (branch=%s); stderr=%s", + branch, _decode_stderr(merge_proc.stderr), + ) + return + # New branch: before is null SHA → use auto (merge-base with default branch) + if _is_null_sha(payload_before): + logger.info( + "webhook: new branch detected (before is null SHA), using --base-ref auto story=%s head=%s", + branch, payload_after, + ) + _run_index(repo_path, story=branch, base_ref="auto", head_ref=payload_after) + return logger.info( "webhook: running index from payload story=%s %s..%s", branch, payload_before, payload_after,