From 20af12f47dbdceabdf8e2b2dbccfc150da27e7bf Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Sat, 31 Jan 2026 00:32:36 +0300 Subject: [PATCH] =?UTF-8?q?=D0=B3=D0=B8=D1=85=20=D1=85=D1=83=D0=BA=20?= =?UTF-8?q?=D0=B8=20=D1=81=D0=BE=D1=85=D1=80=D0=B0=D0=BD=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D0=B5=20=D0=B8=D0=B7=D0=BC=D0=B5=D0=BD=D0=B5=D0=BD=D0=B8=D0=B9?= =?UTF-8?q?=20=D0=B2=20=D0=BA=D0=BE=D0=BD=D1=82=D0=B5=D0=BA=D1=81=D1=82?= =?UTF-8?q?=D0=B5=20=D1=81=D1=82=D0=BE=D1=80=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 5 +- README.md | 57 +++++- docker-compose.yml | 9 +- docker/postgres-init/01-schema.sql | 13 +- pyproject.toml | 2 + scripts/post-commit | 11 +- scripts/post-receive | 43 ++++ scripts/schema.sql | 13 +- src/rag_agent/cli.py | 101 +++++++++- src/rag_agent/config.py | 4 + src/rag_agent/index/__init__.py | 12 +- src/rag_agent/index/postgres.py | 185 +++++++++++++++++- .../__pycache__/chunker.cpython-312.pyc | Bin 2144 -> 3264 bytes src/rag_agent/ingest/chunker.py | 43 +++- src/rag_agent/ingest/git_watcher.py | 50 +++++ src/rag_agent/webhook.py | 170 ++++++++++++++++ tests/test_chunker.py | 17 +- 17 files changed, 695 insertions(+), 40 deletions(-) create mode 100644 scripts/post-receive create mode 100644 src/rag_agent/webhook.py diff --git a/Dockerfile b/Dockerfile index bd7f961..53d61e6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,9 @@ COPY README.md ./ RUN pip install --no-cache-dir -e . 
-# Default: run CLI (override in compose or when running) +# Default: run webhook server (override in compose or when running) ENV RAG_DB_DSN="" ENV RAG_REPO_PATH="/data" +EXPOSE 8000 ENTRYPOINT ["rag-agent"] -CMD ["ask", "--help"] +CMD ["serve", "--host", "0.0.0.0", "--port", "8000"] diff --git a/README.md b/README.md index 460ff2f..001265e 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,7 @@ # RAG Agent (Postgres) Custom RAG agent that indexes text files from a git repository into Postgres -and answers queries using retrieval + LLM generation. Commits are tied to -**stories**; indexing and retrieval can be scoped by story. +and answers queries using retrieval + LLM generation. **Changes are always in the context of a Story**: the unit of work is the story, not individual commits. The agent indexes **all changes from all commits** in the story range (base_ref..head_ref); per-commit indexing is not used. ## Quick start @@ -18,12 +17,29 @@ and answers queries using retrieval + LLM generation. Commits are tied to - `RAG_EMBEDDINGS_DIM` — embedding vector dimension (e.g. `1536`) 3. Create DB schema (only if not using Docker, or if init was disabled): - `python scripts/create_db.py` (or `psql "$RAG_DB_DSN" -f scripts/schema.sql`) -4. Index files for a story (e.g. branch name as story slug): - - `rag-agent index --story my-branch --changed --base-ref HEAD~1 --head-ref HEAD` +4. Index files for a story (e.g. branch name as story slug). Use the **full story range** so all commits in the story are included: + - `rag-agent index --story my-branch --changed --base-ref main --head-ref HEAD` + - Or `--base-ref auto` to use merge-base(default-branch, head-ref) as the start of the story. 5. Ask a question (optionally scoped to a story): - `rag-agent ask "What is covered?"` - `rag-agent ask "What is covered?" 
--story my-branch` +## Webhook: index on push to remote + +When the app runs as a service in Docker, it can start a **webhook server** so that each push to the remote repository triggers a pull and incremental indexing. + +1. Start the stack with the webhook server (default in Docker): + - `docker compose up -d` — app runs `rag-agent serve` and listens on port 8000. + - Repo is mounted at `RAG_REPO_PATH` (e.g. `/data`) **writable**, so the container can run `git fetch` + `git merge --ff-only` to pull changes. +2. Clone the knowledge-base repo into the mounted directory (once), e.g. on the host: `git clone ./data` so that `./data` is the worktree (or set `RAG_REPO_PATH` to that path and mount it). +3. In GitHub (or GitLab) add a **Webhook**: + - URL: `http://:8000/webhook` (use HTTPS in production and put a reverse proxy in front). + - Content type: `application/json`. + - Secret: set a shared secret and export `WEBHOOK_SECRET` in the app environment (Docker: in `docker-compose.yml` or `.env`). If `WEBHOOK_SECRET` is empty, signature is not checked. +4. On each push to a branch, the server receives the webhook, pulls that branch into the worktree, and runs `rag-agent index --story --changed --base-ref --head-ref ` so only changed files are re-indexed. + +Health check: `GET http://:8000/health` → `ok`. Port is configurable via `WEBHOOK_PORT` (default 8000) in docker-compose. + ## Git hook (index on commit) Install the post-commit hook so changed files are indexed after each commit: @@ -34,11 +50,40 @@ cp scripts/post-commit .git/hooks/post-commit && chmod +x .git/hooks/post-commit Story for the commit is taken from (in order): env `RAG_STORY`, file `.rag-story` in repo root (one line = slug), or current branch name. +## Git hook (server-side) + +Use `scripts/post-receive` in the **bare repo** on the server so that pushes trigger indexing. + +1. On the server, create a **non-bare clone** (worktree) that the hook will update and use for indexing, e.g. 
`git clone /path/to/repo.git /var/rag-worktree/repo`. +2. In the bare repo, install the hook: `cp /path/to/RagAgent/scripts/post-receive /path/to/repo.git/hooks/post-receive && chmod +x .../post-receive`. +3. Set env for the hook (e.g. in the hook or via systemd/sshd): `RAG_REPO_PATH=/var/rag-worktree/repo`, `RAG_DB_DSN=...`, `RAG_EMBEDDINGS_DIM=...`. Optionally `RAG_AGENT_VENV` (path to venv with `rag-agent`) or `RAG_AGENT_SRC` + `RAG_AGENT_PYTHON` for `python -m rag_agent.cli`. +4. On each push the hook updates the worktree to the new commit, then runs `rag-agent index --changed --base-ref main --head-ref newrev --story ` so the story contains **all commits** on the branch (from main to newrev). + +Story is taken from the ref name (e.g. `refs/heads/main` → `main`). + ## DB structure -- **stories** — story slug (e.g. branch name); documents and chunks are tied to a story. +- **stories** — story slug (e.g. branch name); documents and chunks are tied to a story. Optional: `indexed_base_ref`, `indexed_head_ref`, `indexed_at` record the git range that was indexed (all commits in that range belong to the story). - **documents** — path + version per story; unique `(story_id, path)`. -- **chunks** — text chunks with embeddings (pgvector); updated when documents are re-indexed. +- **chunks** — text chunks with embeddings (pgvector), plus: + - `start_line`, `end_line` — position in the source file (for requirements/use-case files). + - `change_type` — `added` | `modified` | `unchanged` (relative to base ref when indexing with `--changed`). + - `previous_content` — for `modified` chunks, the content before the change (for test-case generation). + +Indexing is **always per-story**: `base_ref..head_ref` defines the set of commits that belong to the story. Use `--base-ref main` (or `auto`) and `--head-ref HEAD` so the story contains all commits on the branch, not a single commit. 
When you run `index --changed`, the base ref is compared to head; each chunk is marked as added, modified, or unchanged. + +### What changed in a story (for test cases) + +To get only the chunks that were added or modified in a story (e.g. to generate test cases for the changed part): + +```python +from rag_agent.index import fetch_changed_chunks + +changed = fetch_changed_chunks(conn, story_id) +for r in changed: + # r.path, r.content, r.change_type, r.start_line, r.end_line, r.previous_content + ... +``` Scripts: `scripts/create_db.py` (Python, uses `ensure_schema` and `RAG_*` env), `scripts/schema.sql` (raw SQL). diff --git a/docker-compose.yml b/docker-compose.yml index 2f7ff2c..69ac699 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,20 +31,23 @@ services: dockerfile: Dockerfile image: rag-agent:latest container_name: rag-agent - restart: "no" + restart: unless-stopped depends_on: postgres: condition: service_healthy + ports: + - "${WEBHOOK_PORT:-8000}:8000" environment: RAG_DB_DSN: "postgresql://${POSTGRES_USER:-rag}:${POSTGRES_PASSWORD:-rag_secret}@postgres:5432/${POSTGRES_DB:-rag}" RAG_REPO_PATH: ${RAG_REPO_PATH:-/data} RAG_EMBEDDINGS_DIM: ${RAG_EMBEDDINGS_DIM:-1536} GIGACHAT_CREDENTIALS: ${GIGACHAT_CREDENTIALS:-} GIGACHAT_EMBEDDINGS_MODEL: ${GIGACHAT_EMBEDDINGS_MODEL:-Embeddings} + WEBHOOK_SECRET: ${WEBHOOK_SECRET:-} volumes: - - ${RAG_REPO_PATH:-./data}:/data:ro + - ${RAG_REPO_PATH:-./data}:/data entrypoint: ["rag-agent"] - command: ["ask", "--help"] + command: ["serve", "--host", "0.0.0.0", "--port", "8000"] networks: - rag_net diff --git a/docker/postgres-init/01-schema.sql b/docker/postgres-init/01-schema.sql index 7f01f57..0ddc87d 100644 --- a/docker/postgres-init/01-schema.sql +++ b/docker/postgres-init/01-schema.sql @@ -6,7 +6,10 @@ CREATE EXTENSION IF NOT EXISTS vector; CREATE TABLE IF NOT EXISTS stories ( id SERIAL PRIMARY KEY, slug TEXT UNIQUE NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc') + 
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + indexed_base_ref TEXT, + indexed_head_ref TEXT, + indexed_at TIMESTAMPTZ ); CREATE TABLE IF NOT EXISTS documents ( @@ -24,9 +27,15 @@ CREATE TABLE IF NOT EXISTS chunks ( chunk_index INTEGER NOT NULL, hash TEXT NOT NULL, content TEXT NOT NULL, - embedding vector(1536) NOT NULL + embedding vector(1536) NOT NULL, + start_line INTEGER, + end_line INTEGER, + change_type TEXT NOT NULL DEFAULT 'added' + CHECK (change_type IN ('added', 'modified', 'unchanged')), + previous_content TEXT ); CREATE INDEX IF NOT EXISTS idx_documents_story_id ON documents(story_id); CREATE INDEX IF NOT EXISTS idx_chunks_document_id ON chunks(document_id); CREATE INDEX IF NOT EXISTS idx_chunks_embedding ON chunks USING ivfflat (embedding vector_cosine_ops); +CREATE INDEX IF NOT EXISTS idx_chunks_change_type ON chunks(change_type); diff --git a/pyproject.toml b/pyproject.toml index 764e13d..9e466d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,8 @@ dependencies = [ "pydantic>=2.7.0", "python-dotenv>=1.0.0", "gigachat>=0.2.0", + "fastapi>=0.115.0", + "uvicorn[standard]>=0.32.0", ] [project.scripts] diff --git a/scripts/post-commit b/scripts/post-commit index 3c939cf..4097f8d 100644 --- a/scripts/post-commit +++ b/scripts/post-commit @@ -20,16 +20,15 @@ if [ -z "$STORY" ]; then exit 0 fi -# Run index (changed files only: previous commit -> HEAD) +# Run index: all changes in the story (main..HEAD), not per-commit if command -v rag-agent >/dev/null 2>&1; then - rag-agent index --changed --base-ref HEAD~1 --head-ref HEAD --story "$STORY" + rag-agent index --changed --base-ref main --head-ref HEAD --story "$STORY" elif [ -n "${VIRTUAL_ENV}" ]; then - rag-agent index --changed --base-ref HEAD~1 --head-ref HEAD --story "$STORY" + rag-agent index --changed --base-ref main --head-ref HEAD --story "$STORY" else - # Try repo venv or python -m if [ -f "venv/bin/rag-agent" ]; then - venv/bin/rag-agent index --changed 
--base-ref HEAD~1 --head-ref HEAD --story "$STORY" + venv/bin/rag-agent index --changed --base-ref main --head-ref HEAD --story "$STORY" else - PYTHONPATH=src python -m rag_agent.cli index --changed --base-ref HEAD~1 --head-ref HEAD --story "$STORY" 2>/dev/null || true + PYTHONPATH=src python -m rag_agent.cli index --changed --base-ref main --head-ref HEAD --story "$STORY" 2>/dev/null || true fi fi diff --git a/scripts/post-receive b/scripts/post-receive new file mode 100644 index 0000000..d80c04a --- /dev/null +++ b/scripts/post-receive @@ -0,0 +1,43 @@ +#!/usr/bin/env sh +# Server-side hook: index changed files after push. Install in bare repo: cp scripts/post-receive /path/to/repo.git/hooks/post-receive +# Requires: RAG_REPO_PATH = path to a non-bare clone of this repo (worktree), updated by this hook to newrev. +# RAG_DB_DSN, RAG_EMBEDDINGS_DIM; optionally GIGACHAT_CREDENTIALS. +# Story is derived from ref name (e.g. refs/heads/main -> main). + +set -e + +if [ -z "${RAG_REPO_PATH}" ]; then + echo "post-receive: RAG_REPO_PATH (path to clone worktree) is required. Skipping index." + exit 0 +fi + +# Read refs from stdin (refname SP oldrev SP newrev LF) +while read -r refname oldrev newrev; do + # Skip branch deletions + if [ "$newrev" = "0000000000000000000000000000000000000000" ]; then + continue + fi + # Branch name from ref (e.g. 
refs/heads/main -> main) + branch="${refname#refs/heads/}" + [ -z "$branch" ] && continue + + # Update worktree to newrev so rag-agent can read files from disk + if [ -d "${RAG_REPO_PATH}/.git" ]; then + git -C "${RAG_REPO_PATH}" fetch origin 2>/dev/null || true + git -C "${RAG_REPO_PATH}" checkout -f "${newrev}" 2>/dev/null || true + fi + + # Index all commits in the story (main..newrev), not just this push + export RAG_REPO_PATH + if command -v rag-agent >/dev/null 2>&1; then + rag-agent index --changed --base-ref main --head-ref "${newrev}" --story "${branch}" + elif [ -f "${RAG_AGENT_VENV}/bin/rag-agent" ]; then + "${RAG_AGENT_VENV}/bin/rag-agent" index --changed --base-ref main --head-ref "${newrev}" --story "${branch}" + else + if [ -n "${RAG_AGENT_SRC}" ]; then + PYTHONPATH="${RAG_AGENT_SRC}/src" "${RAG_AGENT_PYTHON:-python3}" -m rag_agent.cli index --changed --base-ref main --head-ref "${newrev}" --story "${branch}" 2>/dev/null || true + fi + fi +done + +exit 0 diff --git a/scripts/schema.sql b/scripts/schema.sql index f31463a..f0bcf75 100644 --- a/scripts/schema.sql +++ b/scripts/schema.sql @@ -7,7 +7,10 @@ CREATE EXTENSION IF NOT EXISTS vector; CREATE TABLE IF NOT EXISTS stories ( id SERIAL PRIMARY KEY, slug TEXT UNIQUE NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc') + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + indexed_base_ref TEXT, + indexed_head_ref TEXT, + indexed_at TIMESTAMPTZ ); CREATE TABLE IF NOT EXISTS documents ( @@ -25,9 +28,15 @@ CREATE TABLE IF NOT EXISTS chunks ( chunk_index INTEGER NOT NULL, hash TEXT NOT NULL, content TEXT NOT NULL, - embedding vector(1536) NOT NULL + embedding vector(1536) NOT NULL, + start_line INTEGER, + end_line INTEGER, + change_type TEXT NOT NULL DEFAULT 'added' + CHECK (change_type IN ('added', 'modified', 'unchanged')), + previous_content TEXT ); CREATE INDEX IF NOT EXISTS idx_documents_story_id ON documents(story_id); CREATE INDEX IF NOT EXISTS 
idx_chunks_document_id ON chunks(document_id); CREATE INDEX IF NOT EXISTS idx_chunks_embedding ON chunks USING ivfflat (embedding vector_cosine_ops); +CREATE INDEX IF NOT EXISTS idx_chunks_change_type ON chunks(change_type); diff --git a/src/rag_agent/cli.py b/src/rag_agent/cli.py index 772cde5..3c8af6b 100644 --- a/src/rag_agent/cli.py +++ b/src/rag_agent/cli.py @@ -5,9 +5,15 @@ import hashlib from pathlib import Path from rag_agent.config import load_config -from rag_agent.ingest.chunker import chunk_text +from rag_agent.ingest.chunker import chunk_text_by_lines from rag_agent.ingest.file_loader import iter_text_files -from rag_agent.ingest.git_watcher import filter_existing, filter_removed, get_changed_files +from rag_agent.ingest.git_watcher import ( + filter_existing, + filter_removed, + get_changed_files, + get_merge_base, + read_file_at_ref, +) from rag_agent.index.embeddings import get_embedding_client from rag_agent.index.postgres import ( connect, @@ -16,6 +22,7 @@ from rag_agent.index.postgres import ( get_or_create_story, get_story_id, replace_chunks, + update_story_indexed_range, upsert_document, ) from rag_agent.agent.pipeline import StubLLMClient, answer_query @@ -34,26 +41,66 @@ def cmd_index(args: argparse.Namespace) -> None: story_id = get_or_create_story(conn, args.story) embedding_client = get_embedding_client(config.embeddings_dim) + base_ref = args.base_ref.strip() + head_ref = args.head_ref.strip() if args.changed: - changed_files = get_changed_files(config.repo_path, args.base_ref, args.head_ref) + if base_ref.lower() == "auto": + base_ref = get_merge_base( + config.repo_path, args.default_branch, head_ref + ) or args.default_branch + changed_files = get_changed_files( + config.repo_path, base_ref, head_ref + ) removed = filter_removed(changed_files) existing = filter_existing(changed_files) else: removed = [] - existing = [path for path in Path(config.repo_path).rglob("*") if path.is_file()] + existing = [ + p for p in 
Path(config.repo_path).rglob("*") if p.is_file() + ] for path in removed: delete_document(conn, story_id, str(path)) for path, text in iter_text_files(existing, config.allowed_extensions): - chunks = chunk_text(text, config.chunk_size, config.chunk_overlap) + chunks = chunk_text_by_lines( + text, + config.chunk_size_lines, + config.chunk_overlap_lines, + ) if not chunks: continue - embeddings = embedding_client.embed_texts([chunk.text for chunk in chunks]) + base_chunks = None + if args.changed: + base_text = read_file_at_ref(config.repo_path, path, base_ref) + if base_text is not None: + base_chunks = chunk_text_by_lines( + base_text, + config.chunk_size_lines, + config.chunk_overlap_lines, + ) + embeddings = embedding_client.embed_texts( + [chunk.text for chunk in chunks] + ) document_id = upsert_document( conn, story_id, str(path), _file_version(path) ) - replace_chunks(conn, document_id, chunks, embeddings) + replace_chunks( + conn, document_id, chunks, embeddings, base_chunks=base_chunks + ) + + if args.changed: + update_story_indexed_range(conn, story_id, base_ref, head_ref) + + +def cmd_serve(args: argparse.Namespace) -> None: + import uvicorn + uvicorn.run( + "rag_agent.webhook:app", + host=args.host, + port=args.port, + log_level="info", + ) def cmd_ask(args: argparse.Namespace) -> None: @@ -89,9 +136,26 @@ def build_parser() -> argparse.ArgumentParser: required=True, help="Story slug (e.g. 
branch name or story id); documents are tied to this story", ) - index_parser.add_argument("--changed", action="store_true", help="Index only changed files") - index_parser.add_argument("--base-ref", default="HEAD~1", help="Base git ref") - index_parser.add_argument("--head-ref", default="HEAD", help="Head git ref") + index_parser.add_argument( + "--changed", + action="store_true", + help="Index only files changed in the story range (base-ref..head-ref); all commits in range belong to the story", + ) + index_parser.add_argument( + "--base-ref", + default="main", + help="Start of story range (e.g. main). Use 'auto' for merge-base(default-branch, head-ref). All commits from base to head are the story.", + ) + index_parser.add_argument( + "--head-ref", + default="HEAD", + help="End of story range (e.g. current branch tip)", + ) + index_parser.add_argument( + "--default-branch", + default="main", + help="Default branch name for --base-ref auto", + ) index_parser.set_defaults(func=cmd_index) ask_parser = sub.add_parser("ask", help="Ask a question") @@ -104,6 +168,23 @@ def build_parser() -> argparse.ArgumentParser: ask_parser.add_argument("--top-k", type=int, default=5, help="Top K chunks to retrieve") ask_parser.set_defaults(func=cmd_ask) + serve_parser = sub.add_parser( + "serve", + help="Run webhook server: on push to remote repo, pull and index changes", + ) + serve_parser.add_argument( + "--host", + default="0.0.0.0", + help="Bind host (default: 0.0.0.0)", + ) + serve_parser.add_argument( + "--port", + type=int, + default=8000, + help="Bind port (default: 8000)", + ) + serve_parser.set_defaults(func=cmd_serve) + return parser diff --git a/src/rag_agent/config.py b/src/rag_agent/config.py index e7025d4..99faafb 100644 --- a/src/rag_agent/config.py +++ b/src/rag_agent/config.py @@ -18,6 +18,8 @@ class AppConfig: db_dsn: str chunk_size: int chunk_overlap: int + chunk_size_lines: int + chunk_overlap_lines: int embeddings_dim: int embeddings_model: str llm_model: str @@ 
-55,6 +57,8 @@ def load_config() -> AppConfig: db_dsn=db_dsn, chunk_size=_env_int("RAG_CHUNK_SIZE", 400), chunk_overlap=_env_int("RAG_CHUNK_OVERLAP", 50), + chunk_size_lines=_env_int("RAG_CHUNK_SIZE_LINES", 40), + chunk_overlap_lines=_env_int("RAG_CHUNK_OVERLAP_LINES", 8), embeddings_dim=_env_int("RAG_EMBEDDINGS_DIM", 1536), embeddings_model=os.getenv("RAG_EMBEDDINGS_MODEL", "stub-embeddings"), llm_model=os.getenv("RAG_LLM_MODEL", "stub-llm"), diff --git a/src/rag_agent/index/__init__.py b/src/rag_agent/index/__init__.py index a9a2c5b..8f495ba 100644 --- a/src/rag_agent/index/__init__.py +++ b/src/rag_agent/index/__init__.py @@ -1 +1,11 @@ -__all__ = [] +from rag_agent.index.postgres import ( + ChangedChunkRecord, + fetch_changed_chunks, + get_story_indexed_range, +) + +__all__ = [ + "ChangedChunkRecord", + "fetch_changed_chunks", + "get_story_indexed_range", +] diff --git a/src/rag_agent/index/postgres.py b/src/rag_agent/index/postgres.py index 1aa2f3c..5d67cc9 100644 --- a/src/rag_agent/index/postgres.py +++ b/src/rag_agent/index/postgres.py @@ -2,13 +2,17 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone -from typing import Iterable +from typing import Iterable, Sequence import psycopg from pgvector.psycopg import register_vector from rag_agent.ingest.chunker import TextChunk +CHANGE_ADDED = "added" +CHANGE_MODIFIED = "modified" +CHANGE_UNCHANGED = "unchanged" + @dataclass(frozen=True) class ChunkRecord: @@ -18,6 +22,18 @@ class ChunkRecord: embedding: list[float] +@dataclass(frozen=True) +class ChangedChunkRecord: + """Chunk that was added or modified in a story (for test-case generation).""" + + path: str + content: str + change_type: str + start_line: int | None + end_line: int | None + previous_content: str | None + + def connect(dsn: str) -> psycopg.Connection: conn = psycopg.connect(dsn) register_vector(conn) @@ -32,10 +48,22 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> 
None: CREATE TABLE IF NOT EXISTS stories ( id SERIAL PRIMARY KEY, slug TEXT UNIQUE NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc') + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + indexed_base_ref TEXT, + indexed_head_ref TEXT, + indexed_at TIMESTAMPTZ ); """ ) + for col_def in ( + "ADD COLUMN IF NOT EXISTS indexed_base_ref TEXT", + "ADD COLUMN IF NOT EXISTS indexed_head_ref TEXT", + "ADD COLUMN IF NOT EXISTS indexed_at TIMESTAMPTZ", + ): + try: + cur.execute(f"ALTER TABLE stories {col_def};") + except psycopg.ProgrammingError: + pass cur.execute( """ CREATE TABLE IF NOT EXISTS documents ( @@ -56,10 +84,41 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None: chunk_index INTEGER NOT NULL, hash TEXT NOT NULL, content TEXT NOT NULL, - embedding vector({embeddings_dim}) NOT NULL + embedding vector({embeddings_dim}) NOT NULL, + start_line INTEGER, + end_line INTEGER, + change_type TEXT NOT NULL DEFAULT 'added' + CHECK (change_type IN ('added', 'modified', 'unchanged')), + previous_content TEXT ); """ ) + # Migrations: add columns if table already existed without them (Postgres 11+) + for col_def in ( + "ADD COLUMN IF NOT EXISTS start_line INTEGER", + "ADD COLUMN IF NOT EXISTS end_line INTEGER", + "ADD COLUMN IF NOT EXISTS previous_content TEXT", + "ADD COLUMN IF NOT EXISTS change_type TEXT DEFAULT 'added'", + ): + try: + cur.execute(f"ALTER TABLE chunks {col_def};") + except psycopg.ProgrammingError: + pass + try: + cur.execute( + "ALTER TABLE chunks ALTER COLUMN change_type SET NOT NULL;" + ) + except psycopg.ProgrammingError: + pass + try: + cur.execute( + """ + ALTER TABLE chunks ADD CONSTRAINT chunks_change_type_check + CHECK (change_type IN ('added', 'modified', 'unchanged')); + """ + ) + except psycopg.ProgrammingError: + pass # constraint may already exist cur.execute( """ CREATE INDEX IF NOT EXISTS idx_documents_story_id @@ -78,6 +137,12 @@ def ensure_schema(conn: psycopg.Connection, 
embeddings_dim: int) -> None: ON chunks USING ivfflat (embedding vector_cosine_ops); """ ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_chunks_change_type + ON chunks(change_type); + """ + ) conn.commit() @@ -97,6 +162,44 @@ def get_or_create_story(conn: psycopg.Connection, slug: str) -> int: return story_id +def update_story_indexed_range( + conn: psycopg.Connection, + story_id: int, + base_ref: str, + head_ref: str, +) -> None: + """Record that this story was indexed as all changes from base_ref to head_ref (all commits in story).""" + with conn.cursor() as cur: + cur.execute( + """ + UPDATE stories + SET indexed_base_ref = %s, indexed_head_ref = %s, + indexed_at = (NOW() AT TIME ZONE 'utc') + WHERE id = %s; + """, + (base_ref.strip(), head_ref.strip(), story_id), + ) + conn.commit() + + +def get_story_indexed_range( + conn: psycopg.Connection, story_id: int +) -> tuple[str | None, str | None, datetime | None]: + """Return (indexed_base_ref, indexed_head_ref, indexed_at) for the story, or (None, None, None).""" + with conn.cursor() as cur: + cur.execute( + """ + SELECT indexed_base_ref, indexed_head_ref, indexed_at + FROM stories WHERE id = %s; + """, + (story_id,), + ) + row = cur.fetchone() + if row is None: + return (None, None, None) + return (row[0], row[1], row[2]) + + def get_story_id(conn: psycopg.Connection, slug: str) -> int | None: s = slug.strip() with conn.cursor() as cur: @@ -127,28 +230,98 @@ def upsert_document( return document_id +def _change_type_and_previous( + chunk: TextChunk, + base_by_range: dict[tuple[int, int], TextChunk], +) -> tuple[str, str | None]: + """Determine change_type and previous_content for a chunk given base chunks keyed by (start_line, end_line).""" + if chunk.start_line is None or chunk.end_line is None: + return (CHANGE_ADDED, None) + key = (chunk.start_line, chunk.end_line) + base = base_by_range.get(key) + if base is None: + return (CHANGE_ADDED, None) + if base.hash == chunk.hash: + return (CHANGE_UNCHANGED, 
None) + return (CHANGE_MODIFIED, base.text) + + def replace_chunks( conn: psycopg.Connection, document_id: int, chunks: Iterable[TextChunk], embeddings: Iterable[list[float]], + base_chunks: Sequence[TextChunk] | None = None, ) -> None: + base_by_range: dict[tuple[int, int], TextChunk] = {} + if base_chunks: + for c in base_chunks: + if c.start_line is not None and c.end_line is not None: + base_by_range[(c.start_line, c.end_line)] = c + with conn.cursor() as cur: cur.execute( "DELETE FROM chunks WHERE document_id = %s;", (document_id,), ) for chunk, embedding in zip(chunks, embeddings): + change_type, previous_content = _change_type_and_previous( + chunk, base_by_range + ) cur.execute( """ - INSERT INTO chunks (document_id, chunk_index, hash, content, embedding) - VALUES (%s, %s, %s, %s, %s); + INSERT INTO chunks ( + document_id, chunk_index, hash, content, embedding, + start_line, end_line, change_type, previous_content + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); """, - (document_id, chunk.index, chunk.hash, chunk.text, embedding), + ( + document_id, + chunk.index, + chunk.hash, + chunk.text, + embedding, + chunk.start_line, + chunk.end_line, + change_type, + previous_content, + ), ) conn.commit() +def fetch_changed_chunks( + conn: psycopg.Connection, story_id: int +) -> list[ChangedChunkRecord]: + """Return chunks that were added or modified in this story (for test-case generation).""" + with conn.cursor() as cur: + cur.execute( + """ + SELECT d.path, c.content, c.change_type, c.start_line, c.end_line, + c.previous_content + FROM chunks c + JOIN documents d ON d.id = c.document_id + WHERE d.story_id = %s + AND c.change_type IN ('added', 'modified') + ORDER BY d.path, c.start_line NULLS FIRST, c.chunk_index; + """, + (story_id,), + ) + rows = cur.fetchall() + return [ + ChangedChunkRecord( + path=row[0], + content=row[1], + change_type=row[2], + start_line=row[3], + end_line=row[4], + previous_content=row[5], + ) + for row in rows + ] + + def 
delete_document( conn: psycopg.Connection, story_id: int, path: str ) -> None: diff --git a/src/rag_agent/ingest/__pycache__/chunker.cpython-312.pyc b/src/rag_agent/ingest/__pycache__/chunker.cpython-312.pyc index 15f125adaa14a3633145fbefc64ffb6e319389ca..b2808e736449cdc26b59c0b460be44a2cbf6e502 100644 GIT binary patch delta 1551 zcmZ`(O-vg{6yEjzcz?W$?PAAZ3`_alf&oWuCZ~=ujf7%w$&KDyWKZ8sTksYc1AQ*LxNcnsaJnSlOcm$e$^^c zM$ia=J+SK2L)*_jp|4<-PHF}}!y0~*F@$lr2{g{tM53+ox`Zcay?af{E!mo*U0Yr< zYm#MWke$xwKo-m;!Bho!WYz`tZrVdO3LPx2Z_$&)v{D! zJFD@I;+4|kuWSd3;(+hR=o#>@N$G`*wUD;Wdp1gdA%y?(4aTD&Wf_3R5J3Q)KAOkF z%uTw4?=xk=pbY9Mv&Z2W`+0xKVQ)}6cMbW#^@P^G@c0cf3ov& zFM#YqVd=IPz%aN);BWneKjqB6gFvJjjaMT*2a%3_ZKM)O!8cS}62~&HsK<;TO7$=$ zs`w8s8SS9t;8$Jcc|1^&5_OFVgp1Sgo$dVW2jounN1@ zIeeC#*x;T250U=zfmgP*sYy}>ZEjV|E-zVTwqWP(ni`>5z82rJTa ztKPBbJUEV$9lrq-sk%TZp@-MEu2%wy;^p7vApVSN-7xVxzesHdYN*_^qrV7ueS70c z=ZvI@i|cgk(mK^kXzqIwsssm0T)oq40r0FyZS}WLTL8WT z|IPP)=<=TA!6UybT>$l@DW9|K*_)6CmjbtR0j0pcrc6OEWajebtYxEHUf;aFA20Yj zE(%_c2gupy2)JG(i=ISoLF9$g6?ksa^98$r%yb%&zCkVcYkzW-ygZ2XsoQjJ6%cs{ z5P68)a-9g{cb1Jp-n@t|fsOQ;wTVysT02K;bqe5!p?tBrk4Qn99zPOdbhv&Fzb~{8 iA8{=GZe4Ao!|*2788Wvfym=_)&WBv{F$Z?{!hZqe>TMkW delta 656 zcmZuuL2DC16yDjHO*XsPO{_Id8!Q2FWocrhq!grjP*5W1#Uf~oOXF@#Fj}15pp~M9 z*5WbY)w_2Ee}I>syx2pfJ%oagpRiC5wU_#4_oO(>y!ZB-_ulu-yzR+R*8FZ5nuP1c z(Q2p224E`;ap=PYt&}`KBIIPLva%b@DKy%tle%lGWR{<{|kb#G^#Ktj# zH_04n!iVH#_(`Y8CLGZ|`%FJk=(Cs8o2%@uWGi>LjH$<@Xj86Z#0_Cgs3;Xu07q#R zVwT74-t(t1O9?Yhk==I!TN`|6XSYCtzp&SfM$yLGPL zf|)(-Y?zhOV{5lj{g*`jifCiA14StJ`FTNW2_ppf3{4RikJ$B2;BhCC;PVO|S#UEs z55G0*ZB!>F5En<$CH#Gz>2~~}zKG4Q3+WA8_tQ;$jrhHICJVXUlG{>s{ql#0YNx z5zM8FHz+Yf3C$su str: @@ -27,7 +29,15 @@ def chunk_text(text: str, chunk_size: int, overlap: int) -> list[TextChunk]: while start < len(tokens): end = min(start + chunk_size, len(tokens)) chunk_text_value = " ".join(tokens[start:end]) - chunks.append(TextChunk(index=index, text=chunk_text_value, hash=_hash_text(chunk_text_value))) + chunks.append( + TextChunk( + index=index, 
+ text=chunk_text_value, + hash=_hash_text(chunk_text_value), + start_line=None, + end_line=None, + ) + ) index += 1 if end == len(tokens): break @@ -40,3 +50,34 @@ def iter_chunks( ) -> Iterator[list[TextChunk]]: for text in texts: yield chunk_text(text, chunk_size, overlap) + + +def chunk_text_by_lines( + text: str, max_lines: int, overlap_lines: int +) -> list[TextChunk]: + """Chunk by consecutive lines; each chunk has start_line/end_line (1-based).""" + lines = text.splitlines() + if not lines: + return [] + + chunks: list[TextChunk] = [] + index = 0 + start = 0 + while start < len(lines): + end = min(start + max_lines, len(lines)) + chunk_lines = lines[start:end] + chunk_text_value = "\n".join(chunk_lines) + chunks.append( + TextChunk( + index=index, + text=chunk_text_value, + hash=_hash_text(chunk_text_value), + start_line=start + 1, + end_line=end, + ) + ) + index += 1 + if end == len(lines): + break + start = max(end - overlap_lines, 0) + return chunks diff --git a/src/rag_agent/ingest/git_watcher.py b/src/rag_agent/ingest/git_watcher.py index 2e82a96..920899e 100644 --- a/src/rag_agent/ingest/git_watcher.py +++ b/src/rag_agent/ingest/git_watcher.py @@ -40,3 +40,53 @@ def filter_existing(paths: Iterable[Path]) -> list[Path]: def filter_removed(paths: Iterable[Path]) -> list[Path]: return [path for path in paths if not path.exists()] + + +def get_merge_base( + repo_path: str, ref1: str, ref2: str = "HEAD" +) -> str | None: + """Return merge-base commit of ref1 and ref2 (start of story range). None on error.""" + args = [ + "git", + "-C", + repo_path, + "merge-base", + ref1, + ref2, + ] + try: + result = subprocess.run( + args, + check=True, + capture_output=True, + text=True, + ) + return result.stdout.strip() or None + except subprocess.CalledProcessError: + return None + + +def read_file_at_ref( + repo_path: str, path: Path, ref: str +) -> str | None: + """Read file content at a git ref. 
Returns None if path is missing at ref.""" + repo = Path(repo_path) + rel = path.relative_to(repo) if path.is_absolute() else path + rel_str = rel.as_posix() + args = [ + "git", + "-C", + repo_path, + "show", + f"{ref}:{rel_str}", + ] + try: + result = subprocess.run( + args, + check=True, + capture_output=True, + text=True, + ) + return result.stdout + except subprocess.CalledProcessError: + return None diff --git a/src/rag_agent/webhook.py b/src/rag_agent/webhook.py new file mode 100644 index 0000000..a8a7c78 --- /dev/null +++ b/src/rag_agent/webhook.py @@ -0,0 +1,170 @@ +"""Webhook server: on push from remote repo, pull and run index --changed.""" + +from __future__ import annotations + +import hmac +import hashlib +import json +import logging +import os +import subprocess +import threading +from pathlib import Path + +from fastapi import FastAPI, Request, Response +from fastapi.responses import PlainTextResponse + +logger = logging.getLogger(__name__) + +app = FastAPI(title="RAG Agent Webhook", version="0.1.0") + + +def _branch_from_ref(ref: str) -> str | None: + """refs/heads/main -> main.""" + if not ref or not ref.startswith("refs/heads/"): + return None + return ref.removeprefix("refs/heads/") + + +def _verify_github_signature(body: bytes, secret: str, signature_header: str | None) -> bool: + if not secret or not signature_header or not signature_header.startswith("sha256="): + return not secret + expected = hmac.new( + secret.encode("utf-8"), body, digestmod=hashlib.sha256 + ).hexdigest() + received = signature_header.removeprefix("sha256=").strip() + return hmac.compare_digest(received, expected) + + +def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool: + env = os.environ.copy() + env["RAG_REPO_PATH"] = repo_path + try: + proc = subprocess.run( + ["rag-agent", "index", "--story", story, "--changed", "--base-ref", base_ref, "--head-ref", head_ref], + env=env, + capture_output=True, + text=True, + timeout=600, + ) + if 
proc.returncode != 0: + logger.error("index failed: %s %s", proc.stdout, proc.stderr) + return False + logger.info("index completed for story=%s %s..%s", story, base_ref, head_ref) + return True + except subprocess.TimeoutExpired: + logger.error("index timeout for story=%s", story) + return False + except Exception as e: + logger.exception("index error: %s", e) + return False + + +def _pull_and_index(repo_path: str, branch: str) -> None: + repo = Path(repo_path) + if not repo.is_dir() or not (repo / ".git").exists(): + logger.warning("not a git repo or missing: %s", repo_path) + return + try: + subprocess.run( + ["git", "-C", repo_path, "fetch", "origin", branch], + check=True, + capture_output=True, + timeout=60, + ) + except subprocess.CalledProcessError as e: + logger.warning("git fetch failed: %s", e.stderr) + return + except Exception as e: + logger.exception("git fetch error: %s", e) + return + + try: + subprocess.run( + ["git", "-C", repo_path, "checkout", branch], + check=True, + capture_output=True, + timeout=10, + ) + except subprocess.CalledProcessError as e: + logger.warning("git checkout %s failed: %s", branch, e.stderr) + return + + try: + old_head = subprocess.run( + ["git", "-C", repo_path, "rev-parse", "HEAD"], + capture_output=True, + text=True, + timeout=10, + ) + old_head = (old_head.stdout or "").strip() if old_head.returncode == 0 else None + except Exception as e: + logger.exception("rev-parse HEAD: %s", e) + return + + try: + merge_proc = subprocess.run( + ["git", "-C", repo_path, "merge", "--ff-only", f"origin/{branch}"], + capture_output=True, + text=True, + timeout=60, + ) + except subprocess.TimeoutExpired: + logger.error("git merge timeout") + return + if merge_proc.returncode != 0: + logger.warning("git merge --ff-only failed (non-fast-forward?). 
@app.post("/webhook")
async def webhook(request: Request) -> Response:
    """Handle push webhook from GitHub/GitLab: pull repo and run index --changed.

    Verifies the HMAC signature when WEBHOOK_SECRET is set, extracts the pushed
    branch from the payload's ``ref``, and hands the fetch/merge/index work to
    a background daemon thread so the sender gets a fast 202 response.
    """
    body = await request.body()
    secret = os.getenv("WEBHOOK_SECRET", "").strip()
    sig = request.headers.get("X-Hub-Signature-256")

    # Verify only when a secret is configured; otherwise accept unsigned pushes.
    if secret and not _verify_github_signature(body, secret, sig):
        return PlainTextResponse("Invalid signature", status_code=401)

    try:
        payload = json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        payload = None
    if not payload or not isinstance(payload, dict):
        return PlainTextResponse("Invalid JSON", status_code=400)

    ref = payload.get("ref")
    # Fix: guard non-string "ref" values; the original called
    # _branch_from_ref on whatever the payload held, so a malformed payload
    # (e.g. {"ref": 1}) raised AttributeError and surfaced as a 500.
    branch = _branch_from_ref(ref) if isinstance(ref, str) else None
    if not branch:
        return PlainTextResponse("Missing or unsupported ref", status_code=400)

    repo_path = os.getenv("RAG_REPO_PATH", "").strip()
    if not repo_path:
        return PlainTextResponse("RAG_REPO_PATH not set", status_code=500)

    # Fire-and-forget: daemon thread so a long index run cannot block shutdown.
    threading.Thread(
        target=_pull_and_index,
        args=(repo_path, branch),
        daemon=True,
    ).start()

    return PlainTextResponse("Accepted", status_code=202)


@app.get("/health")
async def health() -> str:
    """Liveness probe for container orchestration."""
    return "ok"
def test_chunk_text_by_lines():
    """Line chunking yields overlapping windows with 1-based line ranges."""
    text = "\n".join(f"line{i}" for i in range(1, 9))
    chunks = chunk_text_by_lines(text, max_lines=3, overlap_lines=1)

    assert len(chunks) == 4
    head = chunks[0]
    assert head.text == "line1\nline2\nline3"
    assert (head.start_line, head.end_line) == (1, 3)
    # Each window starts on the last line of the previous one (overlap=1).
    assert (chunks[1].start_line, chunks[1].end_line) == (3, 5)
    # The final window is shorter because only two lines remain.
    assert (chunks[3].start_line, chunks[3].end_line) == (7, 8)