Скелет проекта
This commit is contained in:
@@ -0,0 +1 @@
|
||||
# Empty public API: ``from <this module> import *`` exports no names.
__all__ = []
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,42 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, Iterator
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class TextChunk:
    """Immutable record describing one chunk cut out of a larger text.

    Instances are frozen: assigning to a field after construction raises
    ``dataclasses.FrozenInstanceError``. Equality is field-wise, as
    generated by ``dataclass``.
    """

    # Ordinal of the chunk within its source text (``chunk_text`` numbers
    # chunks consecutively starting at 0).
    index: int
    # The chunk's textual content.
    text: str
    # Digest string identifying the content (``chunk_text`` stores the
    # result of ``_hash_text(text)`` here).
    hash: str
|
||||
|
||||
|
||||
def _hash_text(text: str) -> str:
    """Return the SHA-256 digest of *text* as a lowercase hex string.

    The text is encoded as UTF-8 before hashing.
    """
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
|
||||
|
||||
|
||||
def chunk_text(text: str, chunk_size: int, overlap: int) -> list[TextChunk]:
    """Split *text* into overlapping chunks of whitespace-separated tokens.

    The text is tokenized with ``str.split()``, so runs of whitespace are
    collapsed and each chunk's text is its tokens re-joined with single
    spaces. Consecutive chunks share ``overlap`` tokens; the final chunk
    may be shorter than ``chunk_size``.

    Args:
        text: Source text to split.
        chunk_size: Maximum number of tokens per chunk; must be positive.
        overlap: Number of trailing tokens of one chunk repeated at the
            start of the next; must be smaller than ``chunk_size``. A
            negative value leaves a gap of skipped tokens between chunks.

    Returns:
        Chunks in source order with ``index`` counting up from 0 and
        ``hash`` set to ``_hash_text`` of the chunk text. Empty or
        whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is
            not smaller than ``chunk_size``. Such values would otherwise
            keep the window from advancing and loop forever.
    """
    # Validate before looking at the text so a bad configuration fails
    # consistently instead of only when a long enough text shows up.
    if chunk_size <= 0:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )
    if overlap >= chunk_size:
        raise ValueError(
            f"overlap ({overlap!r}) must be smaller than "
            f"chunk_size ({chunk_size!r})"
        )

    tokens = text.split()
    if not tokens:
        return []

    total = len(tokens)
    # Guaranteed positive by the checks above, so the window always advances.
    stride = chunk_size - overlap

    chunks: list[TextChunk] = []
    for index, start in enumerate(range(0, total, stride)):
        end = start + chunk_size
        piece = " ".join(tokens[start:end])
        chunks.append(TextChunk(index=index, text=piece, hash=_hash_text(piece)))
        # Stop once a window reaches the last token; a further window
        # would only repeat the already-emitted tail.
        if end >= total:
            break
    return chunks
|
||||
|
||||
|
||||
def iter_chunks(
    texts: Iterable[str], chunk_size: int, overlap: int
) -> Iterator[list[TextChunk]]:
    """Lazily chunk every text in *texts*.

    Yields one list per input text, in input order, each being the result
    of ``chunk_text(text, chunk_size, overlap)``. Nothing is computed until
    the returned generator is advanced.
    """
    yield from (chunk_text(item, chunk_size, overlap) for item in texts)
|
||||
@@ -0,0 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Iterator
|
||||
|
||||
|
||||
def is_allowed(path: Path, allowed_extensions: Iterable[str]) -> bool:
    """Tell whether *path* has one of the allowed extensions.

    The comparison is case-insensitive and uses ``path.suffix``, i.e. the
    last extension including its leading dot (``".txt"``). A path without
    a suffix only matches if the empty string is among the allowed values.
    """
    suffix = path.suffix.lower()
    normalized = set()
    for extension in allowed_extensions:
        normalized.add(extension.lower())
    return suffix in normalized
|
||||
|
||||
|
||||
def read_text_file(path: Path) -> str:
    """Return the full contents of *path* decoded as UTF-8.

    Byte sequences that are not valid UTF-8 are silently dropped
    (``errors="ignore"``) rather than raising ``UnicodeDecodeError``.
    """
    with path.open("r", encoding="utf-8", errors="ignore") as handle:
        return handle.read()
|
||||
|
||||
|
||||
def iter_text_files(
    paths: Iterable[Path], allowed_extensions: Iterable[str]
) -> Iterator[tuple[Path, str]]:
    """Yield ``(path, text)`` for every readable candidate in *paths*.

    A path is skipped when it is not an existing regular file or when
    ``is_allowed`` rejects its extension. The text is obtained with
    ``read_text_file``. Files are read lazily, one per iteration step.

    Args:
        paths: Candidate paths, visited in order.
        allowed_extensions: Extensions accepted by ``is_allowed``. Any
            iterable works, including a one-shot iterator or generator.

    Yields:
        Tuples of the accepted path and its file contents.
    """
    # Materialize once: the extensions are consulted for every path, and a
    # one-shot iterator would be exhausted after the first check, causing
    # all later files to be rejected.
    extensions = tuple(allowed_extensions)
    for path in paths:
        if not path.is_file():
            continue
        if not is_allowed(path, extensions):
            continue
        yield path, read_text_file(path)
|
||||
@@ -0,0 +1,42 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
def get_changed_files(
    repo_path: str, base_ref: str, head_ref: str = "HEAD"
) -> list[Path]:
    """List the files that differ between two revisions of a git repository.

    Runs ``git diff --name-only`` between *base_ref* and *head_ref* inside
    *repo_path* and joins each reported name onto *repo_path*.

    Args:
        repo_path: Directory passed to ``git -C``.
            NOTE(review): git reports names relative to the top of the
            working tree, so the joined paths are only correct when this
            is the repository root — confirm against callers.
        base_ref: Revision to compare from.
        head_ref: Revision to compare to; defaults to ``"HEAD"``.

    Returns:
        One path per changed file, in the order git reports them. Paths of
        files deleted in *head_ref* are included even though they no longer
        exist on disk.

    Raises:
        RuntimeError: If git exits with a non-zero status; the message
            carries git's stderr output.
    """
    command = [
        "git",
        "-C",
        repo_path,
        "diff",
        "--name-only",
        # NUL-terminated output: git emits path names verbatim instead of
        # C-quoting unusual ones (non-ASCII, quotes, control characters),
        # and names with surrounding whitespace survive intact.
        "-z",
        base_ref,
        head_ref,
        # Everything before "--" is a revision, never a pathspec.
        "--",
    ]
    try:
        result = subprocess.run(
            command, check=True, capture_output=True, text=True
        )
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(
            f"git diff failed: {exc.stderr.strip() or exc}"
        ) from exc

    root = Path(repo_path)
    # The output ends with a NUL, so the final split element is empty.
    return [root / name for name in result.stdout.split("\0") if name]
|
||||
|
||||
|
||||
def filter_existing(paths: Iterable[Path]) -> list[Path]:
    """Return the members of *paths* that exist on disk, in input order."""
    present = []
    for candidate in paths:
        if candidate.exists():
            present.append(candidate)
    return present
|
||||
|
||||
|
||||
def filter_removed(paths: Iterable[Path]) -> list[Path]:
    """Return the members of *paths* that do not exist on disk, in input order."""
    absent = []
    for candidate in paths:
        if candidate.exists():
            continue
        absent.append(candidate)
    return absent
|
||||
Reference in New Issue
Block a user