{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7ec67b12-879b-44bc-af1e-1b249e4b7c14",
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import json\n",
    "from datetime import datetime, timedelta\n",
    "import time\n",
    "\n",
    "\n",
    "class HistoryProcessor:\n",
    "    \"\"\"Replay a CSV of JSON messages through a handler in fixed time windows.\n",
    "\n",
    "    Each CSV row carries one JSON message in its first column, with a\n",
    "    'timestamp' field holding a Unix timestamp (same unit as the period).\n",
    "    Messages are passed to ``handler.on_message``; when a message falls past\n",
    "    the end of the current window, that window's orderbook is saved via\n",
    "    ``save_to_db`` and the handler is reset.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, csv_path, handler, clickhouse_client, period_seconds,\n",
    "                 skip_partial_first_window=False):\n",
    "        \"\"\"\n",
    "        Args:\n",
    "            csv_path: path of the history CSV file.\n",
    "            handler: object providing ``on_message(message)``, ``reset()`` and\n",
    "                ``get_orderbook()``.\n",
    "            clickhouse_client: object providing ``execute(query, params)``,\n",
    "                e.g. a clickhouse-driver client.\n",
    "            period_seconds: window length; must be positive. Window starts are\n",
    "                truncated to int, so an integer period is assumed.\n",
    "            skip_partial_first_window: if True, drop messages that precede the\n",
    "                first window boundary at or after the first message, so the\n",
    "                first saved window is complete. If False (default), the first\n",
    "                window starts at the boundary at or before the first message.\n",
    "\n",
    "        Raises:\n",
    "            ValueError: if ``period_seconds`` is not positive.\n",
    "        \"\"\"\n",
    "        if period_seconds <= 0:\n",
    "            # Zero would divide by zero in floor_to_period; a non-positive\n",
    "            # window length cannot partition time into windows.\n",
    "            raise ValueError(f'period_seconds must be positive, got {period_seconds!r}')\n",
    "        self.csv_path = csv_path\n",
    "        self.handler = handler\n",
    "        self.db = clickhouse_client\n",
    "        self.period = period_seconds\n",
    "        self.skip_partial_first_window = skip_partial_first_window\n",
    "        # Rows skipped by the last run() because they could not be parsed.\n",
    "        self.skipped_rows = 0\n",
    "\n",
    "    def floor_to_period(self, unixts):\n",
    "        \"\"\"Return the start of the window containing ``unixts``.\"\"\"\n",
    "        return int(unixts // self.period * self.period)\n",
    "\n",
    "    def _ceil_to_period(self, unixts):\n",
    "        \"\"\"Return the first window boundary at or after ``unixts``.\"\"\"\n",
    "        start = self.floor_to_period(unixts)\n",
    "        return start if start == unixts else start + self.period\n",
    "\n",
    "    def _iter_messages(self, rows):\n",
    "        \"\"\"Yield ``(message, timestamp)`` for each parseable CSV row.\n",
    "\n",
    "        Rows that are empty, are not valid JSON, or lack an int-convertible\n",
    "        'timestamp' are skipped and counted in ``self.skipped_rows``.\n",
    "        \"\"\"\n",
    "        for row in rows:\n",
    "            try:\n",
    "                message = json.loads(row[0])\n",
    "                msg_ts = int(message['timestamp'])\n",
    "            except Exception:\n",
    "                # Best-effort replay: skip bad rows, but count them so data\n",
    "                # problems are visible instead of silently lost.\n",
    "                self.skipped_rows += 1\n",
    "                continue\n",
    "            yield message, msg_ts\n",
    "\n",
    "    def run(self):\n",
    "        \"\"\"Replay the CSV and save one orderbook row per non-empty window.\n",
    "\n",
    "        Windows that receive no messages are not saved. A message older than\n",
    "        the current window (out-of-order input) is added to the current window.\n",
    "        Resets ``self.skipped_rows`` at the start of the run.\n",
    "        \"\"\"\n",
    "        self.skipped_rows = 0\n",
    "        window_start = None    # start of the window being accumulated\n",
    "        first_boundary = None  # only used with skip_partial_first_window\n",
    "\n",
    "        # newline='' is what the csv module requires so quoted fields with\n",
    "        # embedded newlines parse correctly; JSON text is UTF-8.\n",
    "        with open(self.csv_path, 'r', newline='', encoding='utf-8') as f:\n",
    "            for message, msg_ts in self._iter_messages(csv.reader(f)):\n",
    "                if window_start is None:\n",
    "                    if self.skip_partial_first_window:\n",
    "                        if first_boundary is None:\n",
    "                            first_boundary = self._ceil_to_period(msg_ts)\n",
    "                        if msg_ts < first_boundary:\n",
    "                            continue  # before the first complete window\n",
    "                    window_start = self.floor_to_period(msg_ts)\n",
    "                elif msg_ts >= window_start + self.period:\n",
    "                    # Past the current window: flush it and jump directly to the\n",
    "                    # window containing msg_ts (no per-period stepping across gaps).\n",
    "                    self.save_to_db(window_start)\n",
    "                    window_start = self.floor_to_period(msg_ts)\n",
    "                    self.handler.reset()\n",
    "                self.handler.on_message(message)\n",
    "\n",
    "        # Flush the last window if at least one message was processed.\n",
    "        if window_start is not None:\n",
    "            self.save_to_db(window_start)\n",
    "\n",
    "    def save_to_db(self, period_start):\n",
    "        \"\"\"Insert the handler's orderbook for the window starting at ``period_start``.\n",
    "\n",
    "        The orderbook is serialized with ``json.dumps`` and passed as row data\n",
    "        to ``self.db.execute`` (clickhouse-driver style INSERT ... VALUES).\n",
    "        \"\"\"\n",
    "        orderbook = self.handler.get_orderbook()\n",
    "        self.db.execute(\n",
    "            'INSERT INTO orderbook_table (period_start, data) VALUES',\n",
    "            [(period_start, json.dumps(orderbook))],\n",
    "        )\n",
    "\n",
    "\n",
    "# Example usage:\n",
    "# handler = MyOrderbookHandler()\n",
    "# clickhouse = ...  # your ClickHouse client, e.g. clickhouse-driver\n",
    "# proc = HistoryProcessor('history.csv', handler, clickhouse, 15)\n",
    "# proc.run()\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  },
  "widgets": {
   "application/vnd.jupyter.widget-state+json": {
    "state": {},
    "version_major": 2,
    "version_minor": 0
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}