Настройка парсера
This commit is contained in:
@@ -9,6 +9,7 @@ from .order_position import OrderPosition
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfigurableExcelParser(ExcelParser):
|
||||
"""
|
||||
Универсальный парсер, настраиваемый через конфигурацию.
|
||||
@@ -18,14 +19,11 @@ class ConfigurableExcelParser(ExcelParser):
|
||||
def parse(self, file_bytes: str) -> List[OrderPosition]:
|
||||
try:
|
||||
# Читаем Excel
|
||||
df = self._read_excel(file_bytes)
|
||||
|
||||
# Удаляем пустые строки
|
||||
df = df.dropna(how='all')
|
||||
df = self._make_dataframe(file_bytes)
|
||||
|
||||
# Получаем маппинг колонок из конфигурации
|
||||
mapping = self.config['mapping']
|
||||
|
||||
|
||||
# Парсим строки
|
||||
positions = []
|
||||
for idx, row in df.iterrows():
|
||||
@@ -34,36 +32,50 @@ class ConfigurableExcelParser(ExcelParser):
|
||||
if position:
|
||||
positions.append(position)
|
||||
except Exception as e:
|
||||
logger.warning(f"Ошибка парсинга строки {idx}: {e}")
|
||||
logger.error(f"Ошибка парсинга строки {idx}: {e}, {row}")
|
||||
continue
|
||||
|
||||
logger.info(f"Успешно обработано {len(positions)} позиций из {len(df)} строк")
|
||||
return positions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при обработке файла {filepath}: {e}")
|
||||
raise
|
||||
logger.error(f"Ошибка при обработке файла: {e}")
|
||||
raise Exception from e
|
||||
|
||||
def _parse_row(self, row: pd.Series, mapping: Dict[str, str]) -> Optional[OrderPosition]:
|
||||
"""Парсит одну строку Excel в OrderPosition"""
|
||||
|
||||
|
||||
# Проверяем обязательные поля
|
||||
required_fields = ['article', 'manufacturer', 'name', 'price', 'quantity', 'total']
|
||||
required_fields = ['article', 'price', 'quantity']
|
||||
|
||||
for field in required_fields:
|
||||
if pd.isna(row.get(mapping[field])):
|
||||
logger.warning(f"Позиция не создана - не заполнено поле {mapping[field]}")
|
||||
return None
|
||||
|
||||
price = Decimal(str(row[mapping['price']]).replace(",", ".").strip())
|
||||
quantity = int(row[mapping['quantity']])
|
||||
|
||||
if "total" in mapping.keys():
|
||||
total = Decimal(str(row[mapping['total']]).replace(",", ".").strip())
|
||||
else:
|
||||
total = price * quantity
|
||||
|
||||
if mapping.get('name',"") in mapping.keys():
|
||||
name = str(row[mapping.get('name', "")]).strip()
|
||||
else:
|
||||
name = ""
|
||||
|
||||
# Создаем объект позиции
|
||||
position = OrderPosition(
|
||||
article=str(row[mapping['article']]).strip(),
|
||||
manufacturer=str(row[mapping['manufacturer']]).strip(),
|
||||
name=str(row[mapping['name']]).strip(),
|
||||
price=Decimal(str(row[mapping['price']])),
|
||||
quantity=int(row[mapping['quantity']]),
|
||||
total=Decimal(str(row[mapping['total']])),
|
||||
manufacturer=str(row[mapping.get('manufacturer',"")]).strip(),
|
||||
name=name,
|
||||
price=price,
|
||||
quantity=quantity,
|
||||
total=total,
|
||||
additional_attrs=self._extract_additional_attrs(row, mapping)
|
||||
)
|
||||
|
||||
return position
|
||||
|
||||
def _extract_additional_attrs(self, row: pd.Series, mapping: Dict[str, str]) -> Dict[str, Any]:
|
||||
@@ -76,3 +88,26 @@ class ConfigurableExcelParser(ExcelParser):
|
||||
additional[col] = row[col]
|
||||
|
||||
return additional
|
||||
|
||||
|
||||
def _make_dataframe(self, bio) -> pd.DataFrame:
|
||||
# Получаем все данные из файла
|
||||
sheet_name = self.config.get("sheet_name", 0)
|
||||
df_full = pd.read_excel(bio, sheet_name=sheet_name, header=None)
|
||||
|
||||
# Находим индекс строки с заголовком
|
||||
key_field = self.config.get("key_field")
|
||||
header_row_idx = df_full[
|
||||
df_full.apply(lambda row: row.astype(str).str.contains(key_field, case=False, na=False).any(),
|
||||
axis=1)].index[0]
|
||||
|
||||
# Считываем таблицу с правильным заголовком
|
||||
df = pd.read_excel(bio, header=header_row_idx, sheet_name=sheet_name, engine='calamine') #openpyxl calamine
|
||||
|
||||
# Находим индекс первой строки с пустым 'Артикул'
|
||||
first_empty_index = df[df[key_field].isna()].index.min()
|
||||
|
||||
# Обрезаем DataFrame до первой пустой строки (не включая её)
|
||||
df_trimmed = df.loc[:first_empty_index - 1]
|
||||
|
||||
return df_trimmed
|
||||
|
||||
105
src/mail_order_bot/excel_processor/custom_parser_autoeuro.py
Normal file
105
src/mail_order_bot/excel_processor/custom_parser_autoeuro.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import logging
|
||||
import pandas as pd
|
||||
from typing import Dict, Any, Optional, List
|
||||
from decimal import Decimal
|
||||
import xlrd
|
||||
from io import BytesIO
|
||||
|
||||
from .excel_parser import ExcelParser
|
||||
from .order_position import OrderPosition
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CustomExcelParserAutoeuro(ExcelParser):
|
||||
"""
|
||||
Универсальный парсер, настраиваемый через конфигурацию.
|
||||
Подходит для большинства стандартных случаев.
|
||||
"""
|
||||
|
||||
def parse(self, file_bytes: BytesIO) -> List[OrderPosition]:
|
||||
try:
|
||||
# Читаем Excel
|
||||
df = self._make_dataframe(file_bytes)
|
||||
|
||||
# Получаем маппинг колонок из конфигурации
|
||||
mapping = self.config['mapping']
|
||||
|
||||
# Парсим строки
|
||||
positions = []
|
||||
for idx, row in df.iterrows():
|
||||
try:
|
||||
position = self._parse_row(row, mapping)
|
||||
if position:
|
||||
positions.append(position)
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка парсинга строки {idx}: {e}, {row}")
|
||||
continue
|
||||
|
||||
logger.info(f"Успешно обработано {len(positions)} позиций из {len(df)} строк")
|
||||
return positions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при обработке файла: {e}")
|
||||
raise Exception from e
|
||||
|
||||
def _parse_row(self, row: pd.Series, mapping: Dict[str, str]) -> Optional[OrderPosition]:
|
||||
"""Парсит одну строку Excel в OrderPosition"""
|
||||
|
||||
# Проверяем обязательные поля
|
||||
required_fields = ['article', 'price', 'quantity']
|
||||
|
||||
for field in required_fields:
|
||||
if pd.isna(row.get(mapping[field])):
|
||||
logger.warning(f"Позиция не создана - не заполнено поле {mapping[field]}")
|
||||
return None
|
||||
|
||||
price = Decimal(str(row[mapping['price']]).replace(",", ".").strip())
|
||||
quantity = int(row[mapping['quantity']])
|
||||
|
||||
if "total" in mapping.keys():
|
||||
total = Decimal(str(row[mapping['total']]).replace(",", ".").strip())
|
||||
else:
|
||||
total = price * quantity
|
||||
|
||||
# Создаем объект позиции
|
||||
position = OrderPosition(
|
||||
article=str(row[mapping['article']]).strip(),
|
||||
manufacturer=str(row[mapping.get('manufacturer', "")]).strip(),
|
||||
name="", #str(row[mapping.get('name', "name")]).strip(),
|
||||
price=price,
|
||||
quantity=quantity,
|
||||
total=total,
|
||||
additional_attrs=self._extract_additional_attrs(row, mapping)
|
||||
)
|
||||
return position
|
||||
|
||||
def _extract_additional_attrs(self, row: pd.Series, mapping: Dict[str, str]) -> Dict[str, Any]:
|
||||
"""Извлекает дополнительные атрибуты, не входящие в основную модель"""
|
||||
additional = {}
|
||||
mapped_columns = set(mapping.values())
|
||||
|
||||
for col in row.index:
|
||||
if col not in mapped_columns and not pd.isna(row[col]):
|
||||
additional[col] = row[col]
|
||||
|
||||
return additional
|
||||
|
||||
def _make_dataframe(self, bio) -> pd.DataFrame:
|
||||
|
||||
file_bytes = bio.read()
|
||||
book = xlrd.open_workbook(file_contents=file_bytes, encoding_override='cp1251')
|
||||
sheet = book.sheet_by_index(self.config.get("sheet_index", 0))
|
||||
data = [sheet.row_values(row) for row in range(sheet.nrows)]
|
||||
|
||||
df_full = pd.DataFrame(data)
|
||||
|
||||
key_field = self.config.get("key_field")
|
||||
header_row_idx = df_full[
|
||||
df_full.apply(lambda row: row.astype(str).str.contains(key_field, case=False, na=False).any(),
|
||||
axis=1)].index[0]
|
||||
|
||||
df = df_full[header_row_idx:]
|
||||
df.columns = df.iloc[0] # первая строка становится заголовком
|
||||
df = df.reset_index(drop=True).drop(0).reset_index(drop=True) # удаляем первую строку и сбрасываем индекс
|
||||
return df
|
||||
@@ -20,30 +20,9 @@ class ExcelParser(ABC):
|
||||
self.config = config
|
||||
|
||||
@abstractmethod
|
||||
def parse(self, filepath: str) -> List[OrderPosition]:
|
||||
def parse(self, file: BytesIO) -> List[OrderPosition]:
|
||||
"""
|
||||
Парсит Excel файл и возвращает список позиций.
|
||||
Должен быть реализован в каждом конкретном парсере.
|
||||
"""
|
||||
pass
|
||||
|
||||
def _read_excel_from_file(self, filepath: str) -> pd.DataFrame:
|
||||
"""Общий метод для чтения Excel файлов"""
|
||||
return pd.read_excel(
|
||||
filepath,
|
||||
sheet_name=self.config.get('sheet_name', 0),
|
||||
header=self.config.get('header_row', 0),
|
||||
#engine='openpyxl'
|
||||
engine='calamine'
|
||||
)
|
||||
|
||||
def _read_excel(self, file_content: bytes) -> pd.DataFrame:
|
||||
"""Общий метод для чтения Excel файлов из байтового содержимого файла"""
|
||||
bio = BytesIO(file_content)
|
||||
return pd.read_excel(
|
||||
bio,
|
||||
sheet_name=self.config.get('sheet_name', 0),
|
||||
header=self.config.get('header_row', 0),
|
||||
engine='calamine'
|
||||
)
|
||||
|
||||
pass
|
||||
@@ -6,6 +6,7 @@ from typing import Dict, Any, List
|
||||
|
||||
from .excel_parser import ExcelParser
|
||||
from .configurable_parser import ConfigurableExcelParser
|
||||
from .custom_parser_autoeuro import CustomExcelParserAutoeuro
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -18,48 +19,36 @@ class ParserFactory:
|
||||
|
||||
# Реестр кастомных парсеров
|
||||
CUSTOM_PARSERS = {
|
||||
#'supplier_a': SupplierAParser,
|
||||
'autoeuro.ru': CustomExcelParserAutoeuro,
|
||||
# Добавляйте сюда специализированные парсеры
|
||||
}
|
||||
|
||||
def __init__(self, config_path: str):
|
||||
self.config_path = Path(config_path)
|
||||
self.suppliers_config = self._load_config()
|
||||
|
||||
def _load_config(self) -> Dict[str, Any]:
|
||||
"""Загружает конфигурацию из YAML или JSON"""
|
||||
if self.config_path.suffix in ['.yaml', '.yml']:
|
||||
with open(self.config_path, 'r', encoding='utf-8') as f:
|
||||
return yaml.safe_load(f)
|
||||
elif self.config_path.suffix == '.json':
|
||||
with open(self.config_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
else:
|
||||
raise ValueError(f"Неподдерживаемый формат конфига: {self.config_path.suffix}")
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
self.config = config
|
||||
|
||||
def get_parser(self, supplier_name: str) -> ExcelParser:
|
||||
"""
|
||||
Возвращает парсер для указанного контрагента.
|
||||
Использует кастомный парсер если есть, иначе конфигурируемый.
|
||||
"""
|
||||
if supplier_name not in self.suppliers_config['suppliers']:
|
||||
if supplier_name not in self.config['suppliers']:
|
||||
raise ValueError(
|
||||
f"Контрагент '{supplier_name}' не найден в конфигурации. "
|
||||
f"Доступные: {list(self.suppliers_config['suppliers'].keys())}"
|
||||
f"Доступные: {list(self.config['suppliers'].keys())}"
|
||||
)
|
||||
|
||||
config = self.suppliers_config['suppliers'][supplier_name]
|
||||
config = self.config['suppliers'][supplier_name]
|
||||
|
||||
# Проверяем, есть ли кастомный парсер
|
||||
if supplier_name in self.CUSTOM_PARSERS:
|
||||
parser_class = self.CUSTOM_PARSERS[supplier_name]
|
||||
logger.info(f"Используется кастомный парсер для {supplier_name}")
|
||||
logger.debug(f"Используется кастомный парсер для {supplier_name}")
|
||||
else:
|
||||
parser_class = ConfigurableExcelParser
|
||||
logger.info(f"Используется конфигурируемый парсер для {supplier_name}")
|
||||
logger.debug(f"Используется конфигурируемый парсер для {supplier_name}")
|
||||
|
||||
return parser_class(config)
|
||||
|
||||
def list_suppliers(self) -> List[str]:
|
||||
"""Возвращает список всех доступных контрагентов"""
|
||||
return list(self.suppliers_config['suppliers'].keys())
|
||||
return list(self.config['suppliers'].keys())
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import logging
|
||||
from typing import List
|
||||
from pathlib import Path
|
||||
from decimal import Decimal
|
||||
from io import BytesIO
|
||||
from typing import Dict, Any, List
|
||||
import yaml
|
||||
import json
|
||||
|
||||
|
||||
from .parser_factory import ParserFactory
|
||||
from .order_position import OrderPosition
|
||||
@@ -15,55 +20,52 @@ class ExcelProcessor:
|
||||
"""
|
||||
|
||||
def __init__(self, config_path: str = 'config/suppliers.yaml', ):
|
||||
self.factory = ParserFactory(config_path)
|
||||
self._setup_logging()
|
||||
|
||||
def _setup_logging(self):
|
||||
"""Настройка логирования"""
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
def process_file(
|
||||
self,
|
||||
#filepath: str,
|
||||
file_bytes: str,
|
||||
file_name: str,
|
||||
supplier_name: str,
|
||||
validate: bool = True
|
||||
) -> List[OrderPosition]:
|
||||
self.config_path = Path(config_path)
|
||||
self.config = self._load_config()
|
||||
self.factory = ParserFactory(self.config)
|
||||
|
||||
def process(self, file_bytes: BytesIO, file_name: str, supplier_name: str, validate: bool = False) -> List[OrderPosition]:
|
||||
"""
|
||||
Обрабатывает Excel файл от контрагента.
|
||||
|
||||
|
||||
Args:
|
||||
filepath: Путь к Excel файлу
|
||||
file_bytes: Байты файла
|
||||
file_name: Имя файла
|
||||
supplier_name: Название контрагента (из конфигурации)
|
||||
validate: Выполнять ли дополнительную валидацию
|
||||
|
||||
|
||||
Returns:
|
||||
Список объектов OrderPosition
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: Если контрагент не найден
|
||||
FileNotFoundError: Если файл не найден
|
||||
"""
|
||||
logger.info(f"Начало обработки файла: {file_name} для {supplier_name}")
|
||||
|
||||
# Проверка существования файла
|
||||
#if not Path(filepath).exists():
|
||||
# raise FileNotFoundError(f"Файл не найден: {filepath}")
|
||||
|
||||
# Получаем парсер и обрабатываем
|
||||
logger.info(f"Обработка файла: {file_name} для {supplier_name}")
|
||||
|
||||
parser = self.factory.get_parser(supplier_name)
|
||||
positions = parser.parse(file_bytes)
|
||||
|
||||
|
||||
# Дополнительная валидация если нужна
|
||||
if validate:
|
||||
positions = self._validate_positions(positions)
|
||||
|
||||
logger.info(f"Обработка завершена: получено {len(positions)} позиций")
|
||||
|
||||
logger.debug(f"Обработка завершена: получено {len(positions)} позиций")
|
||||
return positions
|
||||
|
||||
def process_file(self, file_path: str, supplier_name: str, validate: bool = False) -> List[OrderPosition]:
|
||||
# Проверка существования файла
|
||||
logger.debug(f"Чтение файла: {file_path}")
|
||||
if not Path(file_path).exists():
|
||||
raise FileNotFoundError(f"Файл не найден: {file_path}")
|
||||
|
||||
with open(file_path, 'rb') as file: # бинарный режим
|
||||
raw_data = file.read()
|
||||
bio = BytesIO(raw_data)
|
||||
|
||||
positions = self.process(bio, file_path, supplier_name, validate=validate)
|
||||
|
||||
return positions
|
||||
|
||||
|
||||
def _validate_positions(self, positions: List[OrderPosition]) -> List[OrderPosition]:
|
||||
"""Дополнительная валидация позиций"""
|
||||
@@ -95,3 +97,14 @@ class ExcelProcessor:
|
||||
def get_available_suppliers(self) -> List[str]:
|
||||
"""Возвращает список доступных контрагентов"""
|
||||
return self.factory.list_suppliers()
|
||||
|
||||
def _load_config(self) -> Dict[str, Any]:
|
||||
"""Загружает конфигурацию из YAML или JSON"""
|
||||
if self.config_path.suffix in ['.yaml', '.yml']:
|
||||
with open(self.config_path, 'r', encoding='utf-8') as f:
|
||||
return yaml.safe_load(f)
|
||||
elif self.config_path.suffix == '.json':
|
||||
with open(self.config_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
else:
|
||||
raise ValueError(f"Неподдерживаемый формат конфига: {self.config_path.suffix}")
|
||||
|
||||
Reference in New Issue
Block a user