# Path: blob/master/ invest-robot-contest_Tinkoff-stream-grid-bot-main/tinkoff-stream-grid-bot.py
# 5925 views
import sqlite3
import time
import sys
import json
import logging
from tinkoff.invest import Client, RequestError
from tinkoff.invest import OrderType, OrderDirection, Quotation, OrderExecutionReportStatus

# for token import
import os

# Token, account id and per-ticker grid settings are read from this file.
config_file = "config.json"

if os.path.isfile(config_file):
    with open(file=config_file) as config_handle:
        config = json.load(config_handle)
    TOKEN = config.get('token')
    ACCID = config.get('account_id')
    shares = config.get('shares')
else:
    print("No " + config_file + " exists.")
    # A missing config is a fatal error, so report failure to the caller
    # instead of exiting with a success status.
    sys.exit(1)

sqlite = sqlite3.connect('sqlite_brand_new_stream2.db')
cursor = sqlite.cursor()

# Append-only statistics file written by log().
logger = open('bot_stat.log', 'a')

# Pause before re-opening the trades stream after a RequestError, so that a
# persistent failure does not turn the reconnect loop into a busy loop.
_RECONNECT_DELAY_SECONDS = 1


def log(*args):
    """Print the space-joined arguments and append them to ``bot_stat.log``."""
    message = ' '.join(map(str, args))
    print(message)
    logger.write(message + "\n")
    logger.flush()


def makefloat(m) -> float:
    """Convert an object with ``units`` and ``nano`` attributes to a float.

    The value is ``units + nano / 10**9`` rounded to nine decimal places.
    """
    return float(format(m.units + m.nano / 10 ** 9, '.9f'))


def makequotation(m) -> Quotation:
    """Convert a number to a ``Quotation`` with nine decimal places of nano.

    The sign is applied to both ``units`` and ``nano`` so that negative values
    with a zero integer part (for example ``-0.5``) keep their sign.
    """
    units_text, nano_text = format(abs(m), '.9f').split('.')
    sign = -1 if m < 0 else 1
    return Quotation(units=sign * int(units_text), nano=sign * int(nano_text))


def _create_tables():
    """Create the ``share``, ``price`` and ``order`` tables if they are missing."""
    statements = (
        '''CREATE TABLE IF NOT EXISTS `share` (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ticker VARCHAR(50) NOT NULL UNIQUE,
            figi VARCHAR(50)
        );''',
        '''CREATE TABLE IF NOT EXISTS `price` (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            share_id INTEGER NOT NULL,
            ticker VARCHAR(50) NOT NULL,
            price DECIMAL(20,10) NOT NULL
        );''',
        '''CREATE TABLE IF NOT EXISTS `order` (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            share_id INTEGER NOT NULL,
            ticker VARCHAR(50) NOT NULL,
            order_id VARCHAR(64) UNIQUE,
            price DECIMAL(20,10) NOT NULL,
            direction INTEGER
        );
        ''',
    )
    for sql in statements:
        cursor.execute(sql)
        sqlite.commit()


def main() -> int:
    """Run the grid bot: prepare the database, place orders, follow the stream.

    Steps:
      1. Create the tables.
      2. Resolve every configured ticker to its FIGI, minimum price step and
         database id.
      3. For each share without a stored price grid, cancel all open orders on
         the account and store ``number + 1`` grid prices around
         ``start_price`` (the last price is used when ``start_price`` is 0).
      4. Place orders for every share, then process the trades stream forever,
         reconnecting after a ``RequestError``.

    Returns:
        0. In practice the stream loop never exits normally.
    """
    _create_tables()

    with Client(token=TOKEN, app_name="nazmiev.Tinkoff-stream-grid-bot") as client:
        for instrument in client.instruments.shares().instruments:
            if instrument.ticker in shares:
                share = shares[instrument.ticker]
                share['ticker'] = instrument.ticker
                share['figi'] = instrument.figi
                share['min_price_step'] = makefloat(instrument.min_price_increment)

                cursor.execute("INSERT OR IGNORE INTO `share` (ticker, figi) VALUES (?, ?)", (instrument.ticker, instrument.figi))
                sqlite.commit()

                # cursor.lastrowid is not reliable here: when the INSERT is
                # ignored it still holds the rowid of an earlier insert, which
                # would assign another share's id. Always read the id back.
                share['id'] = cursor.execute("SELECT id FROM `share` WHERE ticker = ?", (instrument.ticker, )).fetchone()[0]

        # Configured tickers that the API did not return have no figi/id and
        # would fail with KeyError below; report and drop them.
        unresolved = [ticker for ticker, share in shares.items() if 'id' not in share]
        for ticker in unresolved:
            print(time.asctime(), "ticker not found among instruments, skipping:", ticker)
            del shares[ticker]

        for ticker, share in shares.items():
            last_price = client.market_data.get_last_prices(figi=[share['figi']]).last_prices[0].price
            last_price = makefloat(last_price)
            print(ticker, share['figi'], "last price", last_price, "min step", share['min_price_step'])

            if share['start_price'] == 0:
                share['start_price'] = last_price

            # Load the stored grid; build it only when none exists yet.
            prices = get_prices(share)
            if prices:
                continue

            # Cancel all open orders on the account.
            client.cancel_all_orders(account_id=ACCID)

            # Fill in the grid prices, each rounded to the minimum price step.
            for i in range(share['number'] + 1):
                price = share['start_price'] + share['start_price'] * share['price_step'] * (i - share['number'] / 2)
                price = round(price / share['min_price_step']) * share['min_price_step']
                price = float(format(price, '.9f'))

                cursor.execute("INSERT INTO `price` (share_id, ticker, price) VALUES (?, ?, ?)", (share['id'], ticker, price))
            # Commit the grid as a whole so a partial grid is never stored.
            sqlite.commit()

        # Place orders at the grid prices.
        for share in shares.values():
            create_orders(client, share)

        # Remember when orders were last placed or handled.
        orders_time = time.time()

        while True:
            try:
                for response in client.orders_stream.trades_stream([ACCID]):
                    print(time.asctime(), response)

                    if not response.order_trades:
                        print(time.asctime(), "not response.order_trades")
                        # No trade in this message: once a minute has passed
                        # since the last activity, try to re-place orders.
                        if time.time() - orders_time >= 1 * 60:
                            for share in shares.values():
                                create_orders(client, share)
                            orders_time = time.time()
                        continue

                    order_id = response.order_trades.order_id
                    handle_order(client, order_id)
                    orders_time = time.time()
            except RequestError as err:
                # Unknown error: log it, wait briefly and reconnect.
                print(time.asctime(), "неведомая ошибка", err)
                time.sleep(_RECONNECT_DELAY_SECONDS)

    return 0


def create_orders(client, share):
    """Place limit orders on every grid price of ``share`` that has none.

    One grid level (the "skip" price) is left without an order. Prices that
    already have an order in the ``order`` table are re-checked through
    ``handle_order`` instead of being placed again.

    Args:
        client: Open API client used for market data and order placement.
        share: Share settings dict; reads ``id``, ``figi``, ``quantity`` and
            ``ticker``.

    Returns:
        None. Stops early when placing an order fails with error details
        ``'30079'``.
    """
    share_id = share['id']
    figi = share['figi']
    quantity = share['quantity']

    prices = get_prices(share)

    last_price = client.market_data.get_last_prices(figi=[figi]).last_prices[0].price
    last_price = makefloat(last_price)

    # NOTE(review): this compares against (price - skip_price), not
    # (skip_price - last_price), so the result is not always the grid level
    # nearest to last_price. Kept as-is; confirm the intended rule.
    skip_price = 0
    for price in prices:
        if abs(price - last_price) < abs(price - skip_price):
            skip_price = price

    # Try to find the grid price just below the lowest stored sell order.
    new_skip_price = cursor.execute(
        "SELECT MAX(price) FROM price P WHERE share_id = ? AND price < (SELECT MIN(price) FROM `order` WHERE direction = ? AND share_id = P.share_id)",
        (share_id, OrderDirection.ORDER_DIRECTION_SELL)).fetchone()[0]
    print(time.asctime(), 'new_skip_price sell', new_skip_price)

    if not new_skip_price:
        # Otherwise try the grid price just above the highest stored buy order.
        new_skip_price = cursor.execute(
            "SELECT MIN(price) FROM price P WHERE share_id = ? AND price > (SELECT MAX(price) FROM `order` WHERE direction = ? AND share_id = P.share_id)",
            (share_id, OrderDirection.ORDER_DIRECTION_BUY)).fetchone()[0]
        print(time.asctime(), 'new_skip_price buy', new_skip_price)

    if new_skip_price:
        print(time.asctime(), 'new_skip_price', new_skip_price)
        skip_price = new_skip_price

    print(time.asctime(), 'last', last_price, 'skip', skip_price)

    # Idea (not implemented): place orders outward from the current price so
    # the nearest ones, which are most likely to fill, go first:
    # sorted(prices, key=lambda x: abs(x - skip_price))

    for price in prices:
        # Look up an existing order for this grid price.
        order = cursor.execute("SELECT order_id FROM `order` WHERE `share_id` = ? AND `price` = ?", (share_id, price)).fetchone()
        if order:
            handle_order(client, order[0])
            continue

        if price == skip_price:
            continue

        # Create the order: sell above the last price, buy at or below it.
        direction = OrderDirection.ORDER_DIRECTION_SELL if price > last_price else OrderDirection.ORDER_DIRECTION_BUY
        price_quotation = makequotation(price)
        try:
            order = client.orders.post_order(account_id=ACCID, figi=figi, quantity=quantity,
                                             price=price_quotation, direction=direction,
                                             order_type=OrderType.ORDER_TYPE_LIMIT,
                                             order_id=str(time.time_ns()))

            cursor.execute("INSERT INTO `order` (share_id, ticker, order_id, price, direction) VALUES (?, ?, ?, ?, ?)",
                           [share_id, share['ticker'], order.order_id, price, direction])
            sqlite.commit()
            print(time.asctime(), "Выставил ордер: ", share['ticker'], price, direction)
        except RequestError as err:
            # Try to detect "no trading right now", e.g. 30079.
            # All error codes: https://tinkoff.github.io/investAPI/errors/
            print(time.asctime(), "Ошибка при выставлении ордера: ", share['ticker'], price, direction, err.metadata.message)
            if err.details == '30079':
                return


def handle_order(client, order_id):
    """Check a stored order and react to its current execution status.

    * Unknown order id, unknown share or empty order state: log and return.
    * Unspecified, rejected or cancelled: delete the row from ``order``.
    * Filled: place the opposite order on the neighbouring grid level (one
      level down after a sell, one level up after a buy) and update the row
      with the new order id, price and direction.
    * Any other status: nothing is changed.

    Args:
        client: Open API client used to query and place orders.
        order_id: Exchange order id as stored in the ``order`` table.
    """
    order = cursor.execute("SELECT id, share_id, price FROM `order` WHERE `order_id` = ?", (order_id, )).fetchone()
    if not order:
        print(time.asctime(), "not order: ", order)
        return
    row_id, share_id, price = order[0], order[1], order[2]

    share = None
    for tmp in shares.values():
        if tmp['id'] == share_id:
            share = tmp
            break
    if not share:
        print("share not found")
        return
    figi = share['figi']
    quantity = share['quantity']

    order_state = client.orders.get_order_state(account_id=ACCID, order_id=order_id)
    if not order_state:
        print(time.asctime(), "not order_state: ", order)
        return
    print(time.asctime(), order_state.execution_report_status)

    bad_statuses = [
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_UNSPECIFIED,
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_REJECTED,
        OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_CANCELLED
    ]

    if order_state.execution_report_status in bad_statuses:
        print(time.asctime(), " ORDER ", order_state.execution_report_status, order_id, price)
        cursor.execute("DELETE FROM `order` WHERE id = ?", (row_id,))
        sqlite.commit()
        return

    if order_state.execution_report_status != OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_FILL:
        return

    print(time.asctime(), " ORDER FILLED ", order_id, price)
    log(time.asctime(), " ORDER FILLED ", order_id, price, order_state.direction)

    prices = get_prices(share)

    if order_state.direction == OrderDirection.ORDER_DIRECTION_SELL:
        direction = OrderDirection.ORDER_DIRECTION_BUY
        index_shift = -1
    else:
        direction = OrderDirection.ORDER_DIRECTION_SELL
        index_shift = 1

    try:
        price_index = prices.index(price) + index_shift
    except ValueError:
        # The stored price is no longer part of the grid.
        price_index = -1

    if not 0 <= price_index < len(prices):
        # There is no neighbouring grid level. A negative index would wrap
        # around to the top of the grid and an index past the end would raise,
        # so drop the filled order's row instead of placing a counter-order.
        print(time.asctime(), " NO GRID LEVEL FOR COUNTER ORDER ", order_id, price)
        cursor.execute("DELETE FROM `order` WHERE id = ?", (row_id,))
        sqlite.commit()
        return

    price = prices[price_index]
    price_quotation = makequotation(price)

    try:
        order = client.orders.post_order(account_id=ACCID, figi=figi, quantity=quantity,
                                         price=price_quotation, direction=direction,
                                         order_type=OrderType.ORDER_TYPE_LIMIT,
                                         order_id=str(time.time_ns()))

        cursor.execute("UPDATE `order` SET order_id = ?, price = ?, direction = ? WHERE id = ?",
                       [order.order_id, price, direction, row_id])
        sqlite.commit()
        print(time.asctime(), " NEW ORDER ", order.order_id, direction, price)
    except RequestError as err:
        print(time.asctime(), " ERROR ", err.metadata.message)


def get_prices(share):
    """Return the stored grid prices of ``share`` in ascending order.

    Args:
        share: Share settings dict; only ``share['id']`` is read.

    Returns:
        A list of prices from the ``price`` table, empty when no grid is stored.
    """
    cursor.execute("SELECT price FROM `price` WHERE share_id = ? ORDER BY price", (share['id'], ))
    return [row[0] for row in cursor.fetchall()]


if __name__ == "__main__":
    sys.exit(main())