Рефаткоринг, добавляю пайплайн

This commit is contained in:
2025-11-11 23:08:40 +03:00
parent 6abceda30e
commit ee439c6bf6
9 changed files with 194 additions and 28 deletions

View File

@@ -0,0 +1,28 @@
clients:
"todx.ru":
- handler: "OrderParser",
config:
sheet_name: "Лист1"
key_value: "Артикул"
mapping:
name: "Наименование"
manufacturer: "Производитель"
price: "Цена\nдетали"
quantity: "Кол-\nво"
total: "Сумма"
- handler: "OrderCreator"
- handler: "ExcelWriter"
config: "output.xlsx"
- handler: "EmailSender"
config:
to: "todx@yandex.ru"
- Notifier:
- channel: "email"
to : "status@zapchastiya.ru"
- channel: "telegram"
to: "123454323"

View File

@@ -0,0 +1,21 @@
import logging
import requests
from ..abstract_task import AbstractTask
logger = logging.getLogger(__name__)
class InstantOrderTest(AbstractTask):
URL = "https://api.telegram.org/bot{0}/sendMessage?chat_id={1}&text={2}"
def do(self) -> None:
positions = self.context["positions"]
message = f"Запрос на создание заказа от {self.context['client']}:\n"
message += "\n".join(f"{pos.article}: {pos.name} ({pos.quantity} x {pos.price} = {pos.total})" for pos in positions)
api_key = self.config["api_key"]
chat_id = self.config["chat_id"]
url = self.URL.format(api_key, chat_id, message)
resp = requests.get(url).json()
logger.info(resp)

View File

@@ -1,25 +1,22 @@
import logging
import pandas as pd
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from io import BytesIO
logger = logging.getLogger(__name__)
from typing import Dict, Any
class AbstractHandler(ABC):
class AbstractTask(ABC):
"""
Абстрактный базовый класс для всех хэндлеров.
"""
def __init__(self, config: Dict[str, Any], context: Dict[str, Any],*args, **kwargs) -> None:
self.config = config
self.context = context
@abstractmethod
def do(self, *args, **kwargs) -> Dict[str, Any]:
def do(self) -> None:
"""
Парсит Excel файл и возвращает список позиций.
Должен быть реализован в каждом конкретном парсере.
Выполняет работу над заданием
Входные и выходные данные - в self.context
Конфиг задается при инициализации
"""
pass
raise NotImplementedError

View File

@@ -16,7 +16,7 @@ class BasicExcelParser(AbstractTask):
Подходит для большинства стандартных случаев.
"""
def do(self) -> List[OrderPosition]:
def do(self) -> None:
# todo сделать проверку на наличие файла и его тип
file_bytes = BytesIO(self.context.get("attachment")) # self.context.get("attachment") #

View File

@@ -0,0 +1,18 @@
import logging
import pandas as pd
from typing import Dict, Any, Optional, List
from decimal import Decimal
from ..abstract_task import AbstractTask
logger = logging.getLogger(__name__)
class TestNotifier(AbstractTask):
def do(self) -> None:
positions = self.context["positions"]
print(f"\nПолучено {len(positions)} позиций от {self.context["client"]}:")
for pos in positions: # Первые 5
print(f" - {pos.article}: {pos.name} "
f"({pos.quantity} x {pos.price} = {pos.total})")

View File

@@ -0,0 +1,15 @@
from enum import Enum
class OrderStatus(Enum):
NEW = 1
IN_PROGRESS = 2
COMPLETED = 3
FAILED = 4
OPERATOR_HANDLING = 5
INVALID = 6
class Order:
def __init__(self, context: dict):
attachment = context["attachment"]
self.context = context

View File

@@ -1,35 +1,36 @@
from pathlib import Path
import os
import yaml
import json
import logging
from typing import Dict, Any
from pathlib import Path
from ..task_handler.excel_parsers.basic_excel_parcer import BasicExcelParser
from ..task_handler.notifiers.test_notifier import TestNotifier
from ..task_handler.abcp_client.OrderCreator import InstantOrderTest
from ..excel_processor.configurable_parser import ConfigurableExcelParser
logger = logging.getLogger(__name__)
class TaskProcessor:
def __init__(self, config_path: Path):
self.config_path = config_path
self.context = {}
self.context = dict()
def process(self, client, file_object):
def process(self, client, attachment):
config = self._load_config(client)
self.context = dict()
self.context["client"] = client
self.context["attachment"] = attachment.content
self.context["status"] = client
for stage in config["pipeline"]:
handler_name = stage["handler"]
config = stage["config"]
handler = globals()[handler_name](config)
self.context["positions"] = handler.parse(file_object)
return self.context["positions"]
logger.info(f"Processing handler: {handler_name}")
task = globals()[handler_name](stage.get("config", None), self.context)
task.do()
return self.context
pass