Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
BitgetLimited
GitHub Repository: BitgetLimited/V3-bitget-api-sdk
Path: blob/master/bitget-python-sdk-api/bitget/ws/bitget_ws_client.py
732 views
1
#!/usr/bin/python
2
import json
3
import math
4
import threading
5
import time
6
import traceback
7
from threading import Timer
8
from zlib import crc32
9
10
import websocket
11
12
from bitget.consts import GET
13
from .. import consts as c, utils
14
15
# Values placed in the "op" field of outgoing websocket requests.
WS_OP_LOGIN = "login"
WS_OP_SUBSCRIBE = "subscribe"
WS_OP_UNSUBSCRIBE = "unsubscribe"
18
19
20
def handle(message):
    """Default message listener: print the message prefixed with ``default:``."""
    line = "default:" + message
    print(line)
22
23
24
def handel_error(message):
    """Default error listener: print the message prefixed with ``default_error:``."""
    line = "default_error:" + message
    print(line)
26
27
28
class BitgetWsClient:
    """WebSocket client with login, keep-alive pings and resubscription.

    The client is configured through fluent setters (``api_key``,
    ``api_secret_key``, ``passphrase``, ``listener``, ``error_listener``) and
    started with ``build()``.  Incoming text frames are handed, as raw
    strings, to the listener registered for their channel in ``subscribe`` or,
    when there is none, to the default listener.
    """

    def __init__(self, url, need_login=False):
        """Store the configuration; no connection is opened until ``build()``.

        Args:
            url: websocket endpoint; validated with ``utils.check_none``.
            need_login: when true, ``build()`` logs in after connecting.
        """
        utils.check_none(url, "url")
        self.__need_login = need_login
        self.__connection = False
        self.__login_status = False
        self.__reconnect_status = False
        self.__api_key = None
        self.__api_secret_key = None
        self.__passphrase = None
        # Every channel ever subscribed; replayed after a reconnect.
        self.__all_suribe = set()
        self.__listener = handle
        self.__error_listener = handel_error
        self.__url = url
        # channel request -> listener registered for that channel
        self.__scribe_map = {}
        # channel request -> locally maintained order book
        self.__allbooks_map = {}
        # Pending keep-alive timer, so that only one ping chain is kept alive.
        self.__keepalive_timer = None

    def build(self):
        """Connect on a background thread and block until the socket is open.

        Polls once per second until the open callback has fired, logs in when
        the client was created with ``need_login=True`` and then starts the
        25 second ping cycle.

        Returns:
            This client, to allow call chaining.
        """
        self.__ws_client = self.__init_client()
        worker = threading.Thread(target=self.connect)
        worker.start()

        while not self.has_connect():
            print("start connecting... url: ", self.__url)
            time.sleep(1)

        if self.__need_login:
            self.__login()

        self.__keep_connected(25)

        return self

    def api_key(self, api_key):
        """Set the API key used for login and return this client."""
        self.__api_key = api_key
        return self

    def api_secret_key(self, api_secret_key):
        """Set the secret used to sign the login request and return this client."""
        self.__api_secret_key = api_secret_key
        return self

    def passphrase(self, passphrase):
        """Set the passphrase used for login and return this client."""
        self.__passphrase = passphrase
        return self

    def listener(self, listener):
        """Set the default message listener and return this client."""
        self.__listener = listener
        return self

    def error_listener(self, error_listener):
        """Set the listener for messages with a non-zero code and return this client."""
        self.__error_listener = error_listener
        return self

    def has_connect(self):
        """Return True while the connection is considered open."""
        return self.__connection

    def __init_client(self):
        """Create the ``WebSocketApp``; prints the error and returns None on failure."""
        try:
            return websocket.WebSocketApp(self.__url,
                                          on_open=self.__on_open,
                                          on_message=self.__on_message,
                                          on_error=self.__on_error,
                                          on_close=self.__on_close)
        except Exception as ex:
            print(ex)
            return None

    def __login(self):
        """Send a signed login request and block until the login event arrives."""
        utils.check_none(self.__api_key, "api key")
        utils.check_none(self.__api_secret_key, "api secret key")
        utils.check_none(self.__passphrase, "passphrase")
        timestamp = int(round(time.time()))
        pre_hash = utils.pre_hash(timestamp, GET, c.REQUEST_PATH)
        # Compute only the signature that will be sent.
        if c.SIGN_TYPE == c.RSA:
            sign = utils.signByRSA(pre_hash, self.__api_secret_key)
        else:
            sign = utils.sign(pre_hash, self.__api_secret_key)
        ws_login_req = WsLoginReq(self.__api_key, self.__passphrase, str(timestamp), sign)
        self.send_message(WS_OP_LOGIN, [ws_login_req])
        print("logging in......")
        while not self.__login_status:
            time.sleep(1)

    def connect(self):
        """Run the websocket loop; intended as the target of the worker thread."""
        try:
            self.__ws_client.run_forever(ping_timeout=10)
        except Exception as ex:
            print(ex)

    def __keep_connected(self, interval):
        """Send ``ping`` now and schedule the next ping ``interval`` seconds later."""
        try:
            # Cancel the previously scheduled ping so that calling build()
            # again on reconnect does not stack up additional timer chains.
            if self.__keepalive_timer is not None:
                self.__keepalive_timer.cancel()
            self.__keepalive_timer = Timer(interval, self.__keep_connected, (interval,))
            self.__keepalive_timer.start()
            self.__ws_client.send("ping")
        except Exception as ex:
            print(ex)

    def send_message(self, op, args):
        """Serialise ``{op, args}`` to JSON (objects via ``__dict__``) and send it."""
        message = json.dumps(BaseWsReq(op, args), default=lambda o: o.__dict__)
        print("send message:" + message)
        self.__ws_client.send(message)

    def subscribe(self, channels, listener=None):
        """Subscribe to ``channels`` and remember them for reconnects.

        Args:
            channels: list of channel request objects.
            listener: optional callback for messages of these channels; when
                given, each channel's ``inst_type`` is converted to ``str``
                before the channel is used as a lookup key.
        """
        if listener:
            for channel in channels:
                channel.inst_type = str(channel.inst_type)
                self.__scribe_map[channel] = listener

        self.__all_suribe.update(channels)

        self.send_message(WS_OP_SUBSCRIBE, channels)

    def unsubscribe(self, channels):
        """Forget ``channels`` locally and send the unsubscribe request.

        Best effort: failures are printed, not raised.
        """
        try:
            for channel in channels:
                # pop/discard tolerate channels that were never registered.
                self.__scribe_map.pop(channel, None)
                self.__all_suribe.discard(channel)

            self.send_message(WS_OP_UNSUBSCRIBE, channels)
        except Exception as ex:
            print(ex)

    def __on_open(self, ws):
        """Mark the connection as open and clear the reconnect flag."""
        print('connection is success....')
        self.__connection = True
        self.__reconnect_status = False

    def __on_message(self, ws, message):
        """Dispatch one incoming text frame.

        ``pong`` frames are only logged, messages with a non-zero ``code`` go
        to the error listener, the login event flips the login flag, and
        everything else goes to the channel listener or the default listener.
        Data messages that fail the order-book check are dropped.
        """
        if message == 'pong':
            print("Keep connected:" + message)
            return
        json_obj = json.loads(message)
        if "code" in json_obj and json_obj.get("code") != 0:
            if self.__error_listener:
                self.__error_listener(message)
                return

        if "event" in json_obj and json_obj.get("event") == "login":
            print("login msg:" + message)
            self.__login_status = True
            return
        channel_listener = None
        if "data" in json_obj:
            if not self.__check_sum(json_obj):
                return

            channel_listener = self.get_listener(json_obj)

        if channel_listener:
            channel_listener(message)
            return

        self.__listener(message)

    def __dict_books_info(self, payload):
        """Build a ``BooksInfo`` from one parsed order-book entry."""
        return BooksInfo(payload['asks'], payload['bids'], payload['checksum'])

    def __dict_to_subscribe_req(self, payload):
        """Build a ``SubscribeReq`` from a parsed ``arg`` object.

        Uses ``instId`` when present and falls back to ``coin`` otherwise.
        """
        if "instId" in payload:
            inst_id = payload['instId']
        else:
            inst_id = payload['coin']
        return SubscribeReq(payload['instType'], payload['channel'], inst_id)

    def get_listener(self, json_obj):
        """Return the listener registered for the message's ``arg``, or None."""
        try:
            arg = json_obj.get('arg')
            if arg:
                # Use the parsed object directly; re-serialising it through
                # str() and quote replacement breaks on apostrophes and on
                # None/True/False values.
                return self.__scribe_map.get(self.__dict_to_subscribe_req(arg))
        except Exception as e:
            print(json_obj.get('arg'), e)
        return None

    def __on_error(self, ws, msg):
        """Log the error, close the socket and reconnect unless already reconnecting."""
        print("error:", msg)
        self.__close()
        if not self.__reconnect_status:
            self.__re_connect()

    def __on_close(self, ws, close_status_code, close_msg):
        """Log the close, reset state and reconnect unless already reconnecting."""
        print("ws is closeing ......close_status:{},close_msg:{}".format(close_status_code, close_msg))
        self.__close()
        if not self.__reconnect_status:
            self.__re_connect()

    def __re_connect(self):
        """Reconnect and replay every remembered subscription."""
        self.__reconnect_status = True
        print("start reconnection ...")
        self.build()
        # Iterate over a copy: subscribe() adds to the same set.
        for channel in list(self.__all_suribe):
            self.subscribe([channel])

    def __close(self):
        """Reset the login and connection flags and close the socket."""
        self.__login_status = False
        self.__connection = False
        self.__ws_client.close()

    def __check_sum(self, json_obj):
        """Maintain the local order book for ``books`` messages and verify it.

        Returns:
            False when the message must be dropped (an update arrived before
            any snapshot, or the checksum did not match and the channel was
            resubscribed); True otherwise, including for messages that are
            not order-book pushes and when an unexpected error was printed.
        """
        # noinspection PyBroadException
        try:
            if "arg" not in json_obj or "action" not in json_obj:
                return True

            subscribe_req = self.__dict_to_subscribe_req(json_obj["arg"])
            if subscribe_req.channel != "books":
                return True

            action = json_obj.get("action")
            books_info = self.__dict_books_info(json_obj["data"][0])

            if action == "snapshot":
                self.__allbooks_map[subscribe_req] = books_info
                return True
            if action == "update":
                # .get(): an update without a prior snapshot must be dropped,
                # not turned into a KeyError.
                all_books = self.__allbooks_map.get(subscribe_req)
                if all_books is None:
                    return False

                all_books = all_books.merge(books_info)
                if not all_books.check_sum(books_info.checksum):
                    # Local book diverged: resubscribe to get a new snapshot.
                    self.unsubscribe([subscribe_req])
                    self.subscribe([subscribe_req])
                    return False
                self.__allbooks_map[subscribe_req] = all_books
        except Exception:
            print(traceback.format_exc())

        return True
270
271
272
class BooksInfo:
    """Locally maintained order book with checksum verification.

    ``asks`` and ``bids`` are lists of levels; each level is a sequence whose
    first two items are the price and the amount, both as strings.
    """

    # Number of levels per side that take part in the checksum.
    CHECKSUM_DEPTH = 25

    def __init__(self, asks, bids, checksum):
        self.asks = asks
        self.bids = bids
        self.checksum = checksum

    def merge(self, book_info):
        """Apply the levels of ``book_info`` to this book in place and return self.

        Asks end up sorted by ascending price, bids by descending price.
        """
        self.asks = self.innerMerge(self.asks, book_info.asks, False)
        self.bids = self.innerMerge(self.bids, book_info.bids, True)
        return self

    @staticmethod
    def _is_zero(amount):
        """Return True when ``amount`` is numerically zero ("0", "0.0", ...)."""
        try:
            return float(amount) == 0
        except (TypeError, ValueError):
            return False

    def innerMerge(self, all_list, update_list, is_reverse):
        """Merge ``update_list`` into ``all_list`` and return the sorted result.

        A level with a zero amount removes that price; any other level
        replaces or adds it.  Levels are ordered by the numeric value of the
        price (descending when ``is_reverse`` is true); comparing the price
        strings themselves would place "10.5" before "9.5".
        """
        levels = {level[0]: level for level in all_list}

        for level in update_list:
            if self._is_zero(level[1]):
                # Removing a price that is not in the book is a no-op.
                levels.pop(level[0], None)
            else:
                levels[level[0]] = level

        return sorted(levels.values(), key=lambda level: float(level[0]), reverse=is_reverse)

    def check_sum(self, new_check_sum):
        """Return True when the book's CRC32 equals ``new_check_sum``.

        The checksum string interleaves ``bid price:bid amount`` and
        ``ask price:ask amount`` for up to 25 levels per side, joined by
        ``:``.  Levels missing on a shallower side are skipped.  The CRC32
        is compared as a signed 32-bit integer.
        """
        parts = []
        for x in range(self.CHECKSUM_DEPTH):
            if x < len(self.bids) and self.bids[x] is not None:
                parts.append(self.bids[x][0])
                parts.append(self.bids[x][1])

            if x < len(self.asks) and self.asks[x] is not None:
                parts.append(self.asks[x][0])
                parts.append(self.asks[x][1])

        crc32str = ":".join(parts)
        print(crc32str)
        merge_num = crc32(bytes(crc32str, encoding="utf8"))
        signed = self.__signed_int(merge_num)
        print("start checknum mergeVal:" + str(merge_num) + ",checkVal:" + str(new_check_sum)+",checkSin:"+str(signed))
        return signed == new_check_sum

    def __signed_int(self, checknum):
        """Reinterpret an unsigned 32-bit value as a signed 32-bit integer."""
        if checknum > 0x7FFFFFFF:
            return checknum - 0x100000000
        return checknum
323
324
class SubscribeReq:
    """Identifies one channel subscription; usable as a dict key or set member.

    ``instId`` is stored under both ``inst_id`` and ``coin``.
    """

    def __init__(self, inst_type, channel, instId):
        self.inst_type = inst_type
        self.channel = channel
        self.inst_id = instId
        self.coin = instId

    def __eq__(self, other) -> bool:
        """Compare all attributes; objects of other types never compare equal."""
        if not isinstance(other, SubscribeReq):
            # Let Python fall back to its default handling instead of
            # failing on objects that have no __dict__ (e.g. None).
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __hash__(self) -> int:
        """Hash the identifying fields as a tuple.

        A tuple keeps the field boundaries and works for values that are not
        strings, where string concatenation would raise ``TypeError``.
        """
        return hash((self.inst_type, self.channel, self.inst_id))
337
338
339
class BaseWsReq:
    """Outgoing websocket request: an operation name plus its arguments."""

    def __init__(self, op, args):
        # Assigned left to right, so ``op`` stays first in ``__dict__``.
        self.op, self.args = op, args
344
345
346
class WsLoginReq:
    """Argument object of a login request: credentials, timestamp and signature."""

    def __init__(self, api_key, passphrase, timestamp, sign):
        # Assigned left to right, which fixes the attribute order in ``__dict__``.
        self.api_key, self.passphrase = api_key, passphrase
        self.timestamp, self.sign = timestamp, sign
353
354