Path: blob/master/bitget-python-sdk-api/bitget/ws/bitget_ws_client.py
732 views
#!/usr/bin/python
"""WebSocket client for the Bitget streaming API.

Provides connection management, optional login, channel subscription with
per-channel listeners, and checksum verification of "books" channel data.
"""
import json
import math
import threading
import time
import traceback
from threading import Timer
from zlib import crc32

import websocket

from bitget.consts import GET
from .. import consts as c, utils

WS_OP_LOGIN = 'login'
WS_OP_SUBSCRIBE = "subscribe"
WS_OP_UNSUBSCRIBE = "unsubscribe"


def handle(message):
    """Default message listener: print the raw message."""
    print("default:" + message)


def handel_error(message):
    """Default error listener: print the raw error message."""
    print("default_error:" + message)


class BitgetWsClient:
    """Fluent WebSocket client.

    Configure with the chained setters (``api_key``, ``listener``, ...) and
    then call ``build()`` to connect.
    """

    def __init__(self, url, need_login=False):
        """Store the endpoint and initialise connection state.

        Args:
            url: WebSocket endpoint; validated with ``utils.check_none``.
            need_login: when true, ``build()`` performs a login after the
                connection is open.
        """
        utils.check_none(url, "url")
        self.__need_login = need_login
        self.__connection = False
        self.__login_status = False
        self.__reconnect_status = False
        self.__api_key = None
        self.__api_secret_key = None
        self.__passphrase = None
        # Every channel currently subscribed; replayed after a reconnect.
        self.__all_suribe = set()
        self.__listener = handle
        self.__error_listener = handel_error
        self.__url = url
        # SubscribeReq -> listener registered for that channel.
        self.__scribe_map = {}
        # SubscribeReq -> locally maintained BooksInfo for "books" channels.
        self.__allbooks_map = {}
        self.__ws_client = None
        # Pending keep-alive timer, so a reconnect can replace it instead of
        # starting a second ping chain.
        self.__keep_alive_timer = None

    def build(self):
        """Connect, optionally log in, and start the keep-alive pings.

        Blocks (polling once per second) until the connection is reported
        open.

        Returns:
            This client, to allow call chaining.
        """
        self.__ws_client = self.__init_client()
        __thread = threading.Thread(target=self.connect)
        __thread.start()

        while not self.has_connect():
            print("start connecting... url: ", self.__url)
            time.sleep(1)

        if self.__need_login:
            self.__login()

        self.__keep_connected(25)

        return self

    def api_key(self, api_key):
        """Set the API key used for login; returns this client."""
        self.__api_key = api_key
        return self

    def api_secret_key(self, api_secret_key):
        """Set the secret key used to sign the login; returns this client."""
        self.__api_secret_key = api_secret_key
        return self

    def passphrase(self, passphrase):
        """Set the passphrase used for login; returns this client."""
        self.__passphrase = passphrase
        return self

    def listener(self, listener):
        """Set the fallback listener for messages; returns this client."""
        self.__listener = listener
        return self

    def error_listener(self, error_listener):
        """Set the listener for error messages; returns this client."""
        self.__error_listener = error_listener
        return self

    def has_connect(self):
        """Return True while the connection is considered open."""
        return self.__connection

    def __init_client(self):
        """Create the WebSocketApp wired to this client's callbacks.

        Returns None (after printing the error) if construction fails.
        """
        try:
            return websocket.WebSocketApp(self.__url,
                                          on_open=self.__on_open,
                                          on_message=self.__on_message,
                                          on_error=self.__on_error,
                                          on_close=self.__on_close)
        except Exception as ex:
            print(ex)

    def __login(self):
        """Send a signed login request and wait until login is confirmed."""
        utils.check_none(self.__api_key, "api key")
        utils.check_none(self.__api_secret_key, "api secret key")
        utils.check_none(self.__passphrase, "passphrase")
        timestamp = int(round(time.time()))
        pre_hash = utils.pre_hash(timestamp, GET, c.REQUEST_PATH)
        # Sign once with the configured scheme instead of computing the
        # default signature and then discarding it for RSA.
        if c.SIGN_TYPE == c.RSA:
            sign = utils.signByRSA(pre_hash, self.__api_secret_key)
        else:
            sign = utils.sign(pre_hash, self.__api_secret_key)
        ws_login_req = WsLoginReq(self.__api_key, self.__passphrase, str(timestamp), sign)
        self.send_message(WS_OP_LOGIN, [ws_login_req])
        print("logging in......")
        # __login_status is flipped by __on_message when the login event
        # arrives.
        while not self.__login_status:
            time.sleep(1)

    def connect(self):
        """Run the WebSocket event loop; intended as a thread target."""
        try:
            self.__ws_client.run_forever(ping_timeout=10)
        except Exception as ex:
            print(ex)

    def __keep_connected(self, interval):
        """Send "ping" now and schedule the next ping in ``interval`` seconds.

        Any previously scheduled timer is cancelled first: ``build()`` runs
        again on every reconnect, and without the cancel each reconnect would
        add another endless ping chain.
        """
        try:
            previous = self.__keep_alive_timer
            if previous is not None:
                # No-op when called from the timer that has just fired.
                previous.cancel()
            timer = Timer(interval, self.__keep_connected, (interval,))
            self.__keep_alive_timer = timer
            timer.start()
            self.__ws_client.send("ping")
        except Exception as ex:
            print(ex)

    def send_message(self, op, args):
        """Serialise ``BaseWsReq(op, args)`` to JSON and send it.

        Request objects are serialised through their ``__dict__``.
        """
        message = json.dumps(BaseWsReq(op, args), default=lambda o: o.__dict__)
        print("send message:" + message)
        self.__ws_client.send(message)

    def subscribe(self, channels, listener=None):
        """Subscribe to ``channels`` and optionally bind a listener to them.

        Args:
            channels: list of SubscribeReq objects.
            listener: optional callable receiving the raw message for these
                channels. When given, each channel's ``inst_type`` is coerced
                to ``str`` before it is used as the lookup key.
        """
        if listener:
            for channel in channels:
                channel.inst_type = str(channel.inst_type)
                self.__scribe_map[channel] = listener

        for channel in channels:
            self.__all_suribe.add(channel)

        self.send_message(WS_OP_SUBSCRIBE, channels)

    def unsubscribe(self, channels):
        """Drop local state for ``channels`` and send the unsubscribe request.

        Best effort: failures are printed, never raised.
        """
        try:
            for channel in channels:
                # pop/discard tolerate channels that were never registered,
                # so the request below is always attempted.
                self.__scribe_map.pop(channel, None)
                self.__all_suribe.discard(channel)

            self.send_message(WS_OP_UNSUBSCRIBE, channels)
        except Exception as ex:
            print(ex)

    def __on_open(self, ws):
        """Mark the connection as open and clear the reconnect flag."""
        print('connection is success....')
        self.__connection = True
        self.__reconnect_status = False

    def __on_message(self, ws, message):
        """Route an incoming message to the appropriate listener."""
        if message == 'pong':
            print("Keep connected:" + message)
            return

        json_obj = json.loads(message)
        if "code" in json_obj and json_obj.get("code") != 0:
            if self.__error_listener:
                self.__error_listener(message)
            return

        if "event" in json_obj and json_obj.get("event") == "login":
            print("login msg:" + message)
            self.__login_status = True
            return

        listener = None
        if "data" in json_obj:
            # Drop the message when the order-book checksum does not verify.
            if not self.__check_sum(json_obj):
                return
            listener = self.get_listener(json_obj)

        if listener:
            listener(message)
            return

        self.__listener(message)

    def __dict_books_info(self, data):
        """Build a BooksInfo from a dict with asks, bids and checksum."""
        return BooksInfo(data['asks'], data['bids'], data['checksum'])

    def __dict_to_subscribe_req(self, data):
        """Build a SubscribeReq from an ``arg`` dict.

        Uses ``instId`` when present, otherwise ``coin``.
        """
        if "instId" in data:
            inst_id = data['instId']
        else:
            inst_id = data['coin']
        return SubscribeReq(data['instType'], data['channel'], inst_id)

    def get_listener(self, json_obj):
        """Return the listener registered for the message's channel, if any."""
        try:
            arg = json_obj.get('arg')
            if arg:
                # Convert the parsed dict directly; re-serialising it through
                # str()/replace() breaks on apostrophes and non-string values.
                return self.__scribe_map.get(self.__dict_to_subscribe_req(arg))
        except Exception as e:
            print(json_obj.get('arg'), e)
        return None

    def __handle_disconnect(self):
        """Reset state, close the socket and reconnect once."""
        self.__close()
        if not self.__reconnect_status:
            self.__re_connect()

    def __on_error(self, ws, msg):
        """Print the error, then close and reconnect."""
        print("error:", msg)
        self.__handle_disconnect()

    def __on_close(self, ws, close_status_code, close_msg):
        """Print the close details, then clean up and reconnect."""
        print("ws is closeing ......close_status:{},close_msg:{}".format(close_status_code, close_msg))
        self.__handle_disconnect()

    def __re_connect(self):
        """Rebuild the connection and replay every known subscription."""
        # Reconnect
        self.__reconnect_status = True
        print("start reconnection ...")
        self.build()
        # Iterate over a copy: subscribe() adds to the same set.
        for channel in list(self.__all_suribe):
            self.subscribe([channel])

    def __close(self):
        """Reset login/connection flags and close the socket."""
        self.__login_status = False
        self.__connection = False
        self.__ws_client.close()

    def __check_sum(self, json_obj):
        """Maintain the local order book and verify its checksum.

        Only messages carrying both ``arg`` and ``action`` on the "books"
        channel are checked; everything else passes.

        Returns:
            False when an update arrives without a cached snapshot or when
            the checksum does not match (the channel is then resubscribed);
            True otherwise, including when an unexpected error is printed.
        """
        # noinspection PyBroadException
        try:
            if "arg" not in json_obj or "action" not in json_obj:
                return True

            subscribe_req = self.__dict_to_subscribe_req(json_obj['arg'])
            if subscribe_req.channel != "books":
                return True

            action = str(json_obj.get('action'))
            books_info = self.__dict_books_info(json_obj['data'][0])

            if action == "snapshot":
                self.__allbooks_map[subscribe_req] = books_info
                return True
            if action == "update":
                # .get(): an update without a prior snapshot cannot be
                # verified and must be rejected, not raise KeyError.
                all_books = self.__allbooks_map.get(subscribe_req)
                if all_books is None:
                    return False

                all_books = all_books.merge(books_info)
                if not all_books.check_sum(books_info.checksum):
                    # merge() mutated the cached book in place, so discard it
                    # and wait for the fresh snapshot.
                    self.__allbooks_map.pop(subscribe_req, None)
                    # Keep the channel's listener across the resubscribe;
                    # unsubscribe() removes it from the map.
                    listener = self.__scribe_map.get(subscribe_req)
                    self.unsubscribe([subscribe_req])
                    self.subscribe([subscribe_req], listener)
                    return False
                self.__allbooks_map[subscribe_req] = all_books
        except Exception:
            print(traceback.format_exc())

        return True


class BooksInfo:
    """Order-book state: ask and bid levels plus the last checksum.

    Each level is a sequence whose first two items are price and size, both
    strings (they are concatenated as text when the checksum is computed).
    """

    def __init__(self, asks, bids, checksum):
        self.asks = asks
        self.bids = bids
        self.checksum = checksum

    def merge(self, book_info):
        """Apply the levels of ``book_info`` to this book in place.

        Returns:
            This instance, with asks ascending and bids descending by price.
        """
        self.asks = self.innerMerge(self.asks, book_info.asks, False)
        self.bids = self.innerMerge(self.bids, book_info.bids, True)
        return self

    def innerMerge(self, all_list, update_list, is_reverse):
        """Merge ``update_list`` into ``all_list`` keyed by price.

        A level whose size is "0" removes that price; any other level
        inserts or replaces it. The result is sorted by numeric price,
        descending when ``is_reverse`` is true.
        """
        price_and_value = {level[0]: level for level in all_list}

        for level in update_list:
            if level[1] == "0":
                # The price may already be absent; that is not an error.
                price_and_value.pop(level[0], None)
                continue
            price_and_value[level[0]] = level

        # Prices are strings: sort numerically, not lexicographically
        # ("10000" must not sort before "9999").
        prices = sorted(price_and_value, key=float, reverse=is_reverse)
        return [price_and_value[price] for price in prices]

    def check_sum(self, new_check_sum):
        """Compare the book's CRC32 checksum with ``new_check_sum``.

        The checksum string interleaves "price:size" of bid and ask for up
        to the first 25 levels, joined by ":"; sides with fewer levels are
        simply skipped for the missing depths.

        Returns:
            True when the signed 32-bit CRC32 equals ``new_check_sum``.
        """
        parts = []
        for depth in range(25):
            if depth < len(self.bids) and self.bids[depth] is not None:
                parts.append(self.bids[depth][0])
                parts.append(self.bids[depth][1])

            if depth < len(self.asks) and self.asks[depth] is not None:
                parts.append(self.asks[depth][0])
                parts.append(self.asks[depth][1])

        crc32str = ":".join(parts)
        print(crc32str)
        merge_num = crc32(bytes(crc32str, encoding="utf8"))
        print("start checknum mergeVal:" + str(merge_num) + ",checkVal:" + str(new_check_sum)+",checkSin:"+str(self.__signed_int(merge_num)))
        return self.__signed_int(merge_num) == new_check_sum

    def __signed_int(self, checknum):
        """Reinterpret an unsigned 32-bit value as a signed 32-bit integer."""
        int_max = (1 << 31) - 1
        if checknum > int_max:
            return checknum - (1 << 32)
        return checknum


class SubscribeReq:
    """Identifies one channel subscription; hashable for use as a dict key."""

    def __init__(self, inst_type, channel, instId):
        self.inst_type = inst_type
        self.channel = channel
        self.inst_id = instId
        self.coin = instId

    def __eq__(self, other) -> bool:
        if not isinstance(other, SubscribeReq):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __hash__(self) -> int:
        # Tuple hash: works for non-string fields and does not conflate
        # ("ab", "c") with ("a", "bc") the way concatenation does.
        return hash((self.inst_type, self.channel, self.inst_id))


class BaseWsReq:
    """Envelope for an outgoing request: operation name plus arguments."""

    def __init__(self, op, args):
        self.op = op
        self.args = args


class WsLoginReq:
    """Argument payload of a login request."""

    def __init__(self, api_key, passphrase, timestamp, sign):
        self.api_key = api_key
        self.passphrase = passphrase
        self.timestamp = timestamp
        self.sign = sign