Compare commits

..

3 Commits

Author SHA1 Message Date
6ea9a8e16e gitignore added 2025-10-14 19:55:02 +03:00
5245d4a94d History processor added 2025-10-14 19:54:27 +03:00
ab0de403b6 Правки в orderbook 2025-10-14 19:54:09 +03:00
3 changed files with 4182 additions and 4062 deletions

7
.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
pybit.log
.ipynb_checkpoints/HistoryProcessor-checkpoint.ipynb
.ipynb_checkpoints/orderbook-checkpoint.ipynb
data/2025-09-01_BTCUSDT_ob200.data
data/BTCUSDT-2025-09.csv
data/testob.csv
data/testob2.csv

113
HistoryProcessor.ipynb Normal file
View File

@@ -0,0 +1,113 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "7ec67b12-879b-44bc-af1e-1b249e4b7c14",
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"import json\n",
"from datetime import datetime, timedelta\n",
"import time\n",
"\n",
"class HistoryProcessor:\n",
" def __init__(self, csv_path, handler, clickhouse_client, period_seconds):\n",
" self.csv_path = csv_path\n",
" self.handler = handler\n",
" self.db = clickhouse_client\n",
" self.period = period_seconds\n",
"\n",
" def floor_to_period(self, unixts):\n",
" # Найти начало интервала\n",
" start = int(unixts // self.period * self.period)\n",
" return start\n",
"\n",
" def run(self):\n",
" with open(self.csv_path, \"r\") as f:\n",
" reader = csv.reader(f)\n",
" \n",
" current_period = None\n",
" first_cycle = True\n",
" buffer_row = None\n",
"\n",
" for row in reader:\n",
" try:\n",
" message = json.loads(row[0])\n",
" msg_ts = int(message[\"timestamp\"]) # unixtimestamp\n",
" except Exception as e:\n",
" continue # обработка ошибок парсинга\n",
"\n",
" if current_period is None:\n",
" # Ищем первый интервал, синхронизированный на начало минуты\n",
" current_period = self.floor_to_period(msg_ts)\n",
" \n",
" # Разделяем на временные окна, выбрасывая все что до первой \"красивой\" границы\n",
" if first_cycle and (msg_ts < current_period):\n",
" continue # сообщение до первого валидного окна\n",
" first_cycle = False\n",
"\n",
" # Если сообщение попадает в текущее окно\n",
" if msg_ts < current_period + self.period:\n",
" self.handler.on_message(message)\n",
" else:\n",
" # Новый временной интервал\n",
" self.save_to_db(current_period)\n",
"\n",
" # Передвигаем current_period на период вперед до \"захвата\" текущего сообщения\n",
" while msg_ts >= current_period + self.period:\n",
" current_period += self.period\n",
"\n",
" self.handler.reset() # опционально, если нужно очистить состояние между окнами\n",
" self.handler.on_message(message)\n",
"\n",
" # По завершении файла — финальный дамп, если был хоть один обработанный период\n",
" if not first_cycle:\n",
" self.save_to_db(current_period)\n",
"\n",
" def save_to_db(self, period_start):\n",
" orderbook = self.handler.get_orderbook()\n",
" # Пример вставки\n",
" self.db.execute(\n",
" 'INSERT INTO orderbook_table (period_start, data) VALUES',\n",
" [(period_start, json.dumps(orderbook))]\n",
" )\n",
"\n",
"# Пример инициализации:\n",
"# handler = MyOrderbookHandler()\n",
"# clickhouse = ... # Ваш ClickHouse client, например clickhouse-driver\n",
"# proc = HistoryProcessor(\"history.csv\", handler, clickhouse, 15)\n",
"# proc.run()\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.6"
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}

File diff suppressed because one or more lines are too long