2 Commits

Author SHA1 Message Date
1ef4271137 Mail agent added 2025-10-28 23:10:48 +03:00
6b87bd226e .gitignore updated 2025-10-28 23:08:46 +03:00
11 changed files with 542 additions and 3 deletions

6
.gitignore vendored
View File

@@ -1 +1,5 @@
.venv
venv
.vscode
__pycache__
.env
.cursorignore

View File

@@ -0,0 +1,95 @@
# Работа с переменными окружения в Python
## Основные способы
### 1. Через `os.environ` (стандартная библиотека)
```python
import os
# Чтение с безопасным значением по умолчанию
email = os.environ.get('EMAIL_ADDRESS', 'default@example.com')
# Чтение обязательной переменной
password = os.environ.get('EMAIL_PASSWORD')
if not password:
raise ValueError("EMAIL_PASSWORD not set")
# Прямой доступ (вызовет KeyError, если переменной нет)
email = os.environ['EMAIL_ADDRESS']
```
### 2. С использованием python-dotenv (рекомендуется для разработки)
```python
import os
from dotenv import load_dotenv
# Загрузить переменные из .env файла
load_dotenv()
# Теперь работаем с переменными
email = os.environ.get('EMAIL_ADDRESS')
```
## Установка python-dotenv
```bash
pip install python-dotenv
```
Или добавьте в зависимости проекта (уже добавлено в `pyptoject.toml`):
```toml
dependencies = [
"python-dotenv>=1.0.0"
]
```
## Настройка .env файла
1. Скопируйте `example.env` в `.env`:
```bash
cp example.env .env
```
2. Отредактируйте `.env` и добавьте свои данные:
```env
EMAIL_ADDRESS=your_email@gmail.com
EMAIL_PASSWORD=your_app_password
IMAP_HOST=imap.gmail.com
IMAP_PORT=993
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
```
3. Убедитесь, что `.env` добавлен в `.gitignore` (уже добавлено)
## Пример использования в MailOrderBot
```python
import os
from dotenv import load_dotenv
from mail_order_bot.email_client import EmailClient
# Загрузить переменные из .env файла
load_dotenv()
# Чтение переменных
EMAIL = os.environ.get('EMAIL_ADDRESS')
PASSWORD = os.environ.get('EMAIL_PASSWORD')
# Создание клиента
client = EmailClient(
imap_host='imap.gmail.com',
smtp_host='smtp.gmail.com',
email=EMAIL,
password=PASSWORD
)
```
## Безопасность
- НЕ коммитьте `.env` файл в git
- Используйте `.env.example` как шаблон
- Храните реальные пароли только в `.env` (уже добавлено в `.gitignore`)

12
example.env Normal file
View File

@@ -0,0 +1,12 @@
# Email configuration
EMAIL_ADDRESS=your_email@gmail.com
EMAIL_PASSWORD=your_app_password
# IMAP settings
IMAP_HOST=imap.gmail.com
IMAP_PORT=993
# SMTP settings
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587

View File

@@ -5,7 +5,9 @@ build-backend = "setuptools.build_meta"
[project]
name = "MailOrderBot"
requires-python = ">=3.12"
dependencies = []
dependencies = [
"python-dotenv>=1.0.0"
]
dynamic = ["version"]
[tool.setuptools.packages.find]

0
src/__init__.py Normal file
View File

View File

@@ -1 +0,0 @@
from config_manager import config_manager

View File

@@ -0,0 +1,19 @@
from .email_client import EmailClient
from .email_objects import EmailMessage, EmailAttachment
__all__ = ['EmailClient', 'EmailMessage', 'EmailAttachment']
def test_email_client():
email_client = EmailClient(
imap_host='imap.yandex.ru',
smtp_host='smtp.yandex.ru',
email='zosimovaa@yandex.ru',
password='test'
)
assert email_client is not None
email_client.close()
pytest.main()
if __name__ == "__main__":
test_email_client()

View File

@@ -0,0 +1,353 @@
import imaplib
import smtplib
import re
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import email
from email.header import decode_header
from datetime import datetime
from typing import List, Optional
from dataclasses import dataclass
from .email_objects import EmailMessage, EmailAttachment
class EmailClient:
"""
Класс для работы с электронной почтой по протоколам IMAP и SMTP.
Пример использования:
client = EmailClient(
imap_host='imap.gmail.com',
smtp_host='smtp.gmail.com',
email='your_email@gmail.com',
password='your_password'
)
# Получить новые письма
new_emails = client.get_emails()
# Отправить письмо
msg = EmailMessage(
from_addr='sender@example.com',
subj='Test',
dt=datetime.now(),
body='Hello!',
attachments=[]
)
client.send_email(msg, to_addr='recipient@example.com')
"""
def __init__(
self,
imap_host: str,
smtp_host: str,
email: str,
password: str,
imap_port: int = 993,
smtp_port: int = 587
):
"""
Инициализация клиента электронной почты.
Args:
imap_host: IMAP сервер (например, 'imap.gmail.com')
smtp_host: SMTP сервер (например, 'smtp.gmail.com')
email: Email адрес
password: Пароль или app password
imap_port: Порт IMAP (по умолчанию 993 для SSL)
smtp_port: Порт SMTP (по умолчанию 587 для TLS)
"""
self.imap_host = imap_host
self.smtp_host = smtp_host
self.email = email
self.password = password
self.imap_port = imap_port
self.smtp_port = smtp_port
self.imap_conn = None
def _connect_imap(self):
"""Установить IMAP соединение"""
if self.imap_conn is None:
self.imap_conn = imaplib.IMAP4_SSL(self.imap_host, self.imap_port)
self.imap_conn.login(self.email, self.password)
def _decode_header(self, header_value: str) -> str:
"""
Декодировать заголовок письма.
Args:
header_value: Значение заголовка
Returns:
Декодированная строка
"""
if header_value is None:
return ""
decoded_parts = []
for part, encoding in decode_header(header_value):
if isinstance(part, bytes):
if encoding:
try:
decoded_parts.append(part.decode(encoding))
except:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
else:
decoded_parts.append(part.decode('utf-8', errors='ignore'))
else:
decoded_parts.append(str(part))
return ''.join(decoded_parts)
def _extract_first_sender(self, body: str):
"""
Извлекает адреса отправителей из пересылаемого сообщения по паттерну:
-------- Пересылаемое сообщение --------
07.10.2025, 16:01, Имя (email@example.com):
Кому: ...
"""
# Ищем первую секцию пересылаемого сообщения (по структуре письма)
match = re.search(
r"-{8,}\\s*Пересылаемое сообщение\\s*-{8,}.*?(\\d{2}\\.\\d{2}\\.\\d{4},\\s*\\d{2}:\\d{2},.*?)\\(([^\\)]+)\\):",
body, re.DOTALL)
emails = []
if match:
emails.append(match.group(2)) # email из первой строки пересыла
# Ищем все email в первой пересылаемой секции (например, в "Кому:")
forwarded_section = re.search(
r"^-{8,}.*?Пересылаемое сообщение.*?:$(.*?)(?:^[-=]{5,}|\\Z)",
body, re.MULTILINE | re.DOTALL)
if forwarded_section:
addresses = re.findall(r"\\b([\\w\\.-]+@[\\w\\.-]+)\\b", forwarded_section.group(1))
for addr in addresses:
if addr not in emails:
emails.append(addr)
return emails
def _extract_body(self, msg: email.message.Message) -> str:
"""
Извлечь текст письма.
Args:
msg: Объект письма
Returns:
Текст письма
"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# Ищем текстовые части без вложений
if content_type == "text/plain" and "attachment" not in content_disposition:
try:
charset = part.get_content_charset() or 'utf-8'
body += part.get_payload(decode=True).decode(charset, errors='ignore')
except:
pass
else:
try:
charset = msg.get_content_charset() or 'utf-8'
body = msg.get_payload(decode=True).decode(charset, errors='ignore')
except:
pass
return body
def _extract_attachments(self, msg: email.message.Message) -> List[EmailAttachment]:
"""
Извлечь вложения из письма.
Args:
msg: Объект письма
Returns:
Список вложений
"""
attachments = []
for part in msg.walk():
content_disposition = str(part.get("Content-Disposition", ""))
if "attachment" in content_disposition:
filename = part.get_filename()
if filename:
# Декодируем имя файла
filename = self._decode_header(filename)
# Получаем содержимое
content = part.get_payload(decode=True)
if content:
attachments.append(
EmailAttachment(filename=filename, content=content)
)
return attachments
def get_emails(
self,
folder: str = "INBOX",
only_unseen: bool = True,
mark_as_read: bool = True
) -> List[EmailMessage]:
"""
Получить список новых электронных писем.
Args:
folder: Папка для получения писем (по умолчанию "INBOX")
only_unseen: Получать только непрочитанные письма (по умолчанию True)
Returns:
Список объектов EmailMessage
"""
self._connect_imap()
# Выбираем папку
self.imap_conn.select(folder, readonly=False)
# Ищем письма
search_criteria = "(UNSEEN)" if only_unseen else "ALL"
status, messages = self.imap_conn.search(None, search_criteria)
if status != "OK":
return []
email_ids = messages[0].split()
emails = []
for email_id in email_ids:
try:
# Получаем письмо
status, msg_data = self.imap_conn.fetch(email_id, "(RFC822)")
if status != "OK":
continue
# Парсим письмо
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
# Извлекаем данные
from_addr = self._decode_header(msg.get("From", ""))
subject = self._decode_header(msg.get("Subject", ""))
# Получаем дату
date_str = msg.get("Date", "")
try:
date_tuple = email.utils.parsedate_tz(date_str)
if date_tuple:
timestamp = email.utils.mktime_tz(date_tuple)
dt = datetime.fromtimestamp(timestamp)
else:
dt = datetime.now()
except:
dt = datetime.now()
# Извлекаем тело письма
body = self._extract_body(msg)
first_sender = self._extract_first_sender(body)
# Извлекаем вложения
attachments = self._extract_attachments(msg)
# Создаем объект письма
email_obj = EmailMessage(
from_addr=from_addr,
subj=subject,
dt=dt,
body=body,
attachments=attachments,
first_sender=first_sender
)
emails.append(email_obj)
# Помечаем письмо как прочитанное
if mark_as_read:
self.imap_conn.store(email_id, '+FLAGS', '\\Seen')
except Exception as e:
print(f"Ошибка при обработке письма {email_id}: {e}")
continue
return emails
def send_email(
self,
message: EmailMessage,
to_addr: str,
cc: Optional[List[str]] = None,
bcc: Optional[List[str]] = None
):
"""
Отправить электронное письмо.
Args:
message: Объект EmailMessage для отправки
to_addr: Адрес получателя
cc: Список адресов для копии (необязательно)
bcc: Список адресов для скрытой копии (необязательно)
"""
# Создаем multipart сообщение
msg = MIMEMultipart()
msg['From'] = self.email
msg['To'] = to_addr
msg['Subject'] = message.subj
if cc:
msg['Cc'] = ', '.join(cc)
# Добавляем тело письма
msg.attach(MIMEText(message.body, 'plain', 'utf-8'))
# Добавляем вложения
for attachment in message.attachments:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.content)
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename= {attachment.filename}'
)
msg.attach(part)
# Формируем список всех получателей
recipients = [to_addr]
if cc:
recipients.extend(cc)
if bcc:
recipients.extend(bcc)
# Отправляем письмо
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.starttls()
server.login(self.email, self.password)
server.sendmail(self.email, recipients, msg.as_string())
def close(self):
"""Закрыть IMAP соединение"""
if self.imap_conn:
try:
self.imap_conn.close()
self.imap_conn.logout()
except:
pass
self.imap_conn = None
def __enter__(self):
"""Поддержка контекстного менеджера"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Поддержка контекстного менеджера"""
self.close()

View File

@@ -0,0 +1,22 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List
@dataclass
class EmailAttachment:
"""Класс для представления вложения в письме"""
filename: str
content: bytes
@dataclass
class EmailMessage:
"""Класс для представления электронного письма"""
from_addr: str
subj: str
dt: datetime
body: str
attachments: List[EmailAttachment]
first_sender: str = ''

5
src/main.py Normal file
View File

@@ -0,0 +1,5 @@
from config_manager import Configmanager
if __name__=="__main__":
print("Hello, World!")

View File

@@ -0,0 +1,28 @@
import os
from dotenv import load_dotenv
import sys
sys.path.append('./src')
load_dotenv()
from mail_order_bot.email_client import EmailClient
if __name__ == "__main__":
email_client = EmailClient(
imap_host=os.getenv('IMAP_HOST'),
smtp_host=os.getenv('SMTP_HOST'),
email=os.getenv('EMAIL_USER'),
password=os.getenv('EMAIL_PASSWORD'),
imap_port=os.getenv('IMAP_PORT'),
smtp_port=os.getenv('SMTP_PORT')
)
emails = email_client.get_emails(folder='spareparts', only_unseen=True, mark_as_read=True)
for email in emails:
print(email.subj)
print(email.from_addr)
print(email.dt)
print(email.body)
print(email.first_sender)
print('--------------------------------')
email_client.close()