Compare commits

...

7 Commits

17 changed files with 546 additions and 48 deletions

.gitignore

@@ -1 +1,4 @@
src/rag_agent/.env
.env
docker/ssh
docker/postgres_data


@@ -1,3 +1,6 @@
# RAG Agent (Postgres)
Custom RAG agent that indexes text files from a git repository into Postgres
@@ -14,7 +17,7 @@ and answers queries using retrieval + LLM generation. **Changes are always in th
2. Configure environment variables:
- `RAG_REPO_PATH` — path to git repo with text files
- `RAG_DB_DSN` — Postgres DSN (e.g. `postgresql://rag:rag_secret@localhost:5432/rag`)
- `RAG_EMBEDDINGS_DIM` — embedding vector dimension: **1024** for GigaChat Embeddings (default), 1536 for OpenAI
3. Create DB schema (only if not using Docker, or if init was disabled):
- `python scripts/create_db.py` (or `psql "$RAG_DB_DSN" -f scripts/schema.sql`)
4. Index files for a story (e.g. branch name as story slug). Use the **full story range** so all commits in the story are included:
@@ -40,6 +43,35 @@ When the app runs as a service in Docker, it can start a **webhook server** so t
Health check: `GET http://<host>:8000/health` → `ok`. Port is configurable via `WEBHOOK_PORT` (default 8000) in docker-compose.
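The health endpoint above can also be probed from Python before digging into logs (a minimal stdlib-only sketch; the base URL is an assumption matching the default docker-compose port):

```python
from urllib.request import urlopen

def webhook_healthy(base_url: str = "http://localhost:8000") -> bool:
    # The serve endpoint replies with plain-text "ok" on success.
    with urlopen(f"{base_url}/health", timeout=5) as resp:
        return resp.status == 200 and resp.read().decode().strip() == "ok"
```

If this returns `False` (or raises), the container or port mapping is the problem, not the webhook handler.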
### Webhook diagnostics (202 Accepted but no new rows in DB)
1. **Logs** — After a push, check app logs. Each webhook logs `pull_and_index started branch=… repo_path=…`; then one of:
- `not a git repo or missing` — `/data` in the container is not a git clone; clone the repo into the mounted dir.
- `git fetch failed` — SSH/network (see `docker/ssh/README.md`) or wrong remote.
- `git checkout … failed` — branch missing in the clone.
- `git merge --ff-only failed` — non-fast-forward (e.g. force-push); index is skipped. Use a normal push or re-clone.
- `no new commits for branch=…` — merge was a no-op (already up to date); nothing to index.
- `running index story=…` then `index completed` — index ran; check tables for that story.
- `index failed` — stderr shows the `rag-agent index` error (e.g. DB, embeddings, repo path).
```bash
docker compose logs -f app
# or: docker logs -f rag-agent
```
Trigger a push and watch for the lines above.
2. **Story and tables** — Rows are per **story** (branch name). Query by story, e.g. `SELECT * FROM stories;` then `SELECT * FROM chunks WHERE story_id = (SELECT id FROM stories WHERE slug = 'main');`.
3. **Manual index** — Run index inside the container to confirm DB and repo work:
```bash
docker compose exec app rag-agent index --story main --changed --base-ref main --head-ref HEAD
```
If this inserts rows, the issue is in the webhook path (fetch/merge/refs).
4. **Allowed extensions** — Only `.md`, `.txt`, `.rst` (or `RAG_ALLOWED_EXTENSIONS`) are indexed; other files are skipped.
5. **"expected 1536 dimensions, not 1024"** — GigaChat Embeddings returns 1024-dim vectors; the default is now 1024. If the DB was created earlier with vector(1536), drop and recreate the tables so the app can create them with 1024: `psql "$RAG_DB_DSN" -c "DROP TABLE IF EXISTS chunks; DROP TABLE IF EXISTS documents;"` then restart the app (ensure_schema will recreate the tables).
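The story/table check in step 2 can be wrapped in a small script that prints chunk counts per story in one query (a hypothetical diagnostic helper, not part of the project; column names `slug` and `story_id` come from the queries shown above — adjust if your schema differs):

```python
import os

# One row per story: how many chunks are indexed for it.
CHUNKS_PER_STORY_SQL = """
SELECT s.slug, COUNT(c.story_id) AS chunks
FROM stories s
LEFT JOIN chunks c ON c.story_id = s.id
GROUP BY s.slug
ORDER BY s.slug;
"""

def chunks_per_story(dsn: str) -> list[tuple[str, int]]:
    import psycopg  # deferred so the snippet parses without the dependency installed

    with psycopg.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(CHUNKS_PER_STORY_SQL)
        return cur.fetchall()

if __name__ == "__main__" and os.getenv("RAG_DB_DSN"):
    for slug, count in chunks_per_story(os.environ["RAG_DB_DSN"]):
        print(f"{slug}: {count} chunks")
```

A story with zero chunks after a push points at the webhook path (fetch/merge/refs) or the allowed-extensions filter.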
## Git hook (index on commit)
Install the post-commit hook so changed files are indexed after each commit:
@@ -91,7 +123,21 @@ Scripts: `scripts/create_db.py` (Python, uses `ensure_schema` and `RAG_*` env),
If `GIGACHAT_CREDENTIALS` is set (e.g. in `.env` for local runs), embeddings use GigaChat API; otherwise the stub client is used. Optional env: `GIGACHAT_EMBEDDINGS_MODEL` (default `Embeddings`), `GIGACHAT_VERIFY_SSL` (`true`/`false`). Ensure `RAG_EMBEDDINGS_DIM` matches the model output (see GigaChat docs).
## Agent (GigaChat)
Answers are produced by a GigaChat-based agent: knowledge-base retrieval (RAG) plus text generation. If `GIGACHAT_CREDENTIALS` is set, `GigaChatLLMClient` in `src/rag_agent/agent/pipeline.py` is used; otherwise the stub. The chat model is set via `RAG_LLM_MODEL` (default `GigaChat`).
## Telegram bot
Users talk to the agent through a Telegram bot: it answers text messages using knowledge from the database (RAG + GigaChat).
1. Create a bot via [@BotFather](https://t.me/BotFather) and obtain a token.
2. Add to `.env`: `TELEGRAM_BOT_TOKEN=<token>`.
3. Run: `rag-agent bot` (or `python -m rag_agent.telegram_bot`).
4. Via Docker: `docker compose up -d` starts the DB, webhook server, and bot in separate containers; `TELEGRAM_BOT_TOKEN` must be set in `.env`.
Required: `RAG_DB_DSN`, `RAG_REPO_PATH`, `GIGACHAT_CREDENTIALS`, `TELEGRAM_BOT_TOKEN`. Verbose logging (incoming messages, embedding count, chunk count from DB, LLM answer): `RAG_BOT_VERBOSE_LOGGING=true|false` (default `true` for debugging).
## Notes
- LLM client is still a stub; replace it in `src/rag_agent/agent/pipeline.py` for real answers.
- This project requires Postgres with the `pgvector` extension.


@@ -14,7 +14,8 @@ services:
ports:
- "${POSTGRES_PORT:-5432}:5432"
volumes:
# PG 18+: mount at /var/lib/postgresql (data goes in versioned subdir). For pg16 use /var/lib/postgresql/data.
- rag_pgdata:/var/lib/postgresql
# Init scripts run once on first start (create extension, tables). Optional: comment out to skip.
- ./docker/postgres-init:/docker-entrypoint-initdb.d:ro
healthcheck:
@@ -39,18 +40,49 @@ services:
- "${WEBHOOK_PORT:-8000}:8000"
environment:
RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}"
# In container repo is always at /data (mounted below). Use RAG_REPO_HOST in .env for host path.
RAG_REPO_PATH: "/data"
# Accept host key on first connect; git fetch uses SSH from /root/.ssh (mounted below).
GIT_SSH_COMMAND: "ssh -o StrictHostKeyChecking=accept-new"
RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024}
GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-}
GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings}
WEBHOOK_SECRET: ${WEBHOOK_SECRET:-}
volumes:
# Host path: set RAG_REPO_HOST in .env (e.g. /Users/you/repo). Falls back to RAG_REPO_PATH then ./data.
- ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data
# SSH for git fetch (webhook): put deploy key and known_hosts in RAG_SSH_DIR. See docker/ssh/README.md.
- ${RAG_SSH_DIR:-./docker/ssh}:/root/.ssh:ro
entrypoint: ["rag-agent"]
command: ["serve", "--host", "0.0.0.0", "--port", "8000"]
networks:
- rag_net
bot:
build:
context: .
dockerfile: Dockerfile
image: rag-agent:latest
container_name: rag-bot
restart: unless-stopped
depends_on:
postgres:
condition: service_healthy
environment:
RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}"
RAG_REPO_PATH: "/data"
RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024}
GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-}
GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings}
TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN:-}
RAG_BOT_VERBOSE_LOGGING: ${RAG_BOT_VERBOSE_LOGGING:-true}
volumes:
- ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data
entrypoint: ["rag-agent"]
command: ["bot"]
networks:
- rag_net
networks:
rag_net:
driver: bridge


@@ -1,5 +1,5 @@
-- RAG vector DB schema (runs automatically on first Postgres init).
-- GigaChat Embeddings = 1024; for OpenAI use vector(1536).
CREATE EXTENSION IF NOT EXISTS vector;
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS chunks (
chunk_index INTEGER NOT NULL,
hash TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector(1024) NOT NULL,
start_line INTEGER,
end_line INTEGER,
change_type TEXT NOT NULL DEFAULT 'added'

docker/ssh/.gitignore

@@ -0,0 +1,4 @@
# Deploy key and known_hosts — add locally, do not commit
id_*
id_*.pub
known_hosts

docker/ssh/README.md

@@ -0,0 +1,19 @@
# SSH for webhook (git fetch in container)
When the app runs in Docker and the webhook does `git fetch origin <branch>`, git uses SSH. The container has no keys by default, so you get "Host key verification failed" or "Permission denied".
## Setup
1. **Deploy key** (read-only key for the repo you index):
- Generate: `ssh-keygen -t ed25519 -f docker/ssh/id_ed25519 -N "" -C "rag-agent-deploy"`
- Add the **public** key (`docker/ssh/id_ed25519.pub`) to your Git server (GitHub/GitLab/… → repo → Deploy keys).
2. **Known hosts** (optional; `GIT_SSH_COMMAND` in compose accepts new host keys on first connect):
- To pin the host key: `ssh-keyscan -t ed25519 git.example.com >> docker/ssh/known_hosts`
- Replace `git.example.com` with your Git host (e.g. `github.com`, `git.lesha.spb.ru`).
3. **Permissions**: key file must be readable only by you, e.g. `chmod 600 docker/ssh/id_ed25519`.
4. **Compose**: by default this directory is mounted into the app container as `/root/.ssh`. Override with `RAG_SSH_DIR` in `.env` if you use another path (e.g. `RAG_SSH_DIR=/Users/you/.ssh` to use your main SSH dir).
After that, restart the app: `docker compose up -d`.
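Before restarting, the deploy key can be sanity-checked from the host with `git ls-remote` using the same SSH options as the container (a sketch; the helper names and the explicit `-i`/`IdentitiesOnly` additions are assumptions, not project code):

```python
import os
import subprocess

def deploy_key_env(key_path: str = "docker/ssh/id_ed25519") -> dict[str, str]:
    # Same StrictHostKeyChecking option as GIT_SSH_COMMAND in docker-compose,
    # plus an explicit identity file so the test uses the deploy key.
    ssh_cmd = f"ssh -i {key_path} -o StrictHostKeyChecking=accept-new -o IdentitiesOnly=yes"
    return {**os.environ, "GIT_SSH_COMMAND": ssh_cmd}

def check_deploy_key(repo_url: str, key_path: str = "docker/ssh/id_ed25519") -> bool:
    proc = subprocess.run(
        ["git", "ls-remote", "--exit-code", repo_url, "HEAD"],
        env=deploy_key_env(key_path),
        capture_output=True,
        timeout=30,
    )
    return proc.returncode == 0
```

A `True` result means fetch over SSH should work inside the container too (given the same key and known_hosts are mounted).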


@@ -12,6 +12,7 @@ dependencies = [
"gigachat>=0.2.0",
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"python-telegram-bot>=21.0",
]
[project.scripts]


@@ -1,6 +1,5 @@
-- RAG vector DB schema for Postgres (pgvector).
-- GigaChat Embeddings = 1024; for OpenAI use vector(1536). Usage: psql "$RAG_DB_DSN" -f scripts/schema.sql
CREATE EXTENSION IF NOT EXISTS vector;
@@ -28,7 +27,7 @@ CREATE TABLE IF NOT EXISTS chunks (
chunk_index INTEGER NOT NULL,
hash TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector(1024) NOT NULL,
start_line INTEGER,
end_line INTEGER,
change_type TEXT NOT NULL DEFAULT 'added'


@@ -1,14 +1,21 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
import psycopg
from dotenv import load_dotenv
from rag_agent.config import AppConfig
from rag_agent.index.embeddings import EmbeddingClient
from rag_agent.retrieval.search import search_similar
_repo_root = Path(__file__).resolve().parent.parent.parent
load_dotenv(_repo_root / ".env")
class LLMClient(Protocol):
def generate(self, prompt: str, model: str) -> str:
@@ -20,10 +27,49 @@ class StubLLMClient:
def generate(self, prompt: str, model: str) -> str:
return (
"LLM client is not configured. "
"Set GIGACHAT_CREDENTIALS in .env for GigaChat answers."
)
class GigaChatLLMClient:
"""LLM generation via GigaChat API. Credentials from env GIGACHAT_CREDENTIALS."""
def __init__(
self,
credentials: str,
model: str = "GigaChat",
verify_ssl_certs: bool = False,
) -> None:
self._credentials = credentials.strip()
self._model = model
self._verify_ssl_certs = verify_ssl_certs
def generate(self, prompt: str, model: str) -> str:
from gigachat import GigaChat
use_model = model or self._model
with GigaChat(
credentials=self._credentials,
model=use_model,
verify_ssl_certs=self._verify_ssl_certs,
) as giga:
response = giga.chat(prompt)
return (response.choices[0].message.content or "").strip()
def get_llm_client(config: AppConfig) -> LLMClient:
"""Return GigaChat LLM client if credentials set, else stub."""
credentials = os.getenv("GIGACHAT_CREDENTIALS", "").strip()
if credentials:
return GigaChatLLMClient(
credentials=credentials,
model=config.llm_model,
verify_ssl_certs=os.getenv("GIGACHAT_VERIFY_SSL", "false").lower()
in ("1", "true", "yes"),
)
return StubLLMClient()
def build_prompt(question: str, contexts: list[str]) -> str:
joined = "\n\n".join(contexts)
return (
@@ -42,10 +88,32 @@ def answer_query(
top_k: int = 5,
story_id: int | None = None,
) -> str:
answer, _ = answer_query_with_stats(
conn, config, embedding_client, llm_client, question, top_k, story_id
)
return answer
def answer_query_with_stats(
conn: psycopg.Connection,
config: AppConfig,
embedding_client: EmbeddingClient,
llm_client: LLMClient,
question: str,
top_k: int = 5,
story_id: int | None = None,
) -> tuple[str, dict]:
"""Like answer_query but returns (answer, stats) for logging. stats: query_embeddings, chunks_found, answer."""
query_embeddings = embedding_client.embed_texts([question])
results = search_similar(
conn, query_embeddings[0], top_k=top_k, story_id=story_id
)
contexts = [f"Source: {item.path}\n{item.content}" for item in results]
prompt = build_prompt(question, contexts)
answer = llm_client.generate(prompt, model=config.llm_model)
stats = {
"query_embeddings": len(query_embeddings),
"chunks_found": len(results),
"answer": answer,
}
return answer, stats


@@ -25,7 +25,7 @@ from rag_agent.index.postgres import (
update_story_indexed_range,
upsert_document,
)
from rag_agent.agent.pipeline import answer_query, get_llm_client
def _file_version(path: Path) -> str:
@@ -55,13 +55,24 @@ def cmd_index(args: argparse.Namespace) -> None:
existing = filter_existing(changed_files)
else:
removed = []
repo_path = Path(config.repo_path)
if not repo_path.exists():
raise SystemExit(f"RAG_REPO_PATH does not exist: {config.repo_path}")
existing = [p for p in repo_path.rglob("*") if p.is_file()]
allowed = list(iter_text_files(existing, config.allowed_extensions))
print(
f"repo={config.repo_path} all_files={len(existing)} "
f"allowed={len(allowed)} ext={config.allowed_extensions}"
)
if not allowed:
print("No files to index (check path and extensions .md, .txt, .rst)")
return
for path in removed:
delete_document(conn, story_id, str(path))
indexed = 0
for path, text in iter_text_files(existing, config.allowed_extensions):
chunks = chunk_text_by_lines(
text,
@@ -88,11 +99,18 @@ def cmd_index(args: argparse.Namespace) -> None:
replace_chunks(
conn, document_id, chunks, embeddings, base_chunks=base_chunks
)
indexed += 1
print(f"Indexed {indexed} documents for story={args.story}")
if args.changed:
update_story_indexed_range(conn, story_id, base_ref, head_ref)
def cmd_bot(args: argparse.Namespace) -> None:
from rag_agent.telegram_bot import main as bot_main
bot_main()
def cmd_serve(args: argparse.Namespace) -> None:
import uvicorn
uvicorn.run(
@@ -113,7 +131,7 @@ def cmd_ask(args: argparse.Namespace) -> None:
if story_id is None:
raise SystemExit(f"Story not found: {args.story}")
embedding_client = get_embedding_client(config.embeddings_dim)
llm_client = get_llm_client(config)
answer = answer_query(
conn,
config,
@@ -185,6 +203,12 @@ def build_parser() -> argparse.ArgumentParser:
)
serve_parser.set_defaults(func=cmd_serve)
bot_parser = sub.add_parser(
"bot",
help="Run Telegram bot: answers questions using RAG (requires TELEGRAM_BOT_TOKEN)",
)
bot_parser.set_defaults(func=cmd_bot)
return parser
@@ -196,3 +220,4 @@ def main() -> None:
if __name__ == "__main__":
main()


@@ -59,9 +59,9 @@ def load_config() -> AppConfig:
chunk_overlap=_env_int("RAG_CHUNK_OVERLAP", 50),
chunk_size_lines=_env_int("RAG_CHUNK_SIZE_LINES", 40),
chunk_overlap_lines=_env_int("RAG_CHUNK_OVERLAP_LINES", 8),
embeddings_dim=_env_int("RAG_EMBEDDINGS_DIM", 1024),  # GigaChat Embeddings = 1024; OpenAI = 1536
embeddings_model=os.getenv("RAG_EMBEDDINGS_MODEL", "stub-embeddings"),
llm_model=os.getenv("RAG_LLM_MODEL", "GigaChat"),
allowed_extensions=tuple(
_env_list("RAG_ALLOWED_EXTENSIONS", [".md", ".txt", ".rst"])
),


@@ -34,6 +34,8 @@ class StubEmbeddingClient:
_GIGACHAT_BATCH_SIZE = 50
# GigaChat embeddings: max 514 tokens per input; Russian/English ~3 chars/token → truncate to stay under the limit
_GIGACHAT_MAX_CHARS_PER_INPUT = 1200
class GigaChatEmbeddingClient:
@@ -57,16 +59,43 @@ class GigaChatEmbeddingClient:
return []
result: list[list[float]] = []
try:
for i in range(0, len(texts_list), _GIGACHAT_BATCH_SIZE):
raw_batch = texts_list[i : i + _GIGACHAT_BATCH_SIZE]
batch = [
t[: _GIGACHAT_MAX_CHARS_PER_INPUT] if len(t) > _GIGACHAT_MAX_CHARS_PER_INPUT else t
for t in raw_batch
]
with GigaChat(
credentials=self._credentials,
model=self._model,
verify_ssl_certs=self._verify_ssl_certs,
) as giga:
# API: embeddings(texts: list[str]) — single positional argument (gigachat 0.2+)
response = giga.embeddings(batch)
# Preserve order by index
by_index = {item.index: item.embedding for item in response.data}
result.extend(by_index[j] for j in range(len(batch)))
except Exception as e:
from gigachat.exceptions import ResponseError, RequestEntityTooLargeError
is_402 = (
isinstance(e, ResponseError)
and (getattr(e, "status_code", None) == 402 or "402" in str(e) or "Payment Required" in str(e))
)
if is_402:
raise ValueError(
"GigaChat: недостаточно средств (402 Payment Required). "
"Пополните баланс в кабинете GigaChat или отключите GIGACHAT_CREDENTIALS для stub-режима."
)
if isinstance(e, RequestEntityTooLargeError) or (
isinstance(e, ResponseError) and getattr(e, "status_code", None) == 413
):
raise ValueError(
"GigaChat: превышен лимит токенов на один запрос (413). "
"Уменьшите RAG_CHUNK_SIZE_LINES или RAG_CHUNK_SIZE в .env (текущий лимит ~514 токенов на чанк)."
)
raise
return result


@@ -5,6 +5,7 @@ from datetime import datetime, timezone
from typing import Iterable, Sequence
import psycopg
from pgvector import Vector
from pgvector.psycopg import register_vector
from rag_agent.ingest.chunker import TextChunk
@@ -63,6 +64,7 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
try:
cur.execute(f"ALTER TABLE stories {col_def};")
except psycopg.ProgrammingError:
conn.rollback()
pass
cur.execute(
"""
@@ -103,22 +105,29 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
try:
cur.execute(f"ALTER TABLE chunks {col_def};")
except psycopg.ProgrammingError:
conn.rollback()
pass
try:
cur.execute(
"ALTER TABLE chunks ALTER COLUMN change_type SET NOT NULL;"
)
except psycopg.ProgrammingError:
conn.rollback()
pass
cur.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = 'chunks'::regclass AND conname = 'chunks_change_type_check'
) THEN
ALTER TABLE chunks ADD CONSTRAINT chunks_change_type_check
CHECK (change_type IN ('added', 'modified', 'unchanged'));
END IF;
END $$;
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_documents_story_id
@@ -339,6 +348,7 @@ def fetch_similar(
top_k: int,
story_id: int | None = None,
) -> list[tuple[str, str, float]]:
vec = Vector(query_embedding)
with conn.cursor() as cur:
if story_id is not None:
cur.execute(
@@ -350,7 +360,7 @@ def fetch_similar(
ORDER BY c.embedding <=> %s
LIMIT %s;
""",
(vec, story_id, vec, top_k),
)
else:
cur.execute(
@@ -361,7 +371,7 @@ def fetch_similar(
ORDER BY c.embedding <=> %s
LIMIT %s;
""",
(vec, vec, top_k),
)
rows = cur.fetchall()
return [(row[0], row[1], row[2]) for row in rows]


@@ -0,0 +1,156 @@
"""Telegram bot: answers user questions using RAG (retrieval + GigaChat)."""
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
_repo_root = Path(__file__).resolve().parent.parent
load_dotenv(_repo_root / ".env")
logger = logging.getLogger(__name__)
# Verbose logging: incoming messages, embedding count, chunk count from DB, LLM answer.
# Toggle with RAG_BOT_VERBOSE_LOGGING=true|false (default true for debugging).
VERBOSE_LOGGING_MAX_ANSWER_CHARS = 500
def _verbose_logging_enabled() -> bool:
return os.getenv("RAG_BOT_VERBOSE_LOGGING", "true").lower() in ("1", "true", "yes")
def _run_rag(
question: str,
top_k: int = 5,
story_id: int | None = None,
with_stats: bool = False,
) -> str | tuple[str, dict]:
"""Synchronous RAG call: retrieval + LLM. Used from thread.
If with_stats=True, returns (answer, stats); else returns answer only.
"""
from rag_agent.config import load_config
from rag_agent.index.embeddings import get_embedding_client
from rag_agent.index.postgres import connect, ensure_schema
from rag_agent.agent.pipeline import answer_query, answer_query_with_stats, get_llm_client
config = load_config()
conn = connect(config.db_dsn)
try:
ensure_schema(conn, config.embeddings_dim)
embedding_client = get_embedding_client(config.embeddings_dim)
llm_client = get_llm_client(config)
if with_stats:
return answer_query_with_stats(
conn,
config,
embedding_client,
llm_client,
question,
top_k=top_k,
story_id=story_id,
)
return answer_query(
conn,
config,
embedding_client,
llm_client,
question,
top_k=top_k,
story_id=story_id,
)
finally:
conn.close()
def run_bot() -> None:
token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
if not token:
logger.error(
"TELEGRAM_BOT_TOKEN is required. Set it in .env or environment. "
"Container will stay up; restart after setting the token."
)
import time
while True:
time.sleep(3600)
from telegram import Update
from telegram.ext import Application, ContextTypes, MessageHandler, filters
verbose = _verbose_logging_enabled()
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not update.message or not update.message.text:
return
question = update.message.text.strip()
if not question:
await update.message.reply_text("Напишите вопрос текстом.")
return
user_id = update.effective_user.id if update.effective_user else None
chat_id = update.effective_chat.id if update.effective_chat else None
if verbose:
logger.info(
"received message user_id=%s chat_id=%s text=%s",
user_id,
chat_id,
repr(question[:200] + ("…" if len(question) > 200 else "")),
)
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: _run_rag(
question,
top_k=5,
story_id=None,
with_stats=verbose,
),
)
if verbose:
answer, stats = result
logger.info(
"query_embeddings=%s chunks_found=%s",
stats["query_embeddings"],
stats["chunks_found"],
)
answer_preview = stats["answer"]
if len(answer_preview) > VERBOSE_LOGGING_MAX_ANSWER_CHARS:
answer_preview = (
answer_preview[:VERBOSE_LOGGING_MAX_ANSWER_CHARS] + "…"
)
logger.info("llm_response=%s", repr(answer_preview))
else:
answer = result
if len(answer) > 4096:
answer = answer[:4090] + "\n…"
await update.message.reply_text(answer)
except Exception as e:
logger.exception("RAG error")
await update.message.reply_text(
f"Не удалось получить ответ: {e!s}. "
"Проверьте RAG_DB_DSN и GIGACHAT_CREDENTIALS."
)
app = Application.builder().token(token).build()
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("Telegram bot started (polling)")
app.run_polling(drop_pending_updates=True)
def main() -> None:
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s",
level=logging.INFO,
)
# Keep empty polling HTTP responses (no user messages) out of the log
for name in ("telegram", "httpx", "httpcore"):
logging.getLogger(name).setLevel(logging.WARNING)
try:
run_bot()
except KeyboardInterrupt:
pass
except ValueError as e:
raise SystemExit(e) from e


@@ -16,6 +16,14 @@ from fastapi.responses import PlainTextResponse
logger = logging.getLogger(__name__)
# So background webhook logs (pull_and_index, index) appear when run via uvicorn
_rag_log = logging.getLogger("rag_agent")
if not _rag_log.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(levelname)s: %(name)s: %(message)s"))
_rag_log.setLevel(logging.INFO)
_rag_log.addHandler(_h)
app = FastAPI(title="RAG Agent Webhook", version="0.1.0")
@@ -26,6 +34,14 @@ def _branch_from_ref(ref: str) -> str | None:
return ref.removeprefix("refs/heads/")
# GitHub/GitLab send null SHA as "before" when a branch is first created.
_NULL_SHA = "0000000000000000000000000000000000000000"
def _is_null_sha(sha: str | None) -> bool:
return sha is not None and sha == _NULL_SHA
def _verify_github_signature(body: bytes, secret: str, signature_header: str | None) -> bool:
if not secret or not signature_header or not signature_header.startswith("sha256="):
return not secret
@@ -36,6 +52,12 @@ def _verify_github_signature(body: bytes, secret: str, signature_header: str | N
return hmac.compare_digest(received, expected)
def _decode_stderr(stderr: str | bytes | None) -> str:
if stderr is None:
return ""
return stderr.decode("utf-8", errors="replace") if isinstance(stderr, bytes) else stderr
def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool:
env = os.environ.copy()
env["RAG_REPO_PATH"] = repo_path
@@ -48,7 +70,10 @@ def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool
timeout=600,
)
if proc.returncode != 0:
logger.error(
"index failed (story=%s base=%s head=%s): stdout=%s stderr=%s",
story, base_ref, head_ref, proc.stdout, proc.stderr,
)
return False
logger.info("index completed for story=%s %s..%s", story, base_ref, head_ref)
return True
@@ -60,7 +85,18 @@ def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool
return False return False
def _pull_and_index(
    repo_path: str,
    branch: str,
    *,
    payload_before: str | None = None,
    payload_after: str | None = None,
) -> None:
    """Fetch, checkout branch; index range from payload (before→after) or from merge result."""
    logger.info(
        "webhook: pull_and_index started branch=%s repo_path=%s payload_before=%s payload_after=%s",
        branch, repo_path, payload_before, payload_after,
    )
    repo = Path(repo_path)
    if not repo.is_dir() or not (repo / ".git").exists():
        logger.warning("not a git repo or missing: %s", repo_path)

@@ -73,7 +109,7 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
            timeout=60,
        )
    except subprocess.CalledProcessError as e:
        logger.warning("git fetch failed (branch=%s): %s", branch, _decode_stderr(e.stderr))
        return
    except Exception as e:
        logger.exception("git fetch error: %s", e)
@@ -87,7 +123,61 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
            timeout=10,
        )
    except subprocess.CalledProcessError as e:
        logger.warning("git checkout %s failed: %s", branch, _decode_stderr(e.stderr))
        return

    # Branch deletion: "after" is the null SHA → nothing to index.
    if _is_null_sha(payload_after):
        logger.info("webhook: branch deletion detected (after is null SHA) for branch=%s; skipping index", branch)
        return

    # Prefer the commit range from the webhook payload (GitHub/GitLab before/after) so we index every push,
    # even when the clone is the same dir as the one that was pushed from (HEAD already at the new commit).
    if payload_before and payload_after and payload_before != payload_after:
        # Update the working tree to the new commits (fetch only updates refs; indexing reads files from disk).
        origin_ref = f"origin/{branch}"
        merge_proc = subprocess.run(
            ["git", "-C", repo_path, "merge", "--ff-only", origin_ref],
            capture_output=True,
            text=True,
            timeout=60,
        )
        if merge_proc.returncode != 0:
            logger.warning(
                "webhook: git merge --ff-only failed (branch=%s); stderr=%s",
                branch, _decode_stderr(merge_proc.stderr),
            )
            return
        # New branch: "before" is the null SHA → use auto (merge-base with the default branch).
        if _is_null_sha(payload_before):
            logger.info(
                "webhook: new branch detected (before is null SHA), using --base-ref auto story=%s head=%s",
                branch, payload_after,
            )
            _run_index(repo_path, story=branch, base_ref="auto", head_ref=payload_after)
            return
        logger.info(
            "webhook: running index from payload story=%s %s..%s",
            branch, payload_before, payload_after,
        )
        _run_index(repo_path, story=branch, base_ref=payload_before, head_ref=payload_after)
        return

    if payload_before and payload_after and payload_before == payload_after:
        logger.info("webhook: payload before==after for branch=%s (e.g. force-push); skipping index", branch)
        return

    # Fallback: no before/after in the payload — infer the range from the merge (original behaviour).
    origin_ref = f"origin/{branch}"
    rev_origin = subprocess.run(
        ["git", "-C", repo_path, "rev-parse", origin_ref],
        capture_output=True,
        text=True,
        timeout=10,
    )
    origin_head = (rev_origin.stdout or "").strip() if rev_origin.returncode == 0 else None
    if not origin_head:
        logger.warning("after fetch: %s not found (wrong branch name?)", origin_ref)
        return
    try:
@@ -102,9 +192,16 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
        logger.exception("rev-parse HEAD: %s", e)
        return

    if old_head == origin_head:
        logger.info(
            "no new commits for branch=%s (already at %s); skipping index",
            branch, origin_head,
        )
        return
    try:
        merge_proc = subprocess.run(
            ["git", "-C", repo_path, "merge", "--ff-only", origin_ref],
            capture_output=True,
            text=True,
            timeout=60,

@@ -113,7 +210,10 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
        logger.error("git merge timeout")
        return
    if merge_proc.returncode != 0:
        logger.warning(
            "git merge --ff-only failed (branch=%s, non-fast-forward?). stderr=%s Skipping index.",
            branch, _decode_stderr(merge_proc.stderr),
        )
        return

    new_head = subprocess.run(

@@ -124,9 +224,10 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
    )
    new_head = (new_head.stdout or "").strip() if new_head.returncode == 0 else None
    if not old_head or not new_head or old_head == new_head:
        logger.info("no new commits for branch=%s (old_head=%s new_head=%s)", branch, old_head, new_head)
        return
    logger.info("webhook: running index story=%s %s..%s", branch, old_head, new_head)
    _run_index(repo_path, story=branch, base_ref=old_head, head_ref=new_head)
@@ -152,6 +253,10 @@ async def webhook(request: Request) -> Response:
    if not branch:
        return PlainTextResponse("Missing or unsupported ref", status_code=400)

    # GitHub/GitLab push: before = previous commit, after = new tip (use for index range).
    before = (payload.get("before") or "").strip() or None
    after = (payload.get("after") or "").strip() or None

    repo_path = os.getenv("RAG_REPO_PATH", "").strip()
    if not repo_path:
        return PlainTextResponse("RAG_REPO_PATH not set", status_code=500)

@@ -159,6 +264,7 @@ async def webhook(request: Request) -> Response:
    threading.Thread(
        target=_pull_and_index,
        args=(repo_path, branch),
        kwargs={"payload_before": before, "payload_after": after},
        daemon=True,
    ).start()