From 5245d4a94d7590056e070b1464dff11444960f58 Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Tue, 14 Oct 2025 19:54:27 +0300 Subject: [PATCH] History processor added --- HistoryProcessor.ipynb | 113 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 HistoryProcessor.ipynb diff --git a/HistoryProcessor.ipynb b/HistoryProcessor.ipynb new file mode 100644 index 0000000..06fc3e3 --- /dev/null +++ b/HistoryProcessor.ipynb @@ -0,0 +1,113 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "7ec67b12-879b-44bc-af1e-1b249e4b7c14", + "metadata": {}, + "outputs": [], + "source": [ + "import csv\n", + "import json\n", + "from datetime import datetime, timedelta\n", + "import time\n", + "\n", + "class HistoryProcessor:\n", + " def __init__(self, csv_path, handler, clickhouse_client, period_seconds):\n", + " self.csv_path = csv_path\n", + " self.handler = handler\n", + " self.db = clickhouse_client\n", + " self.period = period_seconds\n", + "\n", + " def floor_to_period(self, unixts):\n", + " # Найти начало интервала\n", + " start = int(unixts // self.period * self.period)\n", + " return start\n", + "\n", + " def run(self):\n", + " with open(self.csv_path, \"r\") as f:\n", + " reader = csv.reader(f)\n", + " \n", + " current_period = None\n", + " first_cycle = True\n", + " buffer_row = None\n", + "\n", + " for row in reader:\n", + " try:\n", + " message = json.loads(row[0])\n", + " msg_ts = int(message[\"timestamp\"]) # unixtimestamp\n", + " except Exception as e:\n", + " continue # обработка ошибок парсинга\n", + "\n", + " if current_period is None:\n", + " # Ищем первый интервал, синхронизированный на начало минуты\n", + " current_period = self.floor_to_period(msg_ts)\n", + " \n", + " # Разделяем на временные окна, выбрасывая все что до первой \"красивой\" границы\n", + " if first_cycle and (msg_ts < current_period):\n", + " continue # сообщение до первого валидного окна\n", + " first_cycle = False\n", + "\n", + " # Если сообщение попадает в текущее окно\n", + " if msg_ts < current_period + self.period:\n", + " self.handler.on_message(message)\n", + " else:\n", + " # Новый временной интервал\n", + " self.save_to_db(current_period)\n", + "\n", + " # Передвигаем current_period на период вперед до \"захвата\" текущего сообщения\n", + " while msg_ts >= current_period + self.period:\n", + " current_period += self.period\n", + "\n", + " self.handler.reset() # опционально, если нужно очистить состояние между окнами\n", + " self.handler.on_message(message)\n", + "\n", + " # По завершении файла — финальный дамп, если был хоть один обработанный период\n", + " if not first_cycle:\n", + " self.save_to_db(current_period)\n", + "\n", + " def save_to_db(self, period_start):\n", + " orderbook = self.handler.get_orderbook()\n", + " # Пример вставки\n", + " self.db.execute(\n", + " 'INSERT INTO orderbook_table (period_start, data) VALUES',\n", + " [(period_start, json.dumps(orderbook))]\n", + " )\n", + "\n", + "# Пример инициализации:\n", + "# handler = MyOrderbookHandler()\n", + "# clickhouse = ... # Ваш ClickHouse client, например clickhouse-driver\n", + "# proc = HistoryProcessor(\"history.csv\", handler, clickhouse, 15)\n", + "# proc.run()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + }, + "widgets": { + "application/vnd.jupyter.widget-state+json": { + "state": {}, + "version_major": 2, + "version_minor": 0 + } + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}