// Source: GitHub repository bitgetlimited/v3-bitget-api-sdk
// Path: blob/master/bitget-java-sdk-api/src/main/java/com/bitget/openapi/ws/BitgetWsHandle.java
package com.bitget.openapi.ws;
2
3
import com.alibaba.fastjson.JSONArray;
4
import com.alibaba.fastjson.JSONObject;
5
import com.bitget.openapi.common.enums.SignTypeEnum;
6
import com.bitget.openapi.common.utils.DateUtil;
7
import com.bitget.openapi.common.utils.SignatureUtils;
8
import com.bitget.openapi.dto.request.ws.SubscribeReq;
9
import com.bitget.openapi.dto.request.ws.WsBaseReq;
10
import com.bitget.openapi.dto.request.ws.WsLoginReq;
11
import lombok.Data;
12
import okhttp3.*;
13
import org.apache.commons.collections4.CollectionUtils;
14
import org.apache.commons.lang3.StringUtils;
15
import org.apache.commons.lang3.Validate;
16
17
import java.math.BigDecimal;
18
import java.time.Instant;
19
import java.util.*;
20
import java.util.concurrent.ConcurrentHashMap;
21
import java.util.concurrent.Executors;
22
import java.util.concurrent.ScheduledExecutorService;
23
import java.util.concurrent.TimeUnit;
24
import java.util.function.Function;
25
import java.util.function.Predicate;
26
import java.util.stream.Collectors;
27
import java.util.zip.CRC32;
28
29
public class BitgetWsHandle implements BitgetWsClient {
30
public static final String WS_OP_LOGIN = "login";
31
public static final String WS_OP_SUBSCRIBE = "subscribe";
32
public static final String WS_OP_UNSUBSCRIBE = "unsubscribe";
33
34
private WebSocket webSocket;
35
private volatile boolean loginStatus = false;
36
private volatile boolean connectStatus = false;
37
private volatile boolean reconnectStatus = false;
38
39
private BitgetClientBuilder builder;
40
private Map<SubscribeReq, SubscriptionListener> scribeMap = new ConcurrentHashMap<>();
41
private Map<SubscribeReq, BookInfo> allBook = new ConcurrentHashMap<>();
42
43
private Set<SubscribeReq> allSuribe = Collections.synchronizedSet(new HashSet<>());
44
45
/**
 * Creates the client from the given builder and opens the connection right away.
 * The constructor returns only after {@code initClient()} has returned.
 *
 * @param builder configuration to connect with
 */
private BitgetWsHandle(BitgetClientBuilder builder) {
    this.builder = builder;
    this.webSocket = initClient();
}
49
50
private static void printLog(String msg, String type) {
51
System.out.println("[" + DateUtil.getUnixTime() + "] [" + type.toUpperCase() + "] " + msg);
52
}
53
54
private WebSocket initClient() {
55
OkHttpClient client = new OkHttpClient.Builder()
56
.writeTimeout(60, TimeUnit.SECONDS)
57
.readTimeout(60, TimeUnit.SECONDS)
58
.connectTimeout(60, TimeUnit.SECONDS)
59
.build();
60
61
Request request = new Request.Builder()
62
.url(builder.pushUrl)
63
.build();
64
65
webSocket = client.newWebSocket(request, new BitgetWsListener(this));
66
67
if (builder.isLogin) {
68
login();
69
}
70
printLog("start connect ....", "info");
71
while (!connectStatus) {
72
}
73
74
return webSocket;
75
}
76
77
public static BitgetClientBuilder builder() {
78
return new BitgetClientBuilder();
79
}
80
81
@Override
82
public void sendMessage(WsBaseReq<?> req) {
83
printLog("send message:" + JSONObject.toJSONString(req), "info");
84
sendMessage(JSONObject.toJSONString(req));
85
}
86
87
@Override
88
public void sendMessage(String message) {
89
printLog("start send message:" + message, "INFO");
90
webSocket.send(message);
91
}
92
93
@Override
94
public void unsubscribe(List<SubscribeReq> channels) {
95
allSuribe.removeAll(channels);
96
channels.forEach(channel -> {
97
scribeMap.remove(channel);
98
});
99
sendMessage(new WsBaseReq<>(WS_OP_UNSUBSCRIBE, channels));
100
}
101
102
@Override
103
public void subscribe(List<SubscribeReq> channels) {
104
if (CollectionUtils.isNotEmpty(channels)) {
105
for (SubscribeReq subscribeReq : channels) {
106
if (subscribeReq != null && StringUtils.isBlank(subscribeReq.getCoin())) {
107
subscribeReq.setCoin(subscribeReq.getInstId());
108
}
109
}
110
}
111
112
allSuribe.addAll(channels);
113
sendMessage(new WsBaseReq<>(WS_OP_SUBSCRIBE, channels));
114
}
115
116
@Override
117
public void subscribe(List<SubscribeReq> channels, SubscriptionListener listener) {
118
channels.forEach(channel -> {
119
scribeMap.put(channel, listener);
120
});
121
subscribe(channels);
122
}
123
124
@Override
125
public void login() {
126
Validate.notNull(builder.apiKey, "apiKey is null");
127
Validate.notNull(builder.secretKey, "secretKey is null");
128
Validate.notNull(builder.passPhrase, "passphrase is null");
129
130
List<WsLoginReq> args = buildArgs();
131
sendMessage(new WsBaseReq<>(WS_OP_LOGIN, args));
132
//休眠1s,等待登录结果
133
printLog("login in ......", "info");
134
while (!this.loginStatus) {
135
try {
136
Thread.sleep(10000);
137
args = buildArgs();
138
sendMessage(new WsBaseReq<>(WS_OP_LOGIN, args));
139
} catch (Exception e) {
140
e.printStackTrace();
141
}
142
}
143
printLog("login in ......end", "info");
144
}
145
146
private List<WsLoginReq> buildArgs() {
147
String timestamp = Long.valueOf(Instant.now().getEpochSecond()).toString();
148
String sign = sha256_HMAC(timestamp, builder.secretKey);
149
if (SignTypeEnum.RSA == builder.signType) {
150
sign = ws_rsa(timestamp, builder.secretKey);
151
}
152
153
WsLoginReq loginReq = WsLoginReq.builder().apiKey(builder.apiKey).passphrase(builder.passPhrase).timestamp(timestamp).sign(sign).build();
154
155
List<WsLoginReq> args = new ArrayList<WsLoginReq>() {{
156
add(loginReq);
157
}};
158
return args;
159
}
160
161
private void sleep(long s) {
162
try {
163
Thread.sleep(s);
164
} catch (Exception e) {
165
166
}
167
}
168
169
private String sha256_HMAC(String timeStamp, String secret) {
170
String hash = "";
171
try {
172
hash = SignatureUtils.wsGenerateSign(timeStamp, secret);
173
} catch (Exception e) {
174
throw new RuntimeException("sha256_HMAC error", e);
175
}
176
return hash;
177
}
178
179
private String ws_rsa(String timeStamp, String secret) {
180
String hash = "";
181
try {
182
hash = SignatureUtils.wsGenerateRsaSignature(timeStamp, secret);
183
} catch (Exception e) {
184
throw new RuntimeException("sha256_HMAC error", e);
185
}
186
return hash;
187
}
188
189
private final class BitgetWsListener extends WebSocketListener {
190
191
ScheduledExecutorService service;
192
private BitgetWsClient bitgetWsClient;
193
194
/**
 * Creates a listener that sends heartbeats and re-subscriptions through the given client.
 *
 * @param bitgetWsClient client used to send messages from within the callbacks
 */
public BitgetWsListener(BitgetWsClient bitgetWsClient) {
    this.bitgetWsClient = bitgetWsClient;
}
197
198
@Override
199
public void onOpen(final WebSocket webSocket, final Response response) {
200
connectStatus = true;
201
reconnectStatus = false;
202
//连接成功后,设置定时器,每隔25s,自动向服务器发送心跳,保持与服务器连接
203
Runnable runnable = () -> {
204
// task to run goes here
205
bitgetWsClient.sendMessage("ping");
206
};
207
208
service = Executors.newSingleThreadScheduledExecutor();
209
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
210
service.scheduleAtFixedRate(runnable, 25, 25, TimeUnit.SECONDS);
211
}
212
213
@Override
214
public void onClosing(WebSocket webSocket, int code, String reason) {
215
System.out.println("Connection is about to disconnect!");
216
close();
217
if (!reconnectStatus) {
218
reConnect();
219
}
220
221
}
222
223
@Override
224
public void onClosed(final WebSocket webSocket, final int code, final String reason) {
225
System.out.println("Connection dropped!" + reason);
226
close();
227
if (!reconnectStatus) {
228
reConnect();
229
}
230
}
231
232
@Override
233
public void onFailure(final WebSocket webSocket, final Throwable t, final Response response) {
234
t.printStackTrace();
235
close();
236
if (!reconnectStatus) {
237
238
reConnect();
239
}
240
}
241
242
// @Override
243
// public void onMessage(final WebSocket webSocket, final ByteString bytes) {
244
// final String s = uncompress(bytes.toByteArray());
245
// onMessage(webSocket,s);
246
// }
247
248
@Override
249
public void onMessage(final WebSocket webSocket, final String message) {
250
try {
251
if (message.equals("pong")) {
252
printLog("Keep connected:" + message, "info");
253
return;
254
}
255
JSONObject jsonObject = JSONObject.parseObject(message);
256
if (jsonObject.containsKey("code") && !jsonObject.get("code").toString().equals("0")) {
257
printLog("code not is 0 msg:" + message, "error");
258
if (Objects.nonNull(builder.errorListener)) {
259
builder.errorListener.onReceive(message);
260
}
261
return;
262
}
263
264
if (jsonObject.containsKey("event") && jsonObject.get("event").equals("login")) {
265
printLog("login msg:" + message, "info");
266
loginStatus = true;
267
return;
268
}
269
SubscriptionListener listener = null;
270
if (jsonObject.containsKey("data")) {
271
listener = getListener(jsonObject);
272
273
//check sum
274
boolean checkSumFlag = checkSum(jsonObject);
275
if (!checkSumFlag) {
276
return;
277
}
278
279
if (Objects.nonNull(listener)) {
280
listener.onReceive(message);
281
return;
282
}
283
if (Objects.nonNull(builder.listener)) {
284
builder.listener.onReceive(message);
285
return;
286
}
287
}
288
printLog("receive op msg:" + message, "info");
289
} catch (Exception e) {
290
printLog("receive error msg:" + message, "error");
291
}
292
}
293
294
private boolean checkSum(JSONObject jsonObject) {
295
try {
296
if (!jsonObject.containsKey("arg") || !jsonObject.containsKey("action")) {
297
return true;
298
}
299
String arg = jsonObject.get("arg").toString();
300
String action = jsonObject.get("action").toString();
301
SubscribeReq subscribeReq = JSONObject.parseObject(arg, SubscribeReq.class);
302
303
if (!StringUtils.equalsIgnoreCase(subscribeReq.getChannel(), "books")) {
304
return true;
305
}
306
JSONArray data = (JSONArray) jsonObject.get("data");
307
BitgetWsHandle.BookInfo bookInfo = JSONObject.parseObject(JSONObject.toJSONString(data.get(0)), BitgetWsHandle.BookInfo.class);
308
309
if (StringUtils.equalsIgnoreCase(action, "snapshot")) {
310
allBook.put(subscribeReq, bookInfo);
311
return true;
312
}
313
if (StringUtils.equalsIgnoreCase(action, "update")) {
314
BookInfo all = allBook.get(subscribeReq);
315
boolean checkNum = all.merge(bookInfo).checkSum(Integer.parseInt(bookInfo.getChecksum()), 25);
316
317
if (!checkNum) {
318
ArrayList<SubscribeReq> subList = new ArrayList<>();
319
subList.add(subscribeReq);
320
this.bitgetWsClient.subscribe(subList);
321
}
322
323
return checkNum;
324
}
325
326
} catch (Exception e) {
327
e.printStackTrace();
328
}
329
330
331
return true;
332
}
333
334
private SubscriptionListener getListener(JSONObject jsonObject) {
335
try {
336
if (jsonObject.containsKey("arg")) {
337
SubscribeReq subscribeReq = JSONObject.parseObject(jsonObject.get("arg").toString(), SubscribeReq.class);
338
return scribeMap.get(subscribeReq);
339
}
340
} catch (Exception e) {
341
342
}
343
return null;
344
345
}
346
347
private void close() {
348
loginStatus = false;
349
connectStatus = false;
350
webSocket.close(1000, "Long time no message was sent or received!");
351
webSocket = null;
352
}
353
354
private void reConnect() {
355
reconnectStatus = true;
356
printLog("start reconnection ...", "info");
357
initClient();
358
if (CollectionUtils.isNotEmpty(allSuribe)) {
359
subscribe(new ArrayList<>(allSuribe));
360
}
361
}
362
363
}
364
365
public static class BitgetClientBuilder {
366
private String pushUrl;
367
private boolean isLogin;
368
private String apiKey;
369
private String secretKey;
370
private String passPhrase;
371
372
private SignTypeEnum signType = SignTypeEnum.SHA256;
373
374
private SubscriptionListener listener;
375
private SubscriptionListener errorListener;
376
377
public BitgetClientBuilder listener(SubscriptionListener listener) {
378
this.listener = listener;
379
return this;
380
}
381
382
public BitgetClientBuilder errorListener(SubscriptionListener errorListener) {
383
this.errorListener = errorListener;
384
return this;
385
}
386
387
public BitgetClientBuilder pushUrl(String pushUrl) {
388
this.pushUrl = pushUrl;
389
return this;
390
}
391
392
public BitgetClientBuilder isLogin(boolean isLogin) {
393
this.isLogin = isLogin;
394
return this;
395
}
396
397
public BitgetClientBuilder apiKey(String apiKey) {
398
this.apiKey = apiKey;
399
return this;
400
}
401
402
public BitgetClientBuilder secretKey(String secretKey) {
403
this.secretKey = secretKey;
404
return this;
405
}
406
407
public BitgetClientBuilder passPhrase(String passPhrase) {
408
this.passPhrase = passPhrase;
409
return this;
410
}
411
412
public BitgetClientBuilder signType(SignTypeEnum signType) {
413
this.signType = signType;
414
return this;
415
}
416
417
public BitgetWsClient build() {
418
return new BitgetWsHandle(this);
419
}
420
421
}
422
423
@Data
424
static class BookInfo {
425
private List<String[]> asks;
426
private List<String[]> bids;
427
private String checksum;
428
private String ts;
429
430
/**
 * Creates an empty book.
 * NOTE(review): presumably kept explicit for JSON deserialization - confirm.
 */
public BookInfo() {
}
432
433
public BookInfo merge(BookInfo updateInfo) {
434
this.asks = merge(this.asks, updateInfo.getAsks(), false);
435
printLog("asks sort uniq:" + JSONObject.toJSONString(this.asks), "info");
436
this.bids = merge(this.bids, updateInfo.getBids(), true);
437
printLog("bids sort uniq:" + JSONObject.toJSONString(this.bids), "info");
438
return this;
439
}
440
441
//isReverse: true->desc,false->asc
442
private List<String[]> merge(List<String[]> allList, List<String[]> updateList, boolean isReverse) {
443
Map<String, String[]> priceAndValue = allList.stream().collect(Collectors.toMap(o -> o[0], o -> o));
444
for (String[] update : updateList) {
445
if (new BigDecimal(update[1]).compareTo(BigDecimal.ZERO) == 0) {
446
priceAndValue.remove(update[0]);
447
continue;
448
}
449
priceAndValue.put(update[0], update);
450
451
}
452
453
List<String[]> newAllList = new ArrayList<>(priceAndValue.values());
454
if (isReverse) {
455
newAllList.sort((o1, o2) -> new BigDecimal(o2[0]).compareTo(new BigDecimal(o1[0])));
456
} else {
457
newAllList.sort(Comparator.comparing(o -> new BigDecimal(o[0])));
458
}
459
460
return newAllList;
461
}
462
463
public <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
464
Map<Object, Boolean> map = new ConcurrentHashMap<>();
465
return t -> map.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
466
}
467
468
public boolean checkSum(int checkSum, int gear) {
469
StringBuilder sb = new StringBuilder();
470
for (int i = 0; i < gear; i++) {
471
if (i < this.getBids().size()) {
472
String[] bids = this.getBids().get(i);
473
sb.append(bids[0]).append(":").append(bids[1]).append(":");
474
}
475
476
if (i < this.getAsks().size()) {
477
String[] asks = this.getAsks().get(i);
478
sb.append(asks[0]).append(":").append(asks[1]).append(":");
479
}
480
}
481
String s = sb.toString();
482
String str = s.substring(0, s.length() - 1);
483
484
CRC32 crc32 = new CRC32();
485
crc32.update(str.getBytes());
486
int value = (int) crc32.getValue();
487
printLog("check val:" + str, "info");
488
printLog("start checknum mergeVal:" + value + ",checkVal:" + checkSum, "info");
489
return value == checkSum;
490
}
491
}
492
}
493
494