Compare commits

...

10 Commits

27 changed files with 1401 additions and 52 deletions

.gitignore

@@ -1 +1,4 @@
src/rag_agent/.env
.env
docker/ssh
docker/postgres_data

Dockerfile

@@ -0,0 +1,22 @@
# RAG Agent app. Build from repo root (clone git@git.lesha.spb.ru:alex/RagAgent.git then docker compose build).
FROM python:3.12-slim
WORKDIR /app
# Install git for optional in-image clone; app is usually COPY'd from build context
RUN apt-get update -qq && apt-get install -y --no-install-recommends git openssh-client \
&& rm -rf /var/lib/apt/lists/*
# Copy repo (when built from cloned repo: docker compose build)
COPY pyproject.toml ./
COPY src ./src
COPY README.md ./
RUN pip install --no-cache-dir -e .
# Default: run webhook server (override in compose or when running)
ENV RAG_DB_DSN=""
ENV RAG_REPO_PATH="/data"
EXPOSE 8000
ENTRYPOINT ["rag-agent"]
CMD ["serve", "--host", "0.0.0.0", "--port", "8000"]

README.md

@@ -1,23 +1,77 @@
# RAG Agent (Postgres)

Custom RAG agent that indexes text files from a git repository into Postgres and answers queries using retrieval + LLM generation. **Changes are always in the context of a story**: the unit of work is the story, not individual commits. The agent indexes **all changes from all commits** in the story range (`base_ref..head_ref`); per-commit indexing is not used.

## Quick start

1. (Optional) Run Postgres and the app via Docker (clone the repo first):
   - `git clone git@git.lesha.spb.ru:alex/RagAgent.git && cd RagAgent`
   - `docker compose up -d` — starts Postgres and the RAG app on one network, `rag_net`; the app connects to the DB at host `postgres`.
   - On first start (empty DB), scripts in `docker/postgres-init/` run automatically (extension + tables). To disable, comment out the init volume in `docker-compose.yml`.
   - Default DSN inside the app: `postgresql://rag:rag_secret@postgres:5432/rag`. Override with `POSTGRES_*` and `RAG_REPO_PATH` (path to your knowledge-base repo, mounted into the app container).
   - Run commands: `docker compose run --rm app index --story my-branch`, `docker compose run --rm app ask "Question?"`.
2. Configure environment variables:
   - `RAG_REPO_PATH` — path to git repo with text files
   - `RAG_DB_DSN` — Postgres DSN (e.g. `postgresql://rag:rag_secret@localhost:5432/rag`)
   - `RAG_EMBEDDINGS_DIM` — embedding vector dimension: **1024** for GigaChat Embeddings (default), 1536 for OpenAI
3. Create DB schema (only if not using Docker, or if init was disabled):
   - `python scripts/create_db.py` (or `psql "$RAG_DB_DSN" -f scripts/schema.sql`)
4. Index files for a story (e.g. branch name as story slug). Use the **full story range** so all commits in the story are included:
   - `rag-agent index --story my-branch --changed --base-ref main --head-ref HEAD`
   - Or `--base-ref auto` to use the merge-base of the default branch and the head ref as the start of the story.
5. Ask a question (optionally scoped to a story):
   - `rag-agent ask "What is covered?"`
   - `rag-agent ask "What is covered?" --story my-branch`
## Webhook: index on push to remote
When the app runs as a service in Docker, it can start a **webhook server** so that each push to the remote repository triggers a pull and incremental indexing.
1. Start the stack with the webhook server (default in Docker):
- `docker compose up -d` — app runs `rag-agent serve` and listens on port 8000.
- Repo is mounted at `RAG_REPO_PATH` (e.g. `/data`) **writable**, so the container can run `git fetch` + `git merge --ff-only` to pull changes.
2. Clone the knowledge-base repo into the mounted directory (once), e.g. on the host: `git clone <url> ./data` so that `./data` is the worktree (or set `RAG_REPO_PATH` to that path and mount it).
3. In GitHub (or GitLab) add a **Webhook**:
- URL: `http://<your-server>:8000/webhook` (use HTTPS in production and put a reverse proxy in front).
- Content type: `application/json`.
- Secret: set a shared secret and export `WEBHOOK_SECRET` in the app environment (Docker: in `docker-compose.yml` or `.env`). If `WEBHOOK_SECRET` is empty, signature is not checked.
4. On each push to a branch, the server receives the webhook, pulls that branch into the worktree, and runs `rag-agent index --story <branch> --changed --base-ref <old_head> --head-ref <new_head>` so only changed files are re-indexed.
Health check: `GET http://<host>:8000/health` returns `ok`. The port is configurable via `WEBHOOK_PORT` (default 8000) in docker-compose.
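When `WEBHOOK_SECRET` is set, the signature check follows the GitHub webhook convention: an HMAC-SHA256 of the raw request body, hex-encoded and prefixed with `sha256=` in the `X-Hub-Signature-256` header. A minimal sketch of that verification, assuming the server uses GitHub's header format (the app's actual header handling is not shown in this diff):

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Check a GitHub-style 'sha256=<hex>' HMAC signature over the raw body."""
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # Constant-time comparison to avoid leaking the signature via timing
    return hmac.compare_digest(expected, signature_header)


body = b'{"ref": "refs/heads/main"}'
sig = "sha256=" + hmac.new(b"s3cret", body, hashlib.sha256).hexdigest()
ok = verify_signature("s3cret", body, sig)
tampered = verify_signature("s3cret", body + b" ", sig)
```

Verification must run over the raw request bytes (before any JSON parsing), since re-serialized JSON may not match the payload GitHub signed.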
### Webhook diagnostics (202 Accepted but no new rows in DB)
1. **Logs** — After a push, check app logs. Each webhook logs `pull_and_index started branch=… repo_path=…`; then one of:
- `not a git repo or missing` — `/data` in the container is not a git clone; clone the repo into the mounted dir.
- `git fetch failed` — SSH/network (see `docker/ssh/README.md`) or wrong remote.
- `git checkout … failed` — branch missing in the clone.
- `git merge --ff-only failed` — non-fast-forward (e.g. force-push); index is skipped. Use a normal push or re-clone.
- `no new commits for branch=…` — merge was a no-op (already up to date); nothing to index.
- `running index story=…` then `index completed` — index ran; check tables for that story.
- `index failed` — stderr shows the `rag-agent index` error (e.g. DB, embeddings, repo path).
```bash
docker compose logs -f app
# or: docker logs -f rag-agent
```
Trigger a push and watch for the lines above.
2. **Story and tables** — Rows are per **story** (branch name). Query by story, e.g. `SELECT * FROM stories;` then `SELECT * FROM chunks WHERE story_id = (SELECT id FROM stories WHERE slug = 'main');`.
3. **Manual index** — Run index inside the container to confirm DB and repo work:
```bash
docker compose exec app rag-agent index --story main --changed --base-ref main --head-ref HEAD
```
If this inserts rows, the issue is in the webhook path (fetch/merge/refs).
4. **Allowed extensions** — Only `.md`, `.txt`, `.rst` (or `RAG_ALLOWED_EXTENSIONS`) are indexed; other files are skipped.
5. **"expected 1536 dimensions, not 1024"** — GigaChat Embeddings returns 1024-dim vectors; the default is now 1024. If the DB was created earlier with vector(1536), drop and recreate the tables so the app can create them with 1024: `psql "$RAG_DB_DSN" -c "DROP TABLE IF EXISTS chunks; DROP TABLE IF EXISTS documents;"` then restart the app (ensure_schema will recreate the tables).
## Git hook (index on commit)

Install the post-commit hook so changed files are indexed after each commit:

@@ -28,16 +82,62 @@ cp scripts/post-commit .git/hooks/post-commit && chmod +x .git/hooks/post-commit

Story for the commit is taken from (in order): env `RAG_STORY`, file `.rag-story` in repo root (one line = slug), or current branch name.
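The resolution order above can be sketched as a small helper (a hypothetical illustration, not the hook's actual implementation; the hook itself is shell):

```python
import os
from pathlib import Path


def resolve_story(repo_root: Path, current_branch: str) -> str:
    """Story slug: env RAG_STORY, then first line of .rag-story, then branch name."""
    env_story = os.environ.get("RAG_STORY", "").strip()
    if env_story:
        return env_story
    story_file = repo_root / ".rag-story"
    if story_file.is_file():
        lines = story_file.read_text(encoding="utf-8").splitlines()
        if lines and lines[0].strip():
            return lines[0].strip()
    return current_branch
```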
## Git hook (server-side)
Use `scripts/post-receive` in the **bare repo** on the server so that pushes trigger indexing.
1. On the server, create a **non-bare clone** (worktree) that the hook will update and use for indexing, e.g. `git clone /path/to/repo.git /var/rag-worktree/repo`.
2. In the bare repo, install the hook: `cp /path/to/RagAgent/scripts/post-receive /path/to/repo.git/hooks/post-receive && chmod +x .../post-receive`.
3. Set env for the hook (e.g. in the hook or via systemd/sshd): `RAG_REPO_PATH=/var/rag-worktree/repo`, `RAG_DB_DSN=...`, `RAG_EMBEDDINGS_DIM=...`. Optionally `RAG_AGENT_VENV` (path to venv with `rag-agent`) or `RAG_AGENT_SRC` + `RAG_AGENT_PYTHON` for `python -m rag_agent.cli`.
4. On each push the hook updates the worktree to the new commit, then runs `rag-agent index --changed --base-ref main --head-ref newrev --story <branch>` so the story contains **all commits** on the branch (from main to newrev).
Story is taken from the ref name (e.g. `refs/heads/main` → `main`).
## DB structure

- **stories** — story slug (e.g. branch name); documents and chunks are tied to a story. Optional: `indexed_base_ref`, `indexed_head_ref`, `indexed_at` record the git range that was indexed (all commits in that range belong to the story).
- **documents** — path + version per story; unique `(story_id, path)`.
- **chunks** — text chunks with embeddings (pgvector), plus:
  - `start_line`, `end_line` — position in the source file (for requirements/use-case files).
  - `change_type` — `added` | `modified` | `unchanged` (relative to the base ref when indexing with `--changed`).
  - `previous_content` — for `modified` chunks, the content before the change (for test-case generation).

Indexing is **always per-story**: `base_ref..head_ref` defines the set of commits that belong to the story. Use `--base-ref main` (or `auto`) and `--head-ref HEAD` so the story contains all commits on the branch, not a single commit. When you run `index --changed`, the base ref is compared to the head; each chunk is marked as added, modified, or unchanged.
### What changed in a story (for test cases)
To get only the chunks that were added or modified in a story (e.g. to generate test cases for the changed part):
```python
from rag_agent.index import fetch_changed_chunks
changed = fetch_changed_chunks(conn, story_id)
for r in changed:
    # r.path, r.content, r.change_type, r.start_line, r.end_line, r.previous_content
    ...
```
Scripts: `scripts/create_db.py` (Python, uses `ensure_schema` and `RAG_*` env), `scripts/schema.sql` (raw SQL).
## Embeddings (GigaChat)
If `GIGACHAT_CREDENTIALS` is set (e.g. in `.env` for local runs), embeddings use GigaChat API; otherwise the stub client is used. Optional env: `GIGACHAT_EMBEDDINGS_MODEL` (default `Embeddings`), `GIGACHAT_VERIFY_SSL` (`true`/`false`). Ensure `RAG_EMBEDDINGS_DIM` matches the model output (see GigaChat docs).
## Agent (GigaChat)
Answers are generated by a GigaChat-based agent: knowledge-base search (RAG) plus text generation. If `GIGACHAT_CREDENTIALS` is set, `GigaChatLLMClient` in `src/rag_agent/agent/pipeline.py` is used; otherwise a stub. The chat model is set via `RAG_LLM_MODEL` (default `GigaChat`).
## Telegram bot

Users talk to the agent through a Telegram bot: the bot answers text messages using knowledge from the database (RAG + GigaChat).

1. Create a bot via [@BotFather](https://t.me/BotFather) and obtain a token.
2. Add to `.env`: `TELEGRAM_BOT_TOKEN=<token>`.
3. Run: `rag-agent bot` (or `python -m rag_agent.telegram_bot`).
4. Via Docker: `docker compose up -d` starts the DB, the webhook server, and the bot in separate containers; `TELEGRAM_BOT_TOKEN` must be set in `.env`.

Required: `RAG_DB_DSN`, `RAG_REPO_PATH`, `GIGACHAT_CREDENTIALS`, `TELEGRAM_BOT_TOKEN`. Verbose logging (incoming messages, embedding count, number of chunks fetched from the DB, LLM answer): `RAG_BOT_VERBOSE_LOGGING=true|false` (default `true`, for debugging).
## Notes

- This project requires Postgres with the `pgvector` extension.

docker-compose.yml

@@ -0,0 +1,91 @@
# Postgres with pgvector + RAG Agent app (from repo git@git.lesha.spb.ru:alex/RagAgent.git).
# Clone the repo, then: docker compose up -d
# App and DB share network "rag_net"; app uses RAG_DB_DSN with host=postgres.
# DB init: scripts in docker/postgres-init/ run on first start (empty volume); to disable, comment out the init volume.
services:
  postgres:
    image: pgvector/pgvector:pg16
    container_name: rag-postgres
    environment:
      POSTGRES_USER: ${POSTGRES_USER:-rag}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-rag_secret}
      POSTGRES_DB: ${POSTGRES_DB:-rag}
    ports:
      - "${POSTGRES_PORT:-5432}:5432"
    volumes:
      # PG 18+: mount at /var/lib/postgresql (data goes in a versioned subdir). For pg16 the data dir is /var/lib/postgresql/data; mounting the parent still persists it.
      - rag_pgdata:/var/lib/postgresql
      # Init scripts run once on first start (create extension, tables). Optional: comment out to skip.
      - ./docker/postgres-init:/docker-entrypoint-initdb.d:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rag} -d ${POSTGRES_DB:-rag}"]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - rag_net

  app:
    build:
      context: .
      dockerfile: Dockerfile
    image: rag-agent:latest
    container_name: rag-agent
    restart: unless-stopped
    depends_on:
      postgres:
        condition: service_healthy
    ports:
      - "${WEBHOOK_PORT:-8000}:8000"
    environment:
      RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}"
      # In the container the repo is always at /data (mounted below). Use RAG_REPO_HOST in .env for the host path.
      RAG_REPO_PATH: "/data"
      # Accept host key on first connect; git fetch uses SSH from /root/.ssh (mounted below).
      GIT_SSH_COMMAND: "ssh -o StrictHostKeyChecking=accept-new"
      RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024}
      GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-}
      GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings}
      WEBHOOK_SECRET: ${WEBHOOK_SECRET:-}
    volumes:
      # Host path: set RAG_REPO_HOST in .env (e.g. /Users/you/repo). Falls back to RAG_REPO_PATH, then ./data.
      - ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data
      # SSH for git fetch (webhook): put the deploy key and known_hosts in RAG_SSH_DIR. See docker/ssh/README.md.
      - ${RAG_SSH_DIR:-./docker/ssh}:/root/.ssh:ro
    entrypoint: ["rag-agent"]
    command: ["serve", "--host", "0.0.0.0", "--port", "8000"]
    networks:
      - rag_net

  bot:
    build:
      context: .
      dockerfile: Dockerfile
    image: rag-agent:latest
    container_name: rag-bot
    restart: unless-stopped
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}"
      RAG_REPO_PATH: "/data"
      RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1024}
      GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-}
      GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings}
      TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN:-}
      RAG_BOT_VERBOSE_LOGGING: ${RAG_BOT_VERBOSE_LOGGING:-true}
    volumes:
      - ${RAG_REPO_HOST:-${RAG_REPO_PATH:-./data}}:/data
    entrypoint: ["rag-agent"]
    command: ["bot"]
    networks:
      - rag_net

networks:
  rag_net:
    driver: bridge

volumes:
  rag_pgdata:


@@ -0,0 +1,7 @@
-- Example: create an extra DB user (e.g. read-only). Not executed — rename to 00-create-extra-user.sql to enable.
-- Scripts in this folder run in alphabetical order; 00-* runs before 01-schema.sql.
-- CREATE USER rag_readonly WITH PASSWORD 'change_me';
-- GRANT CONNECT ON DATABASE rag TO rag_readonly;
-- GRANT USAGE ON SCHEMA public TO rag_readonly;
-- GRANT SELECT ON ALL TABLES IN SCHEMA public TO rag_readonly;


@@ -0,0 +1,41 @@
-- RAG vector DB schema (runs automatically on first Postgres init).
-- GigaChat Embeddings = 1024; for OpenAI use vector(1536).
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS stories (
    id SERIAL PRIMARY KEY,
    slug TEXT UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
    indexed_base_ref TEXT,
    indexed_head_ref TEXT,
    indexed_at TIMESTAMPTZ
);

CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    story_id INTEGER NOT NULL REFERENCES stories(id) ON DELETE CASCADE,
    path TEXT NOT NULL,
    version TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    UNIQUE(story_id, path)
);

CREATE TABLE IF NOT EXISTS chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    hash TEXT NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1024) NOT NULL,
    start_line INTEGER,
    end_line INTEGER,
    change_type TEXT NOT NULL DEFAULT 'added'
        CHECK (change_type IN ('added', 'modified', 'unchanged')),
    previous_content TEXT
);
CREATE INDEX IF NOT EXISTS idx_documents_story_id ON documents(story_id);
CREATE INDEX IF NOT EXISTS idx_chunks_document_id ON chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_chunks_embedding ON chunks USING ivfflat (embedding vector_cosine_ops);
CREATE INDEX IF NOT EXISTS idx_chunks_change_type ON chunks(change_type);


@@ -0,0 +1,9 @@
# Postgres init scripts (optional)
Files here are mounted into the Postgres container at `/docker-entrypoint-initdb.d/` and run **only on first startup** (when the data volume is empty), in alphabetical order.
- `01-schema.sql` — creates pgvector extension and RAG tables (stories, documents, chunks).
- To add more users or other setup, add scripts with names like `00-create-user.sql` (they run before `01-schema.sql`).
- To disable init: in `docker-compose.yml`, comment out the postgres volume that mounts this folder, or remove/rename the `.sql` files.
After the first run, these scripts are not executed again. To re-run them, remove the volume: `docker compose down -v` (this deletes DB data), then `docker compose up -d`.

docker/ssh/.gitignore

@@ -0,0 +1,4 @@
# Deploy key and known_hosts — add locally, do not commit
id_*
id_*.pub
known_hosts

docker/ssh/README.md

@@ -0,0 +1,19 @@
# SSH for webhook (git fetch in container)
When the app runs in Docker and the webhook does `git fetch origin <branch>`, git uses SSH. The container has no keys by default, so you get "Host key verification failed" or "Permission denied".
## Setup
1. **Deploy key** (read-only key for the repo you index):
- Generate: `ssh-keygen -t ed25519 -f docker/ssh/id_ed25519 -N "" -C "rag-agent-deploy"`
- Add the **public** key (`docker/ssh/id_ed25519.pub`) to your Git server (GitHub/GitLab/… → repo → Deploy keys).
2. **Known hosts** (optional; `GIT_SSH_COMMAND` in compose accepts new host keys on first connect):
- To pin the host key: `ssh-keyscan -t ed25519 git.example.com >> docker/ssh/known_hosts`
- Replace `git.example.com` with your Git host (e.g. `github.com`, `git.lesha.spb.ru`).
3. **Permissions**: key file must be readable only by you, e.g. `chmod 600 docker/ssh/id_ed25519`.
4. **Compose**: by default this directory is mounted into the app container as `/root/.ssh`. Override with `RAG_SSH_DIR` in `.env` if you use another path (e.g. `RAG_SSH_DIR=/Users/you/.ssh` to use your main SSH dir).
After that, restart the app: `docker compose up -d`.


@@ -8,6 +8,11 @@ dependencies = [
"psycopg[binary]>=3.1.18", "psycopg[binary]>=3.1.18",
"pgvector>=0.2.5", "pgvector>=0.2.5",
"pydantic>=2.7.0", "pydantic>=2.7.0",
"python-dotenv>=1.0.0",
"gigachat>=0.2.0",
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"python-telegram-bot>=21.0",
] ]
[project.scripts] [project.scripts]


@@ -20,16 +20,15 @@ if [ -z "$STORY" ]; then
  exit 0
fi

# Run index: all changes in the story (main..HEAD), not per-commit
if command -v rag-agent >/dev/null 2>&1; then
  rag-agent index --changed --base-ref main --head-ref HEAD --story "$STORY"
elif [ -n "${VIRTUAL_ENV}" ]; then
  rag-agent index --changed --base-ref main --head-ref HEAD --story "$STORY"
else
  if [ -f "venv/bin/rag-agent" ]; then
    venv/bin/rag-agent index --changed --base-ref main --head-ref HEAD --story "$STORY"
  else
    PYTHONPATH=src python -m rag_agent.cli index --changed --base-ref main --head-ref HEAD --story "$STORY" 2>/dev/null || true
  fi
fi

scripts/post-receive

@@ -0,0 +1,43 @@
#!/usr/bin/env sh
# Server-side hook: index changed files after push. Install in bare repo: cp scripts/post-receive /path/to/repo.git/hooks/post-receive
# Requires: RAG_REPO_PATH = path to a non-bare clone of this repo (worktree), updated by this hook to newrev.
# RAG_DB_DSN, RAG_EMBEDDINGS_DIM; optionally GIGACHAT_CREDENTIALS.
# Story is derived from ref name (e.g. refs/heads/main -> main).
set -e
if [ -z "${RAG_REPO_PATH}" ]; then
echo "post-receive: RAG_REPO_PATH (path to clone worktree) is required. Skipping index."
exit 0
fi
# Read refs from stdin (refname SP oldrev SP newrev LF)
while read -r refname oldrev newrev; do
# Skip branch deletions
if [ "$newrev" = "0000000000000000000000000000000000000000" ]; then
continue
fi
# Branch name from ref (e.g. refs/heads/main -> main)
branch="${refname#refs/heads/}"
[ -z "$branch" ] && continue
# Update worktree to newrev so rag-agent can read files from disk
if [ -d "${RAG_REPO_PATH}/.git" ]; then
git -C "${RAG_REPO_PATH}" fetch origin 2>/dev/null || true
git -C "${RAG_REPO_PATH}" checkout -f "${newrev}" 2>/dev/null || true
fi
# Index all commits in the story (main..newrev), not just this push
export RAG_REPO_PATH
if command -v rag-agent >/dev/null 2>&1; then
rag-agent index --changed --base-ref main --head-ref "${newrev}" --story "${branch}"
elif [ -f "${RAG_AGENT_VENV}/bin/rag-agent" ]; then
"${RAG_AGENT_VENV}/bin/rag-agent" index --changed --base-ref main --head-ref "${newrev}" --story "${branch}"
else
if [ -n "${RAG_AGENT_SRC}" ]; then
PYTHONPATH="${RAG_AGENT_SRC}/src" "${RAG_AGENT_PYTHON:-python3}" -m rag_agent.cli index --changed --base-ref main --head-ref "${newrev}" --story "${branch}" 2>/dev/null || true
fi
fi
done
exit 0


@@ -1,13 +1,15 @@
-- RAG vector DB schema for Postgres (pgvector).
-- GigaChat Embeddings = 1024; for OpenAI use vector(1536). Usage: psql "$RAG_DB_DSN" -f scripts/schema.sql

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS stories (
    id SERIAL PRIMARY KEY,
    slug TEXT UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
    indexed_base_ref TEXT,
    indexed_head_ref TEXT,
    indexed_at TIMESTAMPTZ
);

CREATE TABLE IF NOT EXISTS documents (
@@ -25,9 +27,15 @@ CREATE TABLE IF NOT EXISTS chunks (
    chunk_index INTEGER NOT NULL,
    hash TEXT NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1024) NOT NULL,
    start_line INTEGER,
    end_line INTEGER,
    change_type TEXT NOT NULL DEFAULT 'added'
        CHECK (change_type IN ('added', 'modified', 'unchanged')),
    previous_content TEXT
);

CREATE INDEX IF NOT EXISTS idx_documents_story_id ON documents(story_id);
CREATE INDEX IF NOT EXISTS idx_chunks_document_id ON chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_chunks_embedding ON chunks USING ivfflat (embedding vector_cosine_ops);
CREATE INDEX IF NOT EXISTS idx_chunks_change_type ON chunks(change_type);


@@ -1,14 +1,21 @@
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol

import psycopg
from dotenv import load_dotenv

from rag_agent.config import AppConfig
from rag_agent.index.embeddings import EmbeddingClient
from rag_agent.retrieval.search import search_similar

_repo_root = Path(__file__).resolve().parent.parent.parent
load_dotenv(_repo_root / ".env")


class LLMClient(Protocol):
    def generate(self, prompt: str, model: str) -> str:
@@ -20,10 +27,49 @@ class StubLLMClient:
    def generate(self, prompt: str, model: str) -> str:
        return (
            "LLM client is not configured. "
            "Set GIGACHAT_CREDENTIALS in .env for GigaChat answers."
        )
class GigaChatLLMClient:
    """LLM generation via GigaChat API. Credentials from env GIGACHAT_CREDENTIALS."""

    def __init__(
        self,
        credentials: str,
        model: str = "GigaChat",
        verify_ssl_certs: bool = False,
    ) -> None:
        self._credentials = credentials.strip()
        self._model = model
        self._verify_ssl_certs = verify_ssl_certs

    def generate(self, prompt: str, model: str) -> str:
        from gigachat import GigaChat

        use_model = model or self._model
        with GigaChat(
            credentials=self._credentials,
            model=use_model,
            verify_ssl_certs=self._verify_ssl_certs,
        ) as giga:
            response = giga.chat(prompt)
            return (response.choices[0].message.content or "").strip()


def get_llm_client(config: AppConfig) -> LLMClient:
    """Return GigaChat LLM client if credentials set, else stub."""
    credentials = os.getenv("GIGACHAT_CREDENTIALS", "").strip()
    if credentials:
        return GigaChatLLMClient(
            credentials=credentials,
            model=config.llm_model,
            verify_ssl_certs=os.getenv("GIGACHAT_VERIFY_SSL", "false").lower()
            in ("1", "true", "yes"),
        )
    return StubLLMClient()
def build_prompt(question: str, contexts: list[str]) -> str:
    joined = "\n\n".join(contexts)
    return (
@@ -42,10 +88,32 @@ def answer_query(
    top_k: int = 5,
    story_id: int | None = None,
) -> str:
    answer, _ = answer_query_with_stats(
        conn, config, embedding_client, llm_client, question, top_k, story_id
    )
    return answer


def answer_query_with_stats(
    conn: psycopg.Connection,
    config: AppConfig,
    embedding_client: EmbeddingClient,
    llm_client: LLMClient,
    question: str,
    top_k: int = 5,
    story_id: int | None = None,
) -> tuple[str, dict]:
    """Like answer_query but returns (answer, stats) for logging. stats: query_embeddings, chunks_found, answer."""
    query_embeddings = embedding_client.embed_texts([question])
    results = search_similar(
        conn, query_embeddings[0], top_k=top_k, story_id=story_id
    )
    contexts = [f"Source: {item.path}\n{item.content}" for item in results]
    prompt = build_prompt(question, contexts)
    answer = llm_client.generate(prompt, model=config.llm_model)
    stats = {
        "query_embeddings": len(query_embeddings),
        "chunks_found": len(results),
        "answer": answer,
    }
    return answer, stats


@@ -5,9 +5,15 @@ import hashlib
from pathlib import Path

from rag_agent.config import load_config
from rag_agent.ingest.chunker import chunk_text_by_lines
from rag_agent.ingest.file_loader import iter_text_files
from rag_agent.ingest.git_watcher import (
    filter_existing,
    filter_removed,
    get_changed_files,
    get_merge_base,
    read_file_at_ref,
)
from rag_agent.index.embeddings import get_embedding_client
from rag_agent.index.postgres import (
    connect,
@@ -16,9 +22,10 @@ from rag_agent.index.postgres import (
    get_or_create_story,
    get_story_id,
    replace_chunks,
    update_story_indexed_range,
    upsert_document,
)
from rag_agent.agent.pipeline import answer_query, get_llm_client
def _file_version(path: Path) -> str:
@@ -34,26 +41,84 @@ def cmd_index(args: argparse.Namespace) -> None:
story_id = get_or_create_story(conn, args.story) story_id = get_or_create_story(conn, args.story)
embedding_client = get_embedding_client(config.embeddings_dim) embedding_client = get_embedding_client(config.embeddings_dim)
base_ref = args.base_ref.strip()
head_ref = args.head_ref.strip()
if args.changed: if args.changed:
changed_files = get_changed_files(config.repo_path, args.base_ref, args.head_ref) if base_ref.lower() == "auto":
base_ref = get_merge_base(
config.repo_path, args.default_branch, head_ref
) or args.default_branch
changed_files = get_changed_files(
config.repo_path, base_ref, head_ref
)
removed = filter_removed(changed_files) removed = filter_removed(changed_files)
existing = filter_existing(changed_files) existing = filter_existing(changed_files)
else: else:
removed = [] removed = []
existing = [path for path in Path(config.repo_path).rglob("*") if path.is_file()] repo_path = Path(config.repo_path)
if not repo_path.exists():
raise SystemExit(f"RAG_REPO_PATH does not exist: {config.repo_path}")
existing = [p for p in repo_path.rglob("*") if p.is_file()]
allowed = list(iter_text_files(existing, config.allowed_extensions))
print(
f"repo={config.repo_path} all_files={len(existing)} "
f"allowed={len(allowed)} ext={config.allowed_extensions}"
)
if not allowed:
print("No files to index (check path and extensions .md, .txt, .rst)")
return
    for path in removed:
        delete_document(conn, story_id, str(path))
    indexed = 0
    for path, text in iter_text_files(existing, config.allowed_extensions):
        chunks = chunk_text_by_lines(
            text,
            config.chunk_size_lines,
            config.chunk_overlap_lines,
        )
        if not chunks:
            continue
        base_chunks = None
        if args.changed:
            base_text = read_file_at_ref(config.repo_path, path, base_ref)
            if base_text is not None:
                base_chunks = chunk_text_by_lines(
                    base_text,
                    config.chunk_size_lines,
                    config.chunk_overlap_lines,
                )
        embeddings = embedding_client.embed_texts(
            [chunk.text for chunk in chunks]
        )
        document_id = upsert_document(
            conn, story_id, str(path), _file_version(path)
        )
        replace_chunks(
            conn, document_id, chunks, embeddings, base_chunks=base_chunks
        )
        indexed += 1
    print(f"Indexed {indexed} documents for story={args.story}")
    if args.changed:
        update_story_indexed_range(conn, story_id, base_ref, head_ref)
def cmd_bot(args: argparse.Namespace) -> None:
    from rag_agent.telegram_bot import main as bot_main

    bot_main()


def cmd_serve(args: argparse.Namespace) -> None:
    import uvicorn

    uvicorn.run(
        "rag_agent.webhook:app",
        host=args.host,
        port=args.port,
        log_level="info",
    )
def cmd_ask(args: argparse.Namespace) -> None:
@@ -66,7 +131,7 @@ def cmd_ask(args: argparse.Namespace) -> None:
    if story_id is None:
        raise SystemExit(f"Story not found: {args.story}")
    embedding_client = get_embedding_client(config.embeddings_dim)
    llm_client = get_llm_client(config)
    answer = answer_query(
        conn,
        config,
@@ -89,9 +154,26 @@ def build_parser() -> argparse.ArgumentParser:
        required=True,
        help="Story slug (e.g. branch name or story id); documents are tied to this story",
    )
    index_parser.add_argument(
        "--changed",
        action="store_true",
        help="Index only files changed in the story range (base-ref..head-ref); all commits in range belong to the story",
    )
    index_parser.add_argument(
        "--base-ref",
        default="main",
        help="Start of story range (e.g. main). Use 'auto' for merge-base(default-branch, head-ref). All commits from base to head are the story.",
    )
    index_parser.add_argument(
        "--head-ref",
        default="HEAD",
        help="End of story range (e.g. current branch tip)",
    )
    index_parser.add_argument(
        "--default-branch",
        default="main",
        help="Default branch name for --base-ref auto",
    )
    index_parser.set_defaults(func=cmd_index)

    ask_parser = sub.add_parser("ask", help="Ask a question")
@@ -104,6 +186,29 @@ def build_parser() -> argparse.ArgumentParser:
    ask_parser.add_argument("--top-k", type=int, default=5, help="Top K chunks to retrieve")
    ask_parser.set_defaults(func=cmd_ask)
    serve_parser = sub.add_parser(
        "serve",
        help="Run webhook server: on push to remote repo, pull and index changes",
    )
    serve_parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Bind host (default: 0.0.0.0)",
    )
    serve_parser.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Bind port (default: 8000)",
    )
    serve_parser.set_defaults(func=cmd_serve)

    bot_parser = sub.add_parser(
        "bot",
        help="Run Telegram bot: answers questions using RAG (requires TELEGRAM_BOT_TOKEN)",
    )
    bot_parser.set_defaults(func=cmd_bot)
    return parser
@@ -115,3 +220,4 @@ def main() -> None:
if __name__ == "__main__":
    main()


@@ -2,8 +2,15 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence

from dotenv import load_dotenv

# Load .env from repo root when config is used (e.g. for local runs)
_repo_root = Path(__file__).resolve().parent.parent.parent
load_dotenv(_repo_root / ".env")


@dataclass(frozen=True)
class AppConfig:
@@ -11,6 +18,8 @@ class AppConfig:
    db_dsn: str
    chunk_size: int
    chunk_overlap: int
    chunk_size_lines: int
    chunk_overlap_lines: int
    embeddings_dim: int
    embeddings_model: str
    llm_model: str
@@ -48,9 +57,11 @@ def load_config() -> AppConfig:
        db_dsn=db_dsn,
        chunk_size=_env_int("RAG_CHUNK_SIZE", 400),
        chunk_overlap=_env_int("RAG_CHUNK_OVERLAP", 50),
        chunk_size_lines=_env_int("RAG_CHUNK_SIZE_LINES", 40),
        chunk_overlap_lines=_env_int("RAG_CHUNK_OVERLAP_LINES", 8),
        embeddings_dim=_env_int("RAG_EMBEDDINGS_DIM", 1024),  # GigaChat Embeddings = 1024; OpenAI = 1536
        embeddings_model=os.getenv("RAG_EMBEDDINGS_MODEL", "stub-embeddings"),
        llm_model=os.getenv("RAG_LLM_MODEL", "GigaChat"),
        allowed_extensions=tuple(
            _env_list("RAG_ALLOWED_EXTENSIONS", [".md", ".txt", ".rst"])
        ),


@@ -1 +1,11 @@
from rag_agent.index.postgres import (
    ChangedChunkRecord,
    fetch_changed_chunks,
    get_story_indexed_range,
)

__all__ = [
    "ChangedChunkRecord",
    "fetch_changed_chunks",
    "get_story_indexed_range",
]


@@ -1,9 +1,17 @@
from __future__ import annotations

import hashlib
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Protocol

from dotenv import load_dotenv

# Ensure .env is loaded when resolving embedding client (e.g. GIGACHAT_CREDENTIALS)
_repo_root = Path(__file__).resolve().parent.parent.parent.parent
load_dotenv(_repo_root / ".env")


class EmbeddingClient(Protocol):
    def embed_texts(self, texts: Iterable[str]) -> list[list[float]]:
@@ -25,5 +33,79 @@ class StubEmbeddingClient:
        return vectors
_GIGACHAT_BATCH_SIZE = 50
# GigaChat embeddings: max 514 tokens per input; Russian/English text is roughly
# 3 chars per token, so truncate inputs to stay under the limit.
_GIGACHAT_MAX_CHARS_PER_INPUT = 1200


class GigaChatEmbeddingClient:
    """Embeddings via GigaChat API. Credentials from env GIGACHAT_CREDENTIALS."""

    def __init__(
        self,
        credentials: str,
        model: str = "Embeddings",
        verify_ssl_certs: bool = False,
    ) -> None:
        self._credentials = credentials.strip()
        self._model = model
        self._verify_ssl_certs = verify_ssl_certs

    def embed_texts(self, texts: Iterable[str]) -> list[list[float]]:
        from gigachat import GigaChat

        texts_list = list(texts)
        if not texts_list:
            return []
        result: list[list[float]] = []
        try:
            for i in range(0, len(texts_list), _GIGACHAT_BATCH_SIZE):
                raw_batch = texts_list[i : i + _GIGACHAT_BATCH_SIZE]
                batch = [
                    t[:_GIGACHAT_MAX_CHARS_PER_INPUT] if len(t) > _GIGACHAT_MAX_CHARS_PER_INPUT else t
                    for t in raw_batch
                ]
                with GigaChat(
                    credentials=self._credentials,
                    model=self._model,
                    verify_ssl_certs=self._verify_ssl_certs,
                ) as giga:
                    # API: embeddings(texts: list[str]) takes a single positional argument (gigachat 0.2+)
                    response = giga.embeddings(batch)
                # Preserve order by index
                by_index = {item.index: item.embedding for item in response.data}
                result.extend(by_index[j] for j in range(len(batch)))
        except Exception as e:
            from gigachat.exceptions import ResponseError, RequestEntityTooLargeError

            is_402 = (
                isinstance(e, ResponseError)
                and (getattr(e, "status_code", None) == 402 or "402" in str(e) or "Payment Required" in str(e))
            )
            if is_402:
                raise ValueError(
                    "GigaChat: insufficient funds (402 Payment Required). "
                    "Top up the balance in the GigaChat account, or unset GIGACHAT_CREDENTIALS to use stub mode."
                )
            if isinstance(e, RequestEntityTooLargeError) or (
                isinstance(e, ResponseError) and getattr(e, "status_code", None) == 413
            ):
                raise ValueError(
                    "GigaChat: per-request token limit exceeded (413). "
                    "Lower RAG_CHUNK_SIZE_LINES or RAG_CHUNK_SIZE in .env (current limit is ~514 tokens per chunk)."
                )
            raise
        return result
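The batching and truncation behaviour of `embed_texts` can be isolated as a pure helper. A minimal sketch; the 50/1200 limits mirror the constants above, and `batch_and_truncate` itself is illustrative, not part of the client:

```python
def batch_and_truncate(texts: list[str], batch_size: int = 50, max_chars: int = 1200) -> list[list[str]]:
    """Split inputs into API-sized batches, truncating each text to the character cap."""
    batches: list[list[str]] = []
    for i in range(0, len(texts), batch_size):
        # Truncation keeps each input under the provider's per-text token limit
        batches.append([t[:max_chars] for t in texts[i : i + batch_size]])
    return batches
```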
def get_embedding_client(dim: int) -> EmbeddingClient:
    credentials = os.getenv("GIGACHAT_CREDENTIALS", "").strip()
    if credentials:
        return GigaChatEmbeddingClient(
            credentials=credentials,
            model=os.getenv("GIGACHAT_EMBEDDINGS_MODEL", "Embeddings"),
            verify_ssl_certs=os.getenv("GIGACHAT_VERIFY_SSL", "false").lower()
            in ("1", "true", "yes"),
        )
    return StubEmbeddingClient(dim=dim)


@@ -2,13 +2,18 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, Sequence

import psycopg
from pgvector import Vector
from pgvector.psycopg import register_vector

from rag_agent.ingest.chunker import TextChunk

CHANGE_ADDED = "added"
CHANGE_MODIFIED = "modified"
CHANGE_UNCHANGED = "unchanged"


@dataclass(frozen=True)
class ChunkRecord:
@@ -18,6 +23,18 @@ class ChunkRecord:
    embedding: list[float]
@dataclass(frozen=True)
class ChangedChunkRecord:
    """Chunk that was added or modified in a story (for test-case generation)."""

    path: str
    content: str
    change_type: str
    start_line: int | None
    end_line: int | None
    previous_content: str | None
def connect(dsn: str) -> psycopg.Connection:
    conn = psycopg.connect(dsn)
    register_vector(conn)
@@ -32,10 +49,23 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
            CREATE TABLE IF NOT EXISTS stories (
                id SERIAL PRIMARY KEY,
                slug TEXT UNIQUE NOT NULL,
                created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
                indexed_base_ref TEXT,
                indexed_head_ref TEXT,
                indexed_at TIMESTAMPTZ
            );
            """
        )
        for col_def in (
            "ADD COLUMN IF NOT EXISTS indexed_base_ref TEXT",
            "ADD COLUMN IF NOT EXISTS indexed_head_ref TEXT",
            "ADD COLUMN IF NOT EXISTS indexed_at TIMESTAMPTZ",
        ):
            try:
                cur.execute(f"ALTER TABLE stories {col_def};")
            except psycopg.ProgrammingError:
                conn.rollback()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS documents (
@@ -56,10 +86,48 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
                chunk_index INTEGER NOT NULL,
                hash TEXT NOT NULL,
                content TEXT NOT NULL,
                embedding vector({embeddings_dim}) NOT NULL,
                start_line INTEGER,
                end_line INTEGER,
                change_type TEXT NOT NULL DEFAULT 'added'
                    CHECK (change_type IN ('added', 'modified', 'unchanged')),
                previous_content TEXT
            );
            """
        )
        # Migrations: add columns if table already existed without them (Postgres 11+)
        for col_def in (
            "ADD COLUMN IF NOT EXISTS start_line INTEGER",
            "ADD COLUMN IF NOT EXISTS end_line INTEGER",
            "ADD COLUMN IF NOT EXISTS previous_content TEXT",
            "ADD COLUMN IF NOT EXISTS change_type TEXT DEFAULT 'added'",
        ):
            try:
                cur.execute(f"ALTER TABLE chunks {col_def};")
            except psycopg.ProgrammingError:
                conn.rollback()
        try:
            cur.execute(
                "ALTER TABLE chunks ALTER COLUMN change_type SET NOT NULL;"
            )
        except psycopg.ProgrammingError:
            conn.rollback()
        cur.execute(
            """
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT 1 FROM pg_constraint
                    WHERE conrelid = 'chunks'::regclass AND conname = 'chunks_change_type_check'
                ) THEN
                    ALTER TABLE chunks ADD CONSTRAINT chunks_change_type_check
                        CHECK (change_type IN ('added', 'modified', 'unchanged'));
                END IF;
            END $$;
            """
        )
        cur.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_documents_story_id
@@ -78,6 +146,12 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
            ON chunks USING ivfflat (embedding vector_cosine_ops);
            """
        )
        cur.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_chunks_change_type
                ON chunks(change_type);
            """
        )
    conn.commit()
@@ -97,6 +171,44 @@ def get_or_create_story(conn: psycopg.Connection, slug: str) -> int:
    return story_id
def update_story_indexed_range(
    conn: psycopg.Connection,
    story_id: int,
    base_ref: str,
    head_ref: str,
) -> None:
    """Record that this story was indexed as all changes from base_ref to head_ref (all commits in story)."""
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE stories
            SET indexed_base_ref = %s, indexed_head_ref = %s,
                indexed_at = (NOW() AT TIME ZONE 'utc')
            WHERE id = %s;
            """,
            (base_ref.strip(), head_ref.strip(), story_id),
        )
    conn.commit()


def get_story_indexed_range(
    conn: psycopg.Connection, story_id: int
) -> tuple[str | None, str | None, datetime | None]:
    """Return (indexed_base_ref, indexed_head_ref, indexed_at) for the story, or (None, None, None)."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT indexed_base_ref, indexed_head_ref, indexed_at
            FROM stories WHERE id = %s;
            """,
            (story_id,),
        )
        row = cur.fetchone()
        if row is None:
            return (None, None, None)
        return (row[0], row[1], row[2])
def get_story_id(conn: psycopg.Connection, slug: str) -> int | None:
    s = slug.strip()
    with conn.cursor() as cur:
@@ -127,28 +239,98 @@ def upsert_document(
    return document_id
def _change_type_and_previous(
    chunk: TextChunk,
    base_by_range: dict[tuple[int, int], TextChunk],
) -> tuple[str, str | None]:
    """Determine change_type and previous_content for a chunk given base chunks keyed by (start_line, end_line)."""
    if chunk.start_line is None or chunk.end_line is None:
        return (CHANGE_ADDED, None)
    key = (chunk.start_line, chunk.end_line)
    base = base_by_range.get(key)
    if base is None:
        return (CHANGE_ADDED, None)
    if base.hash == chunk.hash:
        return (CHANGE_UNCHANGED, None)
    return (CHANGE_MODIFIED, base.text)
def replace_chunks(
    conn: psycopg.Connection,
    document_id: int,
    chunks: Iterable[TextChunk],
    embeddings: Iterable[list[float]],
    base_chunks: Sequence[TextChunk] | None = None,
) -> None:
    base_by_range: dict[tuple[int, int], TextChunk] = {}
    if base_chunks:
        for c in base_chunks:
            if c.start_line is not None and c.end_line is not None:
                base_by_range[(c.start_line, c.end_line)] = c
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM chunks WHERE document_id = %s;",
            (document_id,),
        )
        for chunk, embedding in zip(chunks, embeddings):
            change_type, previous_content = _change_type_and_previous(
                chunk, base_by_range
            )
            cur.execute(
                """
                INSERT INTO chunks (
                    document_id, chunk_index, hash, content, embedding,
                    start_line, end_line, change_type, previous_content
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                """,
                (
                    document_id,
                    chunk.index,
                    chunk.hash,
                    chunk.text,
                    embedding,
                    chunk.start_line,
                    chunk.end_line,
                    change_type,
                    previous_content,
                ),
            )
    conn.commit()
def fetch_changed_chunks(
    conn: psycopg.Connection, story_id: int
) -> list[ChangedChunkRecord]:
    """Return chunks that were added or modified in this story (for test-case generation)."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT d.path, c.content, c.change_type, c.start_line, c.end_line,
                   c.previous_content
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            WHERE d.story_id = %s
              AND c.change_type IN ('added', 'modified')
            ORDER BY d.path, c.start_line NULLS FIRST, c.chunk_index;
            """,
            (story_id,),
        )
        rows = cur.fetchall()
    return [
        ChangedChunkRecord(
            path=row[0],
            content=row[1],
            change_type=row[2],
            start_line=row[3],
            end_line=row[4],
            previous_content=row[5],
        )
        for row in rows
    ]
def delete_document(
    conn: psycopg.Connection, story_id: int, path: str
) -> None:
@@ -166,6 +348,7 @@ def fetch_similar(
    top_k: int,
    story_id: int | None = None,
) -> list[tuple[str, str, float]]:
    vec = Vector(query_embedding)
    with conn.cursor() as cur:
        if story_id is not None:
            cur.execute(
@@ -177,7 +360,7 @@ def fetch_similar(
                ORDER BY c.embedding <=> %s
                LIMIT %s;
                """,
                (vec, story_id, vec, top_k),
            )
        else:
            cur.execute(
@@ -188,7 +371,7 @@ def fetch_similar(
                ORDER BY c.embedding <=> %s
                LIMIT %s;
                """,
                (vec, vec, top_k),
            )
        rows = cur.fetchall()
    return [(row[0], row[1], row[2]) for row in rows]


@@ -10,6 +10,8 @@ class TextChunk:
    index: int
    text: str
    hash: str
    start_line: int | None = None
    end_line: int | None = None


def _hash_text(text: str) -> str:
@@ -27,7 +29,15 @@ def chunk_text(text: str, chunk_size: int, overlap: int) -> list[TextChunk]:
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_text_value = " ".join(tokens[start:end])
        chunks.append(
            TextChunk(
                index=index,
                text=chunk_text_value,
                hash=_hash_text(chunk_text_value),
                start_line=None,
                end_line=None,
            )
        )
        index += 1
        if end == len(tokens):
            break
@@ -40,3 +50,34 @@ def iter_chunks(
) -> Iterator[list[TextChunk]]:
    for text in texts:
        yield chunk_text(text, chunk_size, overlap)
def chunk_text_by_lines(
    text: str, max_lines: int, overlap_lines: int
) -> list[TextChunk]:
    """Chunk by consecutive lines; each chunk has start_line/end_line (1-based)."""
    lines = text.splitlines()
    if not lines:
        return []
    chunks: list[TextChunk] = []
    index = 0
    start = 0
    while start < len(lines):
        end = min(start + max_lines, len(lines))
        chunk_lines = lines[start:end]
        chunk_text_value = "\n".join(chunk_lines)
        chunks.append(
            TextChunk(
                index=index,
                text=chunk_text_value,
                hash=_hash_text(chunk_text_value),
                start_line=start + 1,
                end_line=end,
            )
        )
        index += 1
        if end == len(lines):
            break
        start = max(end - overlap_lines, 0)
    return chunks
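The window arithmetic above (step forward by `max_lines`, then back by `overlap_lines`) determines which 1-based line ranges each chunk covers. A standalone sketch of just that arithmetic; `line_windows` is an illustrative helper, not part of the chunker module:

```python
def line_windows(n_lines: int, max_lines: int, overlap: int) -> list[tuple[int, int]]:
    """Return the 1-based (start_line, end_line) windows the line chunker produces."""
    windows: list[tuple[int, int]] = []
    start = 0
    while start < n_lines:
        end = min(start + max_lines, n_lines)
        windows.append((start + 1, end))
        if end == n_lines:
            break
        # Each new window starts `overlap` lines before the previous one ended
        start = max(end - overlap, 0)
    return windows
```

With the defaults from the config (40 lines per chunk, 8 lines of overlap), a 100-line file yields three overlapping windows.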


@@ -40,3 +40,53 @@ def filter_existing(paths: Iterable[Path]) -> list[Path]:
def filter_removed(paths: Iterable[Path]) -> list[Path]:
    return [path for path in paths if not path.exists()]
def get_merge_base(
    repo_path: str, ref1: str, ref2: str = "HEAD"
) -> str | None:
    """Return merge-base commit of ref1 and ref2 (start of story range). None on error."""
    args = [
        "git",
        "-C",
        repo_path,
        "merge-base",
        ref1,
        ref2,
    ]
    try:
        result = subprocess.run(
            args,
            check=True,
            capture_output=True,
            text=True,
        )
        return result.stdout.strip() or None
    except subprocess.CalledProcessError:
        return None


def read_file_at_ref(
    repo_path: str, path: Path, ref: str
) -> str | None:
    """Read file content at a git ref. Returns None if path is missing at ref."""
    repo = Path(repo_path)
    rel = path.relative_to(repo) if path.is_absolute() else path
    rel_str = rel.as_posix()
    args = [
        "git",
        "-C",
        repo_path,
        "show",
        f"{ref}:{rel_str}",
    ]
    try:
        result = subprocess.run(
            args,
            check=True,
            capture_output=True,
            text=True,
        )
        return result.stdout
    except subprocess.CalledProcessError:
        return None


@@ -0,0 +1,156 @@
"""Telegram bot: answers user questions using RAG (retrieval + GigaChat)."""
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
_repo_root = Path(__file__).resolve().parent.parent
load_dotenv(_repo_root / ".env")
logger = logging.getLogger(__name__)
# Расширенное логирование: входящие сообщения, число эмбеддингов, число чанков из БД, ответ LLM.
# Включить/выключить: RAG_BOT_VERBOSE_LOGGING=true|false (по умолчанию true для отладки).
VERBOSE_LOGGING_MAX_ANSWER_CHARS = 500
def _verbose_logging_enabled() -> bool:
return os.getenv("RAG_BOT_VERBOSE_LOGGING", "true").lower() in ("1", "true", "yes")
def _run_rag(
    question: str,
    top_k: int = 5,
    story_id: int | None = None,
    with_stats: bool = False,
) -> str | tuple[str, dict]:
    """Synchronous RAG call: retrieval + LLM. Used from a worker thread.

    If with_stats=True, returns (answer, stats); else returns the answer only.
    """
    from rag_agent.config import load_config
    from rag_agent.index.embeddings import get_embedding_client
    from rag_agent.index.postgres import connect, ensure_schema
    from rag_agent.agent.pipeline import answer_query, answer_query_with_stats, get_llm_client

    config = load_config()
    conn = connect(config.db_dsn)
    try:
        ensure_schema(conn, config.embeddings_dim)
        embedding_client = get_embedding_client(config.embeddings_dim)
        llm_client = get_llm_client(config)
        if with_stats:
            return answer_query_with_stats(
                conn,
                config,
                embedding_client,
                llm_client,
                question,
                top_k=top_k,
                story_id=story_id,
            )
        return answer_query(
            conn,
            config,
            embedding_client,
            llm_client,
            question,
            top_k=top_k,
            story_id=story_id,
        )
    finally:
        conn.close()
def run_bot() -> None:
    token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
    if not token:
        logger.error(
            "TELEGRAM_BOT_TOKEN is required. Set it in .env or environment. "
            "Container will stay up; restart after setting the token."
        )
        import time

        while True:
            time.sleep(3600)

    from telegram import Update
    from telegram.ext import Application, ContextTypes, MessageHandler, filters

    verbose = _verbose_logging_enabled()

    async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        if not update.message or not update.message.text:
            return
        question = update.message.text.strip()
        if not question:
            await update.message.reply_text("Please send your question as text.")
            return
        user_id = update.effective_user.id if update.effective_user else None
        chat_id = update.effective_chat.id if update.effective_chat else None
        if verbose:
            logger.info(
                "received message user_id=%s chat_id=%s text=%s",
                user_id,
                chat_id,
                repr(question[:200] + ("…" if len(question) > 200 else "")),
            )
        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: _run_rag(
                    question,
                    top_k=5,
                    story_id=None,
                    with_stats=verbose,
                ),
            )
            if verbose:
                answer, stats = result
                logger.info(
                    "query_embeddings=%s chunks_found=%s",
                    stats["query_embeddings"],
                    stats["chunks_found"],
                )
                answer_preview = stats["answer"]
                if len(answer_preview) > VERBOSE_LOGGING_MAX_ANSWER_CHARS:
                    answer_preview = (
                        answer_preview[:VERBOSE_LOGGING_MAX_ANSWER_CHARS] + "…"
                    )
                logger.info("llm_response=%s", repr(answer_preview))
            else:
                answer = result
            if len(answer) > 4096:  # Telegram message length limit
                answer = answer[:4090] + "\n…"
            await update.message.reply_text(answer)
        except Exception as e:
            logger.exception("RAG error")
            await update.message.reply_text(
                f"Failed to get an answer: {e!s}. "
                "Check RAG_DB_DSN and GIGACHAT_CREDENTIALS."
            )

    app = Application.builder().token(token).build()
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    logger.info("Telegram bot started (polling)")
    app.run_polling(drop_pending_updates=True)
def main() -> None:
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        level=logging.INFO,
    )
    # Silence polling HTTP responses that carry no user messages
    for name in ("telegram", "httpx", "httpcore"):
        logging.getLogger(name).setLevel(logging.WARNING)
    try:
        run_bot()
    except KeyboardInterrupt:
        pass
    except ValueError as e:
        raise SystemExit(e) from e

src/rag_agent/webhook.py Normal file

@@ -0,0 +1,276 @@
"""Webhook server: on push from remote repo, pull and run index --changed."""
from __future__ import annotations
import hmac
import hashlib
import json
import logging
import os
import subprocess
import threading
from pathlib import Path
from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
logger = logging.getLogger(__name__)
# So background webhook logs (pull_and_index, index) appear when run via uvicorn
_rag_log = logging.getLogger("rag_agent")
if not _rag_log.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(levelname)s: %(name)s: %(message)s"))
_rag_log.setLevel(logging.INFO)
_rag_log.addHandler(_h)
app = FastAPI(title="RAG Agent Webhook", version="0.1.0")
def _branch_from_ref(ref: str) -> str | None:
    """refs/heads/main -> main."""
    if not ref or not ref.startswith("refs/heads/"):
        return None
    return ref.removeprefix("refs/heads/")


# GitHub/GitLab send null SHA as "before" when a branch is first created.
_NULL_SHA = "0000000000000000000000000000000000000000"


def _is_null_sha(sha: str | None) -> bool:
    return sha is not None and sha == _NULL_SHA


def _verify_github_signature(body: bytes, secret: str, signature_header: str | None) -> bool:
    if not secret or not signature_header or not signature_header.startswith("sha256="):
        return not secret
    expected = hmac.new(
        secret.encode("utf-8"), body, digestmod=hashlib.sha256
    ).hexdigest()
    received = signature_header.removeprefix("sha256=").strip()
    return hmac.compare_digest(received, expected)
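The header that `_verify_github_signature` checks is the HMAC-SHA256 of the raw request body, hex-encoded and prefixed with `sha256=` (GitHub's `X-Hub-Signature-256` scheme). A client or test can produce it like this; `sign_github` is an illustrative helper, not part of the server:

```python
import hashlib
import hmac


def sign_github(body: bytes, secret: str) -> str:
    """Compute an X-Hub-Signature-256 header value for a webhook body."""
    digest = hmac.new(secret.encode("utf-8"), body, digestmod=hashlib.sha256).hexdigest()
    return f"sha256={digest}"
```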
def _decode_stderr(stderr: str | bytes | None) -> str:
    if stderr is None:
        return ""
    return stderr.decode("utf-8", errors="replace") if isinstance(stderr, bytes) else stderr
def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool:
    env = os.environ.copy()
    env["RAG_REPO_PATH"] = repo_path
    try:
        proc = subprocess.run(
            ["rag-agent", "index", "--story", story, "--changed", "--base-ref", base_ref, "--head-ref", head_ref],
            env=env,
            capture_output=True,
            text=True,
            timeout=600,
        )
        if proc.returncode != 0:
            logger.error(
                "index failed (story=%s base=%s head=%s): stdout=%s stderr=%s",
                story, base_ref, head_ref, proc.stdout, proc.stderr,
            )
            return False
        logger.info("index completed for story=%s %s..%s", story, base_ref, head_ref)
        return True
    except subprocess.TimeoutExpired:
        logger.error("index timeout for story=%s", story)
        return False
    except Exception as e:
        logger.exception("index error: %s", e)
        return False
def _pull_and_index(
    repo_path: str,
    branch: str,
    *,
    payload_before: str | None = None,
    payload_after: str | None = None,
) -> None:
    """Fetch, checkout branch; index range from payload (before→after) or from merge result."""
    logger.info(
        "webhook: pull_and_index started branch=%s repo_path=%s payload_before=%s payload_after=%s",
        branch, repo_path, payload_before, payload_after,
    )
    repo = Path(repo_path)
    if not repo.is_dir() or not (repo / ".git").exists():
        logger.warning("not a git repo or missing: %s", repo_path)
        return
    try:
        subprocess.run(
            ["git", "-C", repo_path, "fetch", "origin", branch],
            check=True,
            capture_output=True,
            timeout=60,
        )
    except subprocess.CalledProcessError as e:
        logger.warning("git fetch failed (branch=%s): %s", branch, _decode_stderr(e.stderr))
        return
    except Exception as e:
        logger.exception("git fetch error: %s", e)
        return
    try:
        subprocess.run(
            ["git", "-C", repo_path, "checkout", branch],
            check=True,
            capture_output=True,
            timeout=10,
        )
    except subprocess.CalledProcessError as e:
        logger.warning("git checkout %s failed: %s", branch, _decode_stderr(e.stderr))
        return
    # Branch deletion: after is null SHA, so there is nothing to index
    if _is_null_sha(payload_after):
        logger.info("webhook: branch deletion detected (after is null SHA) for branch=%s; skipping index", branch)
        return
    # Prefer the commit range from the webhook payload (GitHub/GitLab before/after) so we index every push,
    # even when the clone is the same dir the push came from (HEAD already at the new commit).
    if payload_before and payload_after and payload_before != payload_after:
        # Update working tree to the new commits (fetch only updates refs; index reads files from disk)
        origin_ref = f"origin/{branch}"
        merge_proc = subprocess.run(
            ["git", "-C", repo_path, "merge", "--ff-only", origin_ref],
            capture_output=True,
            text=True,
            timeout=60,
        )
        if merge_proc.returncode != 0:
            logger.warning(
                "webhook: git merge --ff-only failed (branch=%s); stderr=%s",
                branch, _decode_stderr(merge_proc.stderr),
            )
            return
        # New branch: before is null SHA, so use auto (merge-base with default branch)
        if _is_null_sha(payload_before):
            logger.info(
                "webhook: new branch detected (before is null SHA), using --base-ref auto story=%s head=%s",
                branch, payload_after,
            )
            _run_index(repo_path, story=branch, base_ref="auto", head_ref=payload_after)
            return
        logger.info(
            "webhook: running index from payload story=%s %s..%s",
            branch, payload_before, payload_after,
        )
        _run_index(repo_path, story=branch, base_ref=payload_before, head_ref=payload_after)
        return
    if payload_before and payload_after and payload_before == payload_after:
        logger.info("webhook: payload before==after for branch=%s (e.g. force-push); skipping index", branch)
        return
    # Fallback: no before/after in payload; infer the range from the merge (original behaviour).
    origin_ref = f"origin/{branch}"
    rev_origin = subprocess.run(
        ["git", "-C", repo_path, "rev-parse", origin_ref],
        capture_output=True,
        text=True,
        timeout=10,
    )
    origin_head = (rev_origin.stdout or "").strip() if rev_origin.returncode == 0 else None
    if not origin_head:
        logger.warning("after fetch: %s not found (wrong branch name?)", origin_ref)
        return
    try:
        rev_head = subprocess.run(
            ["git", "-C", repo_path, "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        old_head = (rev_head.stdout or "").strip() if rev_head.returncode == 0 else None
    except Exception as e:
        logger.exception("rev-parse HEAD: %s", e)
        return
    if old_head == origin_head:
        logger.info(
            "no new commits for branch=%s (already at %s); skipping index",
            branch, origin_head,
        )
        return
    try:
        merge_proc = subprocess.run(
            ["git", "-C", repo_path, "merge", "--ff-only", origin_ref],
            capture_output=True,
            text=True,
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        logger.error("git merge timeout")
        return
    if merge_proc.returncode != 0:
        logger.warning(
            "git merge --ff-only failed (branch=%s, non-fast-forward?). stderr=%s Skipping index.",
            branch, _decode_stderr(merge_proc.stderr),
        )
        return
    rev_new = subprocess.run(
        ["git", "-C", repo_path, "rev-parse", "HEAD"],
        capture_output=True,
        text=True,
        timeout=10,
    )
    new_head = (rev_new.stdout or "").strip() if rev_new.returncode == 0 else None
    if not old_head or not new_head or old_head == new_head:
        logger.info("no new commits for branch=%s (old_head=%s new_head=%s)", branch, old_head, new_head)
        return
    logger.info("webhook: running index story=%s %s..%s", branch, old_head, new_head)
    _run_index(repo_path, story=branch, base_ref=old_head, head_ref=new_head)
@app.post("/webhook")
async def webhook(request: Request) -> Response:
"""Handle push webhook from GitHub/GitLab: pull repo and run index --changed."""
body = await request.body()
secret = os.getenv("WEBHOOK_SECRET", "").strip()
sig = request.headers.get("X-Hub-Signature-256")
if secret and not _verify_github_signature(body, secret, sig):
return PlainTextResponse("Invalid signature", status_code=401)
try:
payload = json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
payload = None
if not payload or not isinstance(payload, dict):
return PlainTextResponse("Invalid JSON", status_code=400)
ref = payload.get("ref")
branch = _branch_from_ref(ref) if ref else None
if not branch:
return PlainTextResponse("Missing or unsupported ref", status_code=400)
# GitHub/GitLab push: before = previous commit, after = new tip (use for index range)
before = (payload.get("before") or "").strip() or None
after = (payload.get("after") or "").strip() or None
repo_path = os.getenv("RAG_REPO_PATH", "").strip()
if not repo_path:
return PlainTextResponse("RAG_REPO_PATH not set", status_code=500)
threading.Thread(
target=_pull_and_index,
args=(repo_path, branch),
kwargs={"payload_before": before, "payload_after": after},
daemon=True,
).start()
return PlainTextResponse("Accepted", status_code=202)
@app.get("/health")
async def health() -> str:
return "ok"

View File

@@ -1,4 +1,4 @@
from rag_agent.ingest.chunker import chunk_text, chunk_text_by_lines

def test_chunk_text_basic():

@@ -7,3 +7,18 @@ def test_chunk_text_basic():
    assert len(chunks) == 3
    assert chunks[0].text == "one two three"
    assert chunks[1].text.startswith("three four")
    assert chunks[0].start_line is None
    assert chunks[0].end_line is None


def test_chunk_text_by_lines():
    text = "line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8"
    chunks = chunk_text_by_lines(text, max_lines=3, overlap_lines=1)
    assert len(chunks) == 4
    assert chunks[0].text == "line1\nline2\nline3"
    assert chunks[0].start_line == 1
    assert chunks[0].end_line == 3
    assert chunks[1].start_line == 3
    assert chunks[1].end_line == 5
    assert chunks[3].start_line == 7
    assert chunks[3].end_line == 8
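The new test pins down the line-chunker semantics: a window of `max_lines` lines sliding by `max_lines - overlap_lines`, with 1-based inclusive line numbers. A sketch consistent with those assertions — the actual `chunk_text_by_lines` in `rag_agent.ingest.chunker` may be implemented differently:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Chunk:
    text: str
    start_line: Optional[int] = None
    end_line: Optional[int] = None


def chunk_text_by_lines(text: str, max_lines: int, overlap_lines: int = 0) -> List[Chunk]:
    # Assumes overlap_lines < max_lines so the window always advances.
    lines = text.splitlines()
    step = max_lines - overlap_lines
    chunks: List[Chunk] = []
    for start in range(0, len(lines), step):
        window = lines[start:start + max_lines]
        # Line numbers are 1-based and inclusive, matching the test.
        chunks.append(Chunk("\n".join(window), start + 1, start + len(window)))
        if start + max_lines >= len(lines):
            break
    return chunks
```

With 8 lines, `max_lines=3`, `overlap_lines=1`, the step is 2, giving windows 1-3, 3-5, 5-7, and a final short window 7-8 — exactly the four chunks the test expects.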