Вебхук и сохранение изменений в контексте стори

This commit is contained in:
2026-01-31 00:32:36 +03:00
parent 5ce6335ad8
commit 20af12f47d
17 changed files with 695 additions and 40 deletions

View File

@@ -5,9 +5,15 @@ import hashlib
from pathlib import Path
from rag_agent.config import load_config
from rag_agent.ingest.chunker import chunk_text
from rag_agent.ingest.chunker import chunk_text_by_lines
from rag_agent.ingest.file_loader import iter_text_files
from rag_agent.ingest.git_watcher import filter_existing, filter_removed, get_changed_files
from rag_agent.ingest.git_watcher import (
filter_existing,
filter_removed,
get_changed_files,
get_merge_base,
read_file_at_ref,
)
from rag_agent.index.embeddings import get_embedding_client
from rag_agent.index.postgres import (
connect,
@@ -16,6 +22,7 @@ from rag_agent.index.postgres import (
get_or_create_story,
get_story_id,
replace_chunks,
update_story_indexed_range,
upsert_document,
)
from rag_agent.agent.pipeline import StubLLMClient, answer_query
@@ -34,26 +41,66 @@ def cmd_index(args: argparse.Namespace) -> None:
story_id = get_or_create_story(conn, args.story)
embedding_client = get_embedding_client(config.embeddings_dim)
base_ref = args.base_ref.strip()
head_ref = args.head_ref.strip()
if args.changed:
changed_files = get_changed_files(config.repo_path, args.base_ref, args.head_ref)
if base_ref.lower() == "auto":
base_ref = get_merge_base(
config.repo_path, args.default_branch, head_ref
) or args.default_branch
changed_files = get_changed_files(
config.repo_path, base_ref, head_ref
)
removed = filter_removed(changed_files)
existing = filter_existing(changed_files)
else:
removed = []
existing = [path for path in Path(config.repo_path).rglob("*") if path.is_file()]
existing = [
p for p in Path(config.repo_path).rglob("*") if p.is_file()
]
for path in removed:
delete_document(conn, story_id, str(path))
for path, text in iter_text_files(existing, config.allowed_extensions):
chunks = chunk_text(text, config.chunk_size, config.chunk_overlap)
chunks = chunk_text_by_lines(
text,
config.chunk_size_lines,
config.chunk_overlap_lines,
)
if not chunks:
continue
embeddings = embedding_client.embed_texts([chunk.text for chunk in chunks])
base_chunks = None
if args.changed:
base_text = read_file_at_ref(config.repo_path, path, base_ref)
if base_text is not None:
base_chunks = chunk_text_by_lines(
base_text,
config.chunk_size_lines,
config.chunk_overlap_lines,
)
embeddings = embedding_client.embed_texts(
[chunk.text for chunk in chunks]
)
document_id = upsert_document(
conn, story_id, str(path), _file_version(path)
)
replace_chunks(conn, document_id, chunks, embeddings)
replace_chunks(
conn, document_id, chunks, embeddings, base_chunks=base_chunks
)
if args.changed:
update_story_indexed_range(conn, story_id, base_ref, head_ref)
def cmd_serve(args: argparse.Namespace) -> None:
    """Run the webhook HTTP server with host/port taken from CLI args."""
    # Imported lazily so the other subcommands work without uvicorn installed.
    import uvicorn

    target = "rag_agent.webhook:app"
    uvicorn.run(target, host=args.host, port=args.port, log_level="info")
def cmd_ask(args: argparse.Namespace) -> None:
@@ -89,9 +136,26 @@ def build_parser() -> argparse.ArgumentParser:
required=True,
help="Story slug (e.g. branch name or story id); documents are tied to this story",
)
index_parser.add_argument("--changed", action="store_true", help="Index only changed files")
index_parser.add_argument("--base-ref", default="HEAD~1", help="Base git ref")
index_parser.add_argument("--head-ref", default="HEAD", help="Head git ref")
index_parser.add_argument(
"--changed",
action="store_true",
help="Index only files changed in the story range (base-ref..head-ref); all commits in range belong to the story",
)
index_parser.add_argument(
"--base-ref",
default="main",
help="Start of story range (e.g. main). Use 'auto' for merge-base(default-branch, head-ref). All commits from base to head are the story.",
)
index_parser.add_argument(
"--head-ref",
default="HEAD",
help="End of story range (e.g. current branch tip)",
)
index_parser.add_argument(
"--default-branch",
default="main",
help="Default branch name for --base-ref auto",
)
index_parser.set_defaults(func=cmd_index)
ask_parser = sub.add_parser("ask", help="Ask a question")
@@ -104,6 +168,23 @@ def build_parser() -> argparse.ArgumentParser:
ask_parser.add_argument("--top-k", type=int, default=5, help="Top K chunks to retrieve")
ask_parser.set_defaults(func=cmd_ask)
serve_parser = sub.add_parser(
"serve",
help="Run webhook server: on push to remote repo, pull and index changes",
)
serve_parser.add_argument(
"--host",
default="0.0.0.0",
help="Bind host (default: 0.0.0.0)",
)
serve_parser.add_argument(
"--port",
type=int,
default=8000,
help="Bind port (default: 8000)",
)
serve_parser.set_defaults(func=cmd_serve)
return parser

View File

@@ -18,6 +18,8 @@ class AppConfig:
db_dsn: str
chunk_size: int
chunk_overlap: int
chunk_size_lines: int
chunk_overlap_lines: int
embeddings_dim: int
embeddings_model: str
llm_model: str
@@ -55,6 +57,8 @@ def load_config() -> AppConfig:
db_dsn=db_dsn,
chunk_size=_env_int("RAG_CHUNK_SIZE", 400),
chunk_overlap=_env_int("RAG_CHUNK_OVERLAP", 50),
chunk_size_lines=_env_int("RAG_CHUNK_SIZE_LINES", 40),
chunk_overlap_lines=_env_int("RAG_CHUNK_OVERLAP_LINES", 8),
embeddings_dim=_env_int("RAG_EMBEDDINGS_DIM", 1536),
embeddings_model=os.getenv("RAG_EMBEDDINGS_MODEL", "stub-embeddings"),
llm_model=os.getenv("RAG_LLM_MODEL", "stub-llm"),

View File

@@ -1 +1,11 @@
__all__ = []
from rag_agent.index.postgres import (
ChangedChunkRecord,
fetch_changed_chunks,
get_story_indexed_range,
)
__all__ = [
"ChangedChunkRecord",
"fetch_changed_chunks",
"get_story_indexed_range",
]

View File

@@ -2,13 +2,17 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable
from typing import Iterable, Sequence
import psycopg
from pgvector.psycopg import register_vector
from rag_agent.ingest.chunker import TextChunk
CHANGE_ADDED = "added"
CHANGE_MODIFIED = "modified"
CHANGE_UNCHANGED = "unchanged"
@dataclass(frozen=True)
class ChunkRecord:
@@ -18,6 +22,18 @@ class ChunkRecord:
embedding: list[float]
@dataclass(frozen=True)
class ChangedChunkRecord:
"""Chunk that was added or modified in a story (for test-case generation)."""
path: str
content: str
change_type: str
start_line: int | None
end_line: int | None
previous_content: str | None
def connect(dsn: str) -> psycopg.Connection:
conn = psycopg.connect(dsn)
register_vector(conn)
@@ -32,10 +48,22 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
CREATE TABLE IF NOT EXISTS stories (
id SERIAL PRIMARY KEY,
slug TEXT UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc')
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
indexed_base_ref TEXT,
indexed_head_ref TEXT,
indexed_at TIMESTAMPTZ
);
"""
)
for col_def in (
"ADD COLUMN IF NOT EXISTS indexed_base_ref TEXT",
"ADD COLUMN IF NOT EXISTS indexed_head_ref TEXT",
"ADD COLUMN IF NOT EXISTS indexed_at TIMESTAMPTZ",
):
try:
cur.execute(f"ALTER TABLE stories {col_def};")
except psycopg.ProgrammingError:
pass
cur.execute(
"""
CREATE TABLE IF NOT EXISTS documents (
@@ -56,10 +84,41 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
chunk_index INTEGER NOT NULL,
hash TEXT NOT NULL,
content TEXT NOT NULL,
embedding vector({embeddings_dim}) NOT NULL
embedding vector({embeddings_dim}) NOT NULL,
start_line INTEGER,
end_line INTEGER,
change_type TEXT NOT NULL DEFAULT 'added'
CHECK (change_type IN ('added', 'modified', 'unchanged')),
previous_content TEXT
);
"""
)
# Migrations: add columns if table already existed without them (Postgres 11+)
for col_def in (
"ADD COLUMN IF NOT EXISTS start_line INTEGER",
"ADD COLUMN IF NOT EXISTS end_line INTEGER",
"ADD COLUMN IF NOT EXISTS previous_content TEXT",
"ADD COLUMN IF NOT EXISTS change_type TEXT DEFAULT 'added'",
):
try:
cur.execute(f"ALTER TABLE chunks {col_def};")
except psycopg.ProgrammingError:
pass
try:
cur.execute(
"ALTER TABLE chunks ALTER COLUMN change_type SET NOT NULL;"
)
except psycopg.ProgrammingError:
pass
try:
cur.execute(
"""
ALTER TABLE chunks ADD CONSTRAINT chunks_change_type_check
CHECK (change_type IN ('added', 'modified', 'unchanged'));
"""
)
except psycopg.ProgrammingError:
pass # constraint may already exist
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_documents_story_id
@@ -78,6 +137,12 @@ def ensure_schema(conn: psycopg.Connection, embeddings_dim: int) -> None:
ON chunks USING ivfflat (embedding vector_cosine_ops);
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_chunks_change_type
ON chunks(change_type);
"""
)
conn.commit()
@@ -97,6 +162,44 @@ def get_or_create_story(conn: psycopg.Connection, slug: str) -> int:
return story_id
def update_story_indexed_range(
    conn: psycopg.Connection,
    story_id: int,
    base_ref: str,
    head_ref: str,
) -> None:
    """Record that this story was indexed as all changes from base_ref to head_ref (all commits in story)."""
    sql = """
        UPDATE stories
        SET indexed_base_ref = %s, indexed_head_ref = %s,
            indexed_at = (NOW() AT TIME ZONE 'utc')
        WHERE id = %s;
        """
    params = (base_ref.strip(), head_ref.strip(), story_id)
    with conn.cursor() as cursor:
        cursor.execute(sql, params)
    conn.commit()
def get_story_indexed_range(
    conn: psycopg.Connection, story_id: int
) -> tuple[str | None, str | None, datetime | None]:
    """Return (indexed_base_ref, indexed_head_ref, indexed_at) for the story, or (None, None, None)."""
    sql = """
        SELECT indexed_base_ref, indexed_head_ref, indexed_at
        FROM stories WHERE id = %s;
        """
    with conn.cursor() as cursor:
        cursor.execute(sql, (story_id,))
        row = cursor.fetchone()
    # Story row missing: report "never indexed" instead of raising.
    if row is None:
        return (None, None, None)
    base_ref, head_ref, indexed_at = row
    return (base_ref, head_ref, indexed_at)
def get_story_id(conn: psycopg.Connection, slug: str) -> int | None:
s = slug.strip()
with conn.cursor() as cur:
@@ -127,28 +230,98 @@ def upsert_document(
return document_id
def _change_type_and_previous(
    chunk: TextChunk,
    base_by_range: dict[tuple[int, int], TextChunk],
) -> tuple[str, str | None]:
    """Classify *chunk* against the base revision.

    Chunks are matched to base chunks by their exact (start_line, end_line)
    pair. A chunk with no line info or no matching base range counts as
    added; a matching range with an identical hash is unchanged; otherwise
    it is modified and the base chunk's text is returned as previous content.
    """
    if chunk.start_line is None or chunk.end_line is None:
        return (CHANGE_ADDED, None)
    previous = base_by_range.get((chunk.start_line, chunk.end_line))
    if previous is None:
        return (CHANGE_ADDED, None)
    if previous.hash == chunk.hash:
        return (CHANGE_UNCHANGED, None)
    return (CHANGE_MODIFIED, previous.text)
def replace_chunks(
    conn: psycopg.Connection,
    document_id: int,
    chunks: Iterable[TextChunk],
    embeddings: Iterable[list[float]],
    base_chunks: Sequence[TextChunk] | None = None,
) -> None:
    """Replace all chunks of a document in one transaction.

    Deletes existing rows for *document_id* and inserts the new chunks with
    their embeddings. When *base_chunks* (chunks of the file at the story's
    base ref) are supplied, each new chunk is labeled added/modified/unchanged
    and modified chunks keep the base text as previous_content.
    """
    # Index base chunks by exact line range for change classification.
    base_by_range: dict[tuple[int, int], TextChunk] = {}
    for base in base_chunks or ():
        if base.start_line is not None and base.end_line is not None:
            base_by_range[(base.start_line, base.end_line)] = base
    insert_sql = """
        INSERT INTO chunks (
            document_id, chunk_index, hash, content, embedding,
            start_line, end_line, change_type, previous_content
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
    with conn.cursor() as cursor:
        cursor.execute(
            "DELETE FROM chunks WHERE document_id = %s;",
            (document_id,),
        )
        for chunk, embedding in zip(chunks, embeddings):
            change_type, previous_content = _change_type_and_previous(
                chunk, base_by_range
            )
            cursor.execute(
                insert_sql,
                (
                    document_id,
                    chunk.index,
                    chunk.hash,
                    chunk.text,
                    embedding,
                    chunk.start_line,
                    chunk.end_line,
                    change_type,
                    previous_content,
                ),
            )
    conn.commit()
def fetch_changed_chunks(
    conn: psycopg.Connection, story_id: int
) -> list[ChangedChunkRecord]:
    """Return chunks that were added or modified in this story (for test-case generation)."""
    sql = """
        SELECT d.path, c.content, c.change_type, c.start_line, c.end_line,
               c.previous_content
        FROM chunks c
        JOIN documents d ON d.id = c.document_id
        WHERE d.story_id = %s
          AND c.change_type IN ('added', 'modified')
        ORDER BY d.path, c.start_line NULLS FIRST, c.chunk_index;
        """
    with conn.cursor() as cursor:
        cursor.execute(sql, (story_id,))
        rows = cursor.fetchall()
    records: list[ChangedChunkRecord] = []
    for path, content, change_type, start_line, end_line, previous_content in rows:
        records.append(
            ChangedChunkRecord(
                path=path,
                content=content,
                change_type=change_type,
                start_line=start_line,
                end_line=end_line,
                previous_content=previous_content,
            )
        )
    return records
def delete_document(
conn: psycopg.Connection, story_id: int, path: str
) -> None:

View File

@@ -10,6 +10,8 @@ class TextChunk:
index: int
text: str
hash: str
start_line: int | None = None
end_line: int | None = None
def _hash_text(text: str) -> str:
@@ -27,7 +29,15 @@ def chunk_text(text: str, chunk_size: int, overlap: int) -> list[TextChunk]:
while start < len(tokens):
end = min(start + chunk_size, len(tokens))
chunk_text_value = " ".join(tokens[start:end])
chunks.append(TextChunk(index=index, text=chunk_text_value, hash=_hash_text(chunk_text_value)))
chunks.append(
TextChunk(
index=index,
text=chunk_text_value,
hash=_hash_text(chunk_text_value),
start_line=None,
end_line=None,
)
)
index += 1
if end == len(tokens):
break
@@ -40,3 +50,34 @@ def iter_chunks(
) -> Iterator[list[TextChunk]]:
for text in texts:
yield chunk_text(text, chunk_size, overlap)
def chunk_text_by_lines(
    text: str, max_lines: int, overlap_lines: int
) -> list[TextChunk]:
    """Chunk by consecutive lines; each chunk has start_line/end_line (1-based).

    Args:
        text: Source text to split (split with ``str.splitlines``).
        max_lines: Maximum number of lines per chunk; must be >= 1.
        overlap_lines: Number of trailing lines repeated at the start of the
            next chunk.

    Returns:
        Chunks in document order; empty list for empty input.

    Raises:
        ValueError: If ``max_lines`` is less than 1.
    """
    if max_lines < 1:
        raise ValueError("max_lines must be >= 1")
    lines = text.splitlines()
    if not lines:
        return []
    chunks: list[TextChunk] = []
    index = 0
    start = 0
    while start < len(lines):
        end = min(start + max_lines, len(lines))
        chunk_text_value = "\n".join(lines[start:end])
        chunks.append(
            TextChunk(
                index=index,
                text=chunk_text_value,
                hash=_hash_text(chunk_text_value),
                start_line=start + 1,  # 1-based, inclusive
                end_line=end,          # 1-based, inclusive
            )
        )
        index += 1
        if end == len(lines):
            break
        # Guarantee forward progress: with overlap_lines >= max_lines the
        # naive `end - overlap_lines` would never move past `start` and the
        # loop would spin forever, so always advance by at least one line.
        start = max(end - overlap_lines, start + 1)
    return chunks

View File

@@ -40,3 +40,53 @@ def filter_existing(paths: Iterable[Path]) -> list[Path]:
def filter_removed(paths: Iterable[Path]) -> list[Path]:
return [path for path in paths if not path.exists()]
def get_merge_base(
    repo_path: str, ref1: str, ref2: str = "HEAD"
) -> str | None:
    """Return merge-base commit of ref1 and ref2 (start of story range). None on error."""
    cmd = ["git", "-C", repo_path, "merge-base", ref1, ref2]
    try:
        proc = subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError:
        # No common ancestor, unknown refs, or not a repo.
        return None
    commit = proc.stdout.strip()
    return commit if commit else None
def read_file_at_ref(
    repo_path: str, path: Path, ref: str
) -> str | None:
    """Read file content at a git ref. Returns None if path is missing at ref."""
    # `git show` wants a repo-relative, POSIX-style path.
    rel = path.relative_to(Path(repo_path)) if path.is_absolute() else path
    spec = f"{ref}:{rel.as_posix()}"
    try:
        proc = subprocess.run(
            ["git", "-C", repo_path, "show", spec],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError:
        return None
    return proc.stdout

170
src/rag_agent/webhook.py Normal file
View File

@@ -0,0 +1,170 @@
"""Webhook server: on push from remote repo, pull and run index --changed."""
from __future__ import annotations
import hmac
import hashlib
import json
import logging
import os
import subprocess
import threading
from pathlib import Path
from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
logger = logging.getLogger(__name__)
app = FastAPI(title="RAG Agent Webhook", version="0.1.0")
def _branch_from_ref(ref: str) -> str | None:
"""refs/heads/main -> main."""
if not ref or not ref.startswith("refs/heads/"):
return None
return ref.removeprefix("refs/heads/")
def _verify_github_signature(body: bytes, secret: str, signature_header: str | None) -> bool:
if not secret or not signature_header or not signature_header.startswith("sha256="):
return not secret
expected = hmac.new(
secret.encode("utf-8"), body, digestmod=hashlib.sha256
).hexdigest()
received = signature_header.removeprefix("sha256=").strip()
return hmac.compare_digest(received, expected)
def _run_index(repo_path: str, story: str, base_ref: str, head_ref: str) -> bool:
    """Run `rag-agent index --changed` for the story range; True on success."""
    env = os.environ.copy()
    env["RAG_REPO_PATH"] = repo_path
    cmd = [
        "rag-agent", "index",
        "--story", story,
        "--changed",
        "--base-ref", base_ref,
        "--head-ref", head_ref,
    ]
    try:
        proc = subprocess.run(
            cmd,
            env=env,
            capture_output=True,
            text=True,
            timeout=600,
        )
    except subprocess.TimeoutExpired:
        logger.error("index timeout for story=%s", story)
        return False
    except Exception as e:  # defensive: indexing must never kill the caller thread
        logger.exception("index error: %s", e)
        return False
    if proc.returncode != 0:
        logger.error("index failed: %s %s", proc.stdout, proc.stderr)
        return False
    logger.info("index completed for story=%s %s..%s", story, base_ref, head_ref)
    return True
def _rev_parse_head(repo_path: str) -> str | None:
    """Return the commit hash of HEAD, or None on any git failure."""
    try:
        proc = subprocess.run(
            ["git", "-C", repo_path, "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except Exception as e:
        logger.exception("rev-parse HEAD: %s", e)
        return None
    return (proc.stdout or "").strip() if proc.returncode == 0 else None


def _pull_and_index(repo_path: str, branch: str) -> None:
    """Bring the local clone up to date with origin/<branch> and index new commits.

    Steps: fetch -> checkout -> record old HEAD -> fast-forward merge ->
    record new HEAD -> run the indexer for old..new. This runs on a daemon
    worker thread, so every git call is wrapped: any failure (including a
    non-fast-forward merge or a timeout) logs and returns instead of raising.
    Fix over the previous version: the second rev-parse call was unguarded,
    so a TimeoutExpired/OSError there would kill the worker thread.
    """
    repo = Path(repo_path)
    if not repo.is_dir() or not (repo / ".git").exists():
        logger.warning("not a git repo or missing: %s", repo_path)
        return
    try:
        subprocess.run(
            ["git", "-C", repo_path, "fetch", "origin", branch],
            check=True,
            capture_output=True,
            timeout=60,
        )
    except subprocess.CalledProcessError as e:
        logger.warning("git fetch failed: %s", e.stderr)
        return
    except Exception as e:
        logger.exception("git fetch error: %s", e)
        return
    try:
        subprocess.run(
            ["git", "-C", repo_path, "checkout", branch],
            check=True,
            capture_output=True,
            timeout=10,
        )
    except subprocess.CalledProcessError as e:
        logger.warning("git checkout %s failed: %s", branch, e.stderr)
        return
    except Exception as e:
        logger.exception("git checkout error: %s", e)
        return
    old_head = _rev_parse_head(repo_path)
    if old_head is None:
        # Without a base commit we cannot form the old..new range.
        return
    try:
        merge_proc = subprocess.run(
            ["git", "-C", repo_path, "merge", "--ff-only", f"origin/{branch}"],
            capture_output=True,
            text=True,
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        logger.error("git merge timeout")
        return
    if merge_proc.returncode != 0:
        logger.warning("git merge --ff-only failed (non-fast-forward?). Skipping index.")
        return
    new_head = _rev_parse_head(repo_path)
    if not new_head or old_head == new_head:
        logger.info("no new commits for branch=%s", branch)
        return
    _run_index(repo_path, story=branch, base_ref=old_head, head_ref=new_head)
@app.post("/webhook")
async def webhook(request: Request) -> Response:
    """Handle push webhook from GitHub/GitLab: pull repo and run index --changed.

    Flow: optional signature check (WEBHOOK_SECRET) -> parse JSON payload ->
    resolve pushed branch -> start pull+index on a background thread -> 202.
    """
    body = await request.body()
    secret = os.getenv("WEBHOOK_SECRET", "").strip()
    signature = request.headers.get("X-Hub-Signature-256")
    if secret and not _verify_github_signature(body, secret, signature):
        return PlainTextResponse("Invalid signature", status_code=401)
    try:
        payload = json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        payload = None
    if not payload or not isinstance(payload, dict):
        return PlainTextResponse("Invalid JSON", status_code=400)
    branch = _branch_from_ref(payload.get("ref") or "")
    if not branch:
        return PlainTextResponse("Missing or unsupported ref", status_code=400)
    repo_path = os.getenv("RAG_REPO_PATH", "").strip()
    if not repo_path:
        return PlainTextResponse("RAG_REPO_PATH not set", status_code=500)
    # Index in the background so the webhook responds within the sender's timeout.
    worker = threading.Thread(
        target=_pull_and_index,
        args=(repo_path, branch),
        daemon=True,
    )
    worker.start()
    return PlainTextResponse("Accepted", status_code=202)
@app.get("/health")
async def health() -> str:
    """Health-check endpoint: always returns the literal string "ok"."""
    return "ok"