Compare commits

..

7 Commits

17 changed files with 546 additions and 48 deletions

3
.gitignore vendored
View File

@@ -1 +1,4 @@
src/rag_agent/.env
.env
docker/ssh
docker/postgres_data

View File

@@ -1,3 +1,6 @@
# RAG Agent (Postgres)
Custom RAG agent that indexes text files from a git repository into Postgres
@@ -14,7 +17,7 @@ and answers queries using retrieval + LLM generation. **Changes are always in th
2. Configure environment variables:
- `RAG_REPO_PATH` — path to git repo with text files
- `RAG_DB_DSN` — Postgres DSN (e.g. `postgresql://rag:rag_secret@localhost:5432/rag`)
- `RAG_EMBEDDINGS_DIM` — embedding vector dimension (e.g. `1536`)
- `RAG_EMBEDDINGS_DIM` — embedding vector dimension: **1024** for GigaChat Embeddings (default), 1536 for OpenAI
3. Create DB schema (only if not using Docker, or if init was disabled):
- `python scripts/create_db.py` (or `psql "$RAG_DB_DSN" -f scripts/schema.sql`)
4. Index files for a story (e.g. branch name as story slug). Use the **full story range** so all commits in the story are included:
@@ -40,6 +43,35 @@ When the app runs as a service in Docker, it can start a **webhook server** so t
Health check: `GET http://<host>:8000/health``ok`. Port is configurable via `WEBHOOK_PORT` (default 8000) in docker-compose.
### Webhook diagnostics (202 Accepted but no new rows in DB)
1. **Logs** — After a push, check app logs. Each webhook logs `pull_and_index started branch=… repo_path=…`; then one of:
- `not a git repo or missing``/data` in the container is not a git clone; clone the repo into the mounted dir.
- `git fetch failed` — SSH/network (see `docker/ssh/README.md`) or wrong remote.
- `git checkout … failed` — branch missing in the clone.
- `git merge --ff-only failed` — nonfast-forward (e.g. force-push); index is skipped. Use a normal push or re-clone.
- `no new commits for branch=…` — merge was a no-op (already up to date); nothing to index.
- `running index story=…` then `index completed` — index ran; check tables for that story.
- `index failed` — stderr shows the `rag-agent index` error (e.g. DB, embeddings, repo path).
```bash
docker compose logs -f app
# or: docker logs -f rag-agent
```
Trigger a push and watch for the lines above.
2. **Story and tables** — Rows are per **story** (branch name). Query by story, e.g. `SELECT * FROM stories;` then `SELECT * FROM chunks WHERE story_id = (SELECT id FROM stories WHERE slug = 'main');`.
3. **Manual index** — Run index inside the container to confirm DB and repo work:
```bash
docker compose exec app rag-agent index --story main --changed --base-ref main --head-ref HEAD
```
If this inserts rows, the issue is in the webhook path (fetch/merge/refs).
4. **Allowed extensions** — Only `.md`, `.txt`, `.rst` (or `RAG_ALLOWED_EXTENSIONS`) are indexed; other files are skipped.
5. **"expected 1536 dimensions, not 1024"** — GigaChat Embeddings returns 1024-dim vectors; the default is now 1024. If the DB was created earlier with vector(1536), drop and recreate the tables so the app can create them with 1024: `psql "$RAG_DB_DSN" -c "DROP TABLE IF EXISTS chunks; DROP TABLE IF EXISTS documents;"` then restart the app (ensure_schema will recreate the tables).
## Git hook (index on commit)
Install the post-commit hook so changed files are indexed after each commit:
@@ -91,7 +123,21 @@ Scripts: `scripts/create_db.py` (Python, uses `ensure_schema` and `RAG_*` env),
If `GIGACHAT_CREDENTIALS` is set (e.g. in `.env` for local runs), embeddings use GigaChat API; otherwise the stub client is used. Optional env: `GIGACHAT_EMBEDDINGS_MODEL` (default `Embeddings`), `GIGACHAT_VERIFY_SSL` (`true`/`false`). Ensure `RAG_EMBEDDINGS_DIM` matches the model output (see GigaChat docs).
## Agent (GigaChat)
Ответы на вопросы формирует агент на базе GigaChat: поиск по базе знаний (RAG) + генерация текста. Если задана переменная `GIGACHAT_CREDENTIALS`, используется `GigaChatLLMClient` в `src/rag_agent/agent/pipeline.py`; иначе — заглушка. Модель чата задаётся через `RAG_LLM_MODEL` (по умолчанию `GigaChat`).
## Telegram-бот
Общение с пользователем через бота в Telegram: бот отвечает на текстовые сообщения, используя знания из базы (RAG + GigaChat).
1. Создайте бота через [@BotFather](https://t.me/BotFather) и получите токен.
2. Добавьте в `.env`: `TELEGRAM_BOT_TOKEN=<токен>`.
3. Запуск: `rag-agent bot` (или `python -m rag_agent.telegram_bot`).
4. Через Docker: `docker compose up -d` поднимает БД, вебхук-сервер и бота в отдельных контейнерах; в `.env` должен быть задан `TELEGRAM_BOT_TOKEN`.
Требуются: `RAG_DB_DSN`, `RAG_REPO_PATH`, `GIGACHAT_CREDENTIALS`, `TELEGRAM_BOT_TOKEN`. Расширенное логирование (входящие сообщения, число эмбеддингов, число чанков из БД, ответ LLM): `RAG_BOT_VERBOSE_LOGGING=true|false` (по умолчанию `true` для отладки).
## Notes
- LLM client is still a stub; replace it in `src/rag_agent/agent/pipeline.py` for real answers.
- This project requires Postgres with the `pgvector` extension.

View File

@@ -14,7 +14,8 @@ services:
ports:
- "${POSTGRES_PORT:-5432}:5432"
volumes:
- rag_pgdata:/var/lib/postgresql/data
# PG 18+: mount at /var/lib/postgresql (data goes in versioned subdir). For pg16 use /var/lib/postgresql/data.
- rag_pgdata:/var/lib/postgresql
# Init scripts run once on first start (create extension, tables). Optional: comment out to skip.
- ./docker/postgres-init:/docker-entrypoint-initdb.d:ro
healthcheck:
@@ -39,18 +40,49 @@ services:
- "${WEBHOOK_PORT:-8000}:8000"
environment:
RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}"
RAG_REPO_PATH: ${RAG_REPO_PATH:-/data}
RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1536}
# In container repo is always at /data (mounted below). Use RAG_REPO_HOST in .env for host path.
RAG_REPO_PATH: "/data"
# Accept host key on first connect; git fetch uses SSH from /root/.ssh (mounted below).
GIT_SSH_COMMAND: "ssh -o StrictHostKeyChecking=accept-new"
RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024}
GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-}
GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings}
WEBHOOK_SECRET: ${WEBHOOK_SECRET:-}
volumes:
- ${RAG_REPO_PATH:-./data}:/data
# Host path: set RAG_REPO_HOST in .env (e.g. /Users/you/repo). Falls back to RAG_REPO_PATH then ./data.
- ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data
# SSH for git fetch (webhook): put deploy key and known_hosts in RAG_SSH_DIR. See docker/ssh/README.md.
- ${RAG_SSH_DIR:-./docker/ssh}:/root/.ssh:ro
entrypoint: ["rag-agent"]
command: ["serve", "--host", "0.0.0.0", "--port", "8000"]
networks:
- rag_net
bot:
build:
context: .
dockerfile: Dockerfile
image: rag-agent:latest
container_name: rag-bot
restart: unless-stopped
depends_on:
postgres:
condition: service_healthy
environment:
RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}"
RAG_REPO_PATH: "/data"
RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024}
GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-}
GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings}
TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN:-}
RAG_BOT_VERBOSE_LOGGING: ${RAG_BOT_VERBOSE_LOGGING:-true}
volumes:
- ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data
entrypoint: ["rag-agent"]
command: ["bot"]
networks:
- rag_net
networks:
rag_net:
driver: bridge

View File

@@ -1,5 +1,5 @@
-- RAG vector DB schema (runs automatically on first Postgres init).
-- If RAG_EMBEDDINGS_DIM is not 1536, change vector(1536) below.
-- GigaChat Embeddings = 1024; for OpenAI use vector(1536).
CREATE EXTENSION IF NOT EXISTS vector;
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS chunks (
chunk_index INTEGER NOT NULL,
hash TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector(1536) NOT NULL,
embedding vector(1024) NOT NULL,
start_line INTEGER,
end_line INTEGER,
change_type TEXT NOT NULL DEFAULT 'added'

4
docker/ssh/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
# Deploy key and known_hosts — add locally, do not commit
id_*
id_*.pub
known_hosts

19
docker/ssh/README.md Normal file
View File

@@ -0,0 +1,19 @@
# SSH for webhook (git fetch in container)
When the app runs in Docker and the webhook does `git fetch origin <branch>`, git uses SSH. The container has no keys by default, so you get "Host key verification failed" or "Permission denied".
## Setup
1. **Deploy key** (read-only key for the repo you index):
- Generate: `ssh-keygen -t ed25519 -f docker/ssh/id_ed25519 -N "" -C "rag-agent-deploy"`
- Add the **public** key (`docker/ssh/id_ed25519.pub`) to your Git server (GitHub/GitLab/… → repo → Deploy keys).
2. **Known hosts** (optional; `GIT_SSH_COMMAND` in compose accepts new host keys on first connect):
- To pin the host key: `ssh-keyscan -t ed25519 git.example.com >> docker/ssh/known_hosts`
- Replace `git.example.com` with your Git host (e.g. `github.com`, `git.lesha.spb.ru`).
3. **Permissions**: key file must be readable only by you, e.g. `chmod 600 docker/ssh/id_ed25519`.
4. **Compose**: by default this directory is mounted into the app container as `/root/.ssh`. Override with `RAG_SSH_DIR` in `.env` if you use another path (e.g. `RAG_SSH_DIR=/Users/you/.ssh` to use your main SSH dir).
After that, restart the app: `docker compose up -d`.

View File

@@ -12,6 +12,7 @@ dependencies = [
"gigachat>=0.2.0",
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"python-telegram-bot>=21.0",
]
[project.scripts]

View File

@@ -1,6 +1,5 @@
-- RAG vector DB schema for Postgres (pgvector).
-- Run once against an empty DB. If RAG_EMBEDDINGS_DIM is not 1536, change vector(1536) below.
-- Usage: psql "$RAG_DB_DSN" -f scripts/schema.sql
-- GigaChat Embeddings = 1024; for OpenAI use vector(1536). Usage: psql "$RAG_DB_DSN" -f scripts/schema.sql
CREATE EXTENSION IF NOT EXISTS vector;
@@ -28,7 +27,7 @@ CREATE TABLE IF NOT EXISTS chunks (
chunk_index INTEGER NOT NULL,
hash TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector(1536) NOT NULL,
embedding vector(1024) NOT NULL,
start_line INTEGER,
end_line INTEGER,
change_type TEXT NOT NULL DEFAULT 'added'

View File

@@ -1,14 +1,21 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
import psycopg
from dotenv import load_dotenv
from rag_agent.config import AppConfig
from rag_agent.index.embeddings import EmbeddingClient
from rag_agent.retrieval.search import search_similar
_repo_root = Path(__file__).resolve().parent.parent.parent
load_dotenv(_repo_root / ".env")
class LLMClient(Protocol):
def generate(self, prompt: str, model: str) -> str:
@@ -20,10 +27,49 @@ class StubLLMClient:
def generate(self, prompt: str, model: str) -> str:
return (
"LLM client is not configured. "
"Replace StubLLMClient with a real implementation."
"Set GIGACHAT_CREDENTIALS in .env for GigaChat answers."
)
class GigaChatLLMClient:
"""LLM generation via GigaChat API. Credentials from env GIGACHAT_CREDENTIALS."""
def __init__(
self,
credentials: str,
model: str = "GigaChat",
verify_ssl_certs: bool = False,
) -> None:
self._credentials = credentials.strip()
self._model = model
self._verify_ssl_certs = verify_ssl_certs
def generate(self, prompt: str, model: str) -> str:
from gigachat import GigaChat
use_model = model or self._model
with GigaChat(
credentials=self._credentials,
model=use_model,
verify_ssl_certs=self._verify_ssl_certs,
) as giga:
response = giga.chat(prompt)
return (response.choices[0].message.content or "").strip()
def get_llm_client(config: AppConfig) -> LLMClient:
"""Return GigaChat LLM client if credentials set, else stub."""
credentials = os.getenv("GIGACHAT_CREDENTIALS", "").strip()
if credentials:
return GigaChatLLMClient(
credentials=credentials,
model=config.llm_model,
verify_ssl_certs=os.getenv("GIGACHAT_VERIFY_SSL", "false").lower()
in ("1", "true", "yes"),
)
return StubLLMClient()
def build_prompt(question: str, contexts: list[str]) -> str:
joined = "\n\n".join(contexts)
return (
@@ -42,10 +88,32 @@ def answer_query(
top_k: int = 5,
story_id: int | None = None,
) -> str:
query_embedding = embedding_client.embed_texts([question])[0]
answer, _ = answer_query_with_stats(
conn, config, embedding_client, llm_client, question, top_k, story_id
)
return answer
def answer_query_with_stats(
conn: psycopg.Connection,
config: AppConfig,
embedding_client: EmbeddingClient,
llm_client: LLMClient,
question: str,
top_k: int = 5,
story_id: int | None = None,
) -> tuple[str, dict]:
"""Like answer_query but returns (answer, stats) for logging. stats: query_embeddings, chunks_found, answer."""
query_embeddings = embedding_client.embed_texts([question])
results = search_similar(
conn, query_embedding, top_k=top_k, story_id=story_id
conn, query_embeddings[0], top_k=top_k, story_id=story_id
)
contexts = [f"Source: {item.path}\n{item.content}" for item in results]
prompt = build_prompt(question, contexts)
return llm_client.generate(prompt, model=config.llm_model)
answer = llm_client.generate(prompt, model=config.llm_model)
stats = {
"query_embeddings": len(query_embeddings),
"chunks_found": len(results),
"answer": answer,
}
return answer, stats

View File

@@ -25,7 +25,7 @@ from rag_agent.index.postgres import (
update_story_indexed_range,
upsert_document,
)
from rag_agent.agent.pipeline import StubLLMClient, answer_query
from rag_agent.agent.pipeline import answer_query, get_llm_client
def _file_version(path: Path) -> str:
@@ -55,13 +55,24 @@ def cmd_index(args: argparse.Namespace) -> None:
existing = filter_existing(changed_files)
else:
removed = []
existing = [
p for p in Path(config.repo_path).rglob("*") if p.is_file()
]
repo_path = Path(config.repo_path)
if not repo_path.exists():
raise SystemExit(f"RAG_REPO_PATH does not exist: {config.repo_path}")
existing = [p for p in repo_path.rglob("*") if p.is_file()]
allowed = list(iter_text_files(existing, config.allowed_extensions))
print(
f"repo={config.repo_path} all_files={len(existing)} "
f"allowed={len(allowed)} ext={config.allowed_extensions}"
)
if not allowed:
print("No files to index (check path and extensions .md, .txt, .rst)")
return
for path in removed:
delete_document(conn, story_id, str(path))
indexed = 0
for path, text in iter_text_files(existing, config.allowed_extensions):
chunks = chunk_text_by_lines(
text,
@@ -88,11 +99,18 @@ def cmd_index(args: argparse.Namespace) -> None:
replace_chunks(
conn, document_id, chunks, embeddings, base_chunks=base_chunks
)
indexed += 1
print(f"Indexed {indexed} documents for story={args.story}")
if args.changed:
update_story_indexed_range(conn, story_id, base_ref, head_ref)
def cmd_bot(args: argparse.Namespace) -> None:
from rag_agent.telegram_bot import main as bot_main
bot_main()
def cmd_serve(args: argparse.Namespace) -> None:
import uvicorn
uvicorn.run(
@@ -113,7 +131,7 @@ def cmd_ask(args: argparse.Namespace) -> None:
if story_id is None:
raise SystemExit(f"Story not found: {args.story}")
embedding_client = get_embedding_client(config.embeddings_dim)
llm_client = StubLLMClient()
llm_client = get_llm_client(config)
answer = answer_query(
conn,
config,
@@ -185,6 +203,12 @@ def build_parser() -> argparse.ArgumentParser:
)
serve_parser.set_defaults(func=cmd_serve)
bot_parser = sub.add_parser(
"bot",
help="Run Telegram bot: answers questions using RAG (requires TELEGRAM_BOT_TOKEN)",
)
bot_parser.set_defaults(func=cmd_bot)
return parser
@@ -196,3 +220,4 @@ def main() -> None:
if __name__ == "__main__":
main()

View File

@@ -59,9 +59,9 @@ def load_config() -> AppConfig:
chunk_overlap=_env_int("RAG_CHUNK_OVERLAP", 50),
chunk_size_lines=_env_int("RAG_CHUNK_SIZE_LINES", 40),
chunk_overlap_lines=_env_int("RAG_CHUNK_OVERLAP_LINES", 8),
embeddings_dim=_env_int("RAG_EMBEDDINGS_DIM", 1536),
embeddings_dim=_env_int("RAG_EMBEDDINGS_DIM", 1024), # GigaChat Embeddings = 1024; OpenAI = 1536
embeddings_model=os.getenv("RAG_EMBEDDINGS_MODEL", "stub-embeddings"),
llm_model=os.getenv("RAG_LLM_MODEL", "stub-llm"),
llm_model=os.getenv("RAG_LLM_MODEL", "GigaChat"),
allowed_extensions=tuple(
_env_list("RAG_ALLOWED_EXTENSIONS", [".md", ".txt", ".rst"])
),

View File

@@ -34,6 +34,8 @@ class StubEmbeddingClient:
_GIGACHAT_BATCH_SIZE = 50
# GigaChat embeddings: max 514 tokens per input; Russian/English ~3 chars/token → truncate to stay under
_GIGACHAT_MAX_CHARS_PER_INPUT = 1200
class GigaChatEmbeddingClient:
@@ -57,16 +59,43 @@ class GigaChatEmbeddingClient:
return []
result: list[list[float]] = []
try:
for i in range(0, len(texts_list), _GIGACHAT_BATCH_SIZE):
batch = texts_list[i : i + _GIGACHAT_BATCH_SIZE]
raw_batch = texts_list[i : i + _GIGACHAT_BATCH_SIZE]
batch = [
t[: _GIGACHAT_MAX_CHARS_PER_INPUT] if len(t) > _GIGACHAT_MAX_CHARS_PER_INPUT else t
for t in raw_batch
]
with GigaChat(
credentials=self._credentials,
model=self._model,
verify_ssl_certs=self._verify_ssl_certs,
) as giga:
response = giga.embeddings(model=self._model, input=batch)
# API: embeddings(texts: list[str]) — single positional argument (gigachat 0.2+)
response = giga.embeddings(batch)
# Preserve order by index
by_index = {item.index: item.embedding for item in response.data}
result.extend(by_index[j] for j in range(len(batch)))
except Exception as e:
from gigachat.exceptions import ResponseError, RequestEntityTooLargeError
is_402 = (
isinstance(e, ResponseError)
and (getattr(e, "status_code", None) == 402 or "402" in str(e) or "Payment Required" in str(e))
)
if is_402:
raise ValueError(
"GigaChat: недостаточно средств (402 Payment Required). "
"Пополните баланс в кабинете GigaChat или отключите GIGACHAT_CREDENTIALS для stub-режима."
)
if isinstance(e, RequestEntityTooLargeError) or (
isinstance(e, ResponseError) and getattr(e, "status_code", None) == 413
):
raise ValueError(
"GigaChat: превышен лимит токенов на один запрос (413). "
"Уменьшите RAG_CHUNK_SIZE_LINES или RAG_CHUNK_SIZE в .env (текущий лимит ~514 токенов на чанк)."
)
raise
return result

View File

@@ -5,6 +5,7 @@ from datetime import datetime, timezone
from typing import Iterable, Sequence
import psycopg
from pgvector import Vector
from pgvector.psycopg import register_vector
from rag_agent.ingest.chunker import TextChunk
@@ -63,6 +64,7 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
try:
cur.execute(f"ALTER TABLE stories {col_def};")
except psycopg.ProgrammingError:
conn.rollback()
pass
cur.execute(
"""
@@ -103,22 +105,29 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
try:
cur.execute(f"ALTER TABLE chunks {col_def};")
except psycopg.ProgrammingError:
conn.rollback()
pass
try:
cur.execute(
"ALTER TABLE chunks ALTER COLUMN change_type SET NOT NULL;"
)
except psycopg.ProgrammingError:
conn.rollback()
pass
try:
cur.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = 'chunks'::regclass AND conname = 'chunks_change_type_check'
) THEN
ALTER TABLE chunks ADD CONSTRAINT chunks_change_type_check
CHECK (change_type IN ('added', 'modified', 'unchanged'));
END IF;
END $$;
"""
)
except psycopg.ProgrammingError:
pass # constraint may already exist
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_documents_story_id
@@ -339,6 +348,7 @@ def fetch_similar(
top_k: int,
story_id: int | None = None,
) -> list[tuple[str, str, float]]:
vec = Vector(query_embedding)
with conn.cursor() as cur:
if story_id is not None:
cur.execute(
@@ -350,7 +360,7 @@ def fetch_similar(
ORDER BY c.embedding <=> %s
LIMIT %s;
""",
(query_embedding, story_id, query_embedding, top_k),
(vec, story_id, vec, top_k),
)
else:
cur.execute(
@@ -361,7 +371,7 @@ def fetch_similar(
ORDER BY c.embedding <=> %s
LIMIT %s;
""",
(query_embedding, query_embedding, top_k),
(vec, vec, top_k),
)
rows = cur.fetchall()
return [(row[0], row[1], row[2]) for row in rows]

View File

@@ -0,0 +1,156 @@
"""Telegram bot: answers user questions using RAG (retrieval + GigaChat)."""
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
_repo_root = Path(__file__).resolve().parent.parent
load_dotenv(_repo_root / ".env")
logger = logging.getLogger(__name__)
# Расширенное логирование: входящие сообщения, число эмбеддингов, число чанков из БД, ответ LLM.
# Включить/выключить: RAG_BOT_VERBOSE_LOGGING=true|false (по умолчанию true для отладки).
VERBOSE_LOGGING_MAX_ANSWER_CHARS = 500
def _verbose_logging_enabled() -> bool:
return os.getenv("RAG_BOT_VERBOSE_LOGGING", "true").lower() in ("1", "true", "yes")
def _run_rag(
question: str,
top_k: int = 5,
story_id: int | None = None,
with_stats: bool = False,
) -> str | tuple[str, dict]:
"""Synchronous RAG call: retrieval + LLM. Used from thread.
If with_stats=True, returns (answer, stats); else returns answer only.
"""
from rag_agent.config import load_config
from rag_agent.index.embeddings import get_embedding_client
from rag_agent.index.postgres import connect, ensure_schema
from rag_agent.agent.pipeline import answer_query, answer_query_with_stats, get_llm_client
config = load_config()
conn = connect(config.db_dsn)
try:
ensure_schema(conn, config.embeddings_dim)
embedding_client = get_embedding_client(config.embeddings_dim)
llm_client = get_llm_client(config)
if with_stats:
return answer_query_with_stats(
conn,
config,
embedding_client,
llm_client,
question,
top_k=top_k,
story_id=story_id,
)
return answer_query(
conn,
config,
embedding_client,
llm_client,
question,
top_k=top_k,
story_id=story_id,
)
finally:
conn.close()
def run_bot() -> None:
token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
if not token:
logger.error(
"TELEGRAM_BOT_TOKEN is required. Set it in .env or environment. "
"Container will stay up; restart after setting the token."
)
import time
while True:
time.sleep(3600)
from telegram import Update
from telegram.ext import Application, ContextTypes, MessageHandler, filters
verbose = _verbose_logging_enabled()
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not update.message or not update.message.text:
return
question = update.message.text.strip()
if not question:
await update.message.reply_text("Напишите вопрос текстом.")
return
user_id = update.effective_user.id if update.effective_user else None
chat_id = update.effective_chat.id if update.effective_chat else None
if verbose:
logger.info(
"received message user_id=%s chat_id=%s text=%s",
user_id,
chat_id,
repr(question[:200] + ("" if len(question) > 200 else "")),
)
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: _run_rag(
question,
top_k=5,
story_id=None,
with_stats=verbose,
),
)
if verbose:
answer, stats = result
logger.info(
"query_embeddings=%s chunks_found=%s",
stats["query_embeddings"],
stats["chunks_found"],
)
answer_preview = stats["answer"]
if len(answer_preview) > VERBOSE_LOGGING_MAX_ANSWER_CHARS:
answer_preview = (
answer_preview[:VERBOSE_LOGGING_MAX_ANSWER_CHARS] + ""
)
logger.info("llm_response=%s", repr(answer_preview))
else:
answer = result
if len(answer) > 4096:
answer = answer[:4090] + "\n"
await update.message.reply_text(answer)
except Exception as e:
logger.exception("RAG error")
await update.message.reply_text(
f"Не удалось получить ответ: {e!s}. "
"Проверьте RAG_DB_DSN и GIGACHAT_CREDENTIALS."
)
app = Application.builder().token(token).build()
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("Telegram bot started (polling)")
app.run_polling(drop_pending_updates=True)
def main() -> None:
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s",
level=logging.INFO,
)
# Убрать из лога пустые HTTP-ответы polling (без сообщений от пользователя)
for name in ("telegram", "httpx", "httpcore"):
logging.getLogger(name).setLevel(logging.WARNING)
try:
run_bot()
except KeyboardInterrupt:
pass
except ValueError as e:
raise SystemExit(e) from e

View File

@@ -16,6 +16,14 @@ from fastapi.responses import PlainTextResponse
logger = logging.getLogger(__name__)
# So background webhook logs (pull_and_index, index) appear when run via uvicorn
_rag_log = logging.getLogger("rag_agent")
if not _rag_log.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(levelname)s: %(name)s: %(message)s"))
_rag_log.setLevel(logging.INFO)
_rag_log.addHandler(_h)
app = FastAPI(title="RAG Agent Webhook", version="0.1.0")
@@ -26,6 +34,14 @@ def _branch_from_ref(ref: str) -> str | None:
return ref.removeprefix("refs/heads/")
# GitHub/GitLab send null SHA as "before" when a branch is first created.
_NULL_SHA = "0000000000000000000000000000000000000000"
def _is_null_sha(sha: str | None) -> bool:
return sha is not None and sha == _NULL_SHA
def _verify_github_signature(body: bytes, secret: str, signature_header: str | None) -> bool:
if not secret or not signature_header or not signature_header.startswith("sha256="):
return not secret
@@ -36,6 +52,12 @@ def _verify_github_signature(body: bytes, secret: str, signature_header: str | N
return hmac.compare_digest(received, expected)
def _decode_stderr(stderr: str | bytes | None) -> str:
if stderr is None:
return ""
return stderr.decode("utf-8", errors="replace") if isinstance(stderr, bytes) else stderr
def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool:
env = os.environ.copy()
env["RAG_REPO_PATH"] = repo_path
@@ -48,7 +70,10 @@ def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool
timeout=600,
)
if proc.returncode != 0:
logger.error("index failed: %s %s", proc.stdout, proc.stderr)
logger.error(
"index failed (story=%s base=%s head=%s): stdout=%s stderr=%s",
story, base_ref, head_ref, proc.stdout, proc.stderr,
)
return False
logger.info("index completed for story=%s %s..%s", story, base_ref, head_ref)
return True
@@ -60,7 +85,18 @@ def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool
return False
def _pull_and_index(repo_path: str, branch: str) -> None:
def _pull_and_index(
repo_path: str,
branch: str,
*,
payload_before: str | None = None,
payload_after: str | None = None,
) -> None:
"""Fetch, checkout branch; index range from payload (before→after) or from merge result."""
logger.info(
"webhook: pull_and_index started branch=%s repo_path=%s payload_before=%s payload_after=%s",
branch, repo_path, payload_before, payload_after,
)
repo = Path(repo_path)
if not repo.is_dir() or not (repo / ".git").exists():
logger.warning("not a git repo or missing: %s", repo_path)
@@ -73,7 +109,7 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
timeout=60,
)
except subprocess.CalledProcessError as e:
logger.warning("git fetch failed: %s", e.stderr)
logger.warning("git fetch failed (branch=%s): %s", branch, _decode_stderr(e.stderr))
return
except Exception as e:
logger.exception("git fetch error: %s", e)
@@ -87,7 +123,61 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
timeout=10,
)
except subprocess.CalledProcessError as e:
logger.warning("git checkout %s failed: %s", branch, e.stderr)
logger.warning("git checkout %s failed: %s", branch, _decode_stderr(e.stderr))
return
# Branch deletion: after is null SHA → nothing to index
if _is_null_sha(payload_after):
logger.info("webhook: branch deletion detected (after is null SHA) for branch=%s; skipping index", branch)
return
# Prefer commit range from webhook payload (GitHub/GitLab before/after) so we index every push
# even when the clone is the same dir as the one that was pushed from (HEAD already at new commit).
if payload_before and payload_after and payload_before != payload_after:
# Update working tree to new commits (fetch only fetches refs; index reads files from disk)
origin_ref = f"origin/{branch}"
merge_proc = subprocess.run(
["git", "-C", repo_path, "merge", "--ff-only", origin_ref],
capture_output=True,
text=True,
timeout=60,
)
if merge_proc.returncode != 0:
logger.warning(
"webhook: git merge --ff-only failed (branch=%s); stderr=%s",
branch, _decode_stderr(merge_proc.stderr),
)
return
# New branch: before is null SHA → use auto (merge-base with default branch)
if _is_null_sha(payload_before):
logger.info(
"webhook: new branch detected (before is null SHA), using --base-ref auto story=%s head=%s",
branch, payload_after,
)
_run_index(repo_path, story=branch, base_ref="auto", head_ref=payload_after)
return
logger.info(
"webhook: running index from payload story=%s %s..%s",
branch, payload_before, payload_after,
)
_run_index(repo_path, story=branch, base_ref=payload_before, head_ref=payload_after)
return
if payload_before and payload_after and payload_before == payload_after:
logger.info("webhook: payload before==after for branch=%s (e.g. force-push); skipping index", branch)
return
# Fallback: no before/after in payload — infer from merge (original behaviour).
origin_ref = f"origin/{branch}"
rev_origin = subprocess.run(
["git", "-C", repo_path, "rev-parse", origin_ref],
capture_output=True,
text=True,
timeout=10,
)
origin_head = (rev_origin.stdout or "").strip() if rev_origin.returncode == 0 else None
if not origin_head:
logger.warning("after fetch: %s not found (wrong branch name?)", origin_ref)
return
try:
@@ -102,9 +192,16 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
logger.exception("rev-parse HEAD: %s", e)
return
if old_head == origin_head:
logger.info(
"no new commits for branch=%s (already at %s); skipping index",
branch, origin_head,
)
return
try:
merge_proc = subprocess.run(
["git", "-C", repo_path, "merge", "--ff-only", f"origin/{branch}"],
["git", "-C", repo_path, "merge", "--ff-only", origin_ref],
capture_output=True,
text=True,
timeout=60,
@@ -113,7 +210,10 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
logger.error("git merge timeout")
return
if merge_proc.returncode != 0:
logger.warning("git merge --ff-only failed (non-fast-forward?). Skipping index.")
logger.warning(
"git merge --ff-only failed (branch=%s, non-fast-forward?). stderr=%s Skipping index.",
branch, _decode_stderr(merge_proc.stderr),
)
return
new_head = subprocess.run(
@@ -124,9 +224,10 @@ def _pull_and_index(repo_path: str, branch: str) -> None:
)
new_head = (new_head.stdout or "").strip() if new_head.returncode == 0 else None
if not old_head or not new_head or old_head == new_head:
logger.info("no new commits for branch=%s", branch)
logger.info("no new commits for branch=%s (old_head=%s new_head=%s)", branch, old_head, new_head)
return
logger.info("webhook: running index story=%s %s..%s", branch, old_head, new_head)
_run_index(repo_path, story=branch, base_ref=old_head, head_ref=new_head)
@@ -152,6 +253,10 @@ async def webhook(request: Request) -> Response:
if not branch:
return PlainTextResponse("Missing or unsupported ref", status_code=400)
# GitHub/GitLab push: before = previous commit, after = new tip (use for index range)
before = (payload.get("before") or "").strip() or None
after = (payload.get("after") or "").strip() or None
repo_path = os.getenv("RAG_REPO_PATH", "").strip()
if not repo_path:
return PlainTextResponse("RAG_REPO_PATH not set", status_code=500)
@@ -159,6 +264,7 @@ async def webhook(request: Request) -> Response:
threading.Thread(
target=_pull_and_index,
args=(repo_path, branch),
kwargs={"payload_before": before, "payload_after": after},
daemon=True,
).start()