Path: blob/master/bitget-java-sdk-api/src/main/java/com/bitget/openapi/ws/BitgetWsHandle.java
518 views
package com.bitget.openapi.ws;12import com.alibaba.fastjson.JSONArray;3import com.alibaba.fastjson.JSONObject;4import com.bitget.openapi.common.enums.SignTypeEnum;5import com.bitget.openapi.common.utils.DateUtil;6import com.bitget.openapi.common.utils.SignatureUtils;7import com.bitget.openapi.dto.request.ws.SubscribeReq;8import com.bitget.openapi.dto.request.ws.WsBaseReq;9import com.bitget.openapi.dto.request.ws.WsLoginReq;10import lombok.Data;11import okhttp3.*;12import org.apache.commons.collections4.CollectionUtils;13import org.apache.commons.lang3.StringUtils;14import org.apache.commons.lang3.Validate;1516import java.math.BigDecimal;17import java.time.Instant;18import java.util.*;19import java.util.concurrent.ConcurrentHashMap;20import java.util.concurrent.Executors;21import java.util.concurrent.ScheduledExecutorService;22import java.util.concurrent.TimeUnit;23import java.util.function.Function;24import java.util.function.Predicate;25import java.util.stream.Collectors;26import java.util.zip.CRC32;2728public class BitgetWsHandle implements BitgetWsClient {29public static final String WS_OP_LOGIN = "login";30public static final String WS_OP_SUBSCRIBE = "subscribe";31public static final String WS_OP_UNSUBSCRIBE = "unsubscribe";3233private WebSocket webSocket;34private volatile boolean loginStatus = false;35private volatile boolean connectStatus = false;36private volatile boolean reconnectStatus = false;3738private BitgetClientBuilder builder;39private Map<SubscribeReq, SubscriptionListener> scribeMap = new ConcurrentHashMap<>();40private Map<SubscribeReq, BookInfo> allBook = new ConcurrentHashMap<>();4142private Set<SubscribeReq> allSuribe = Collections.synchronizedSet(new HashSet<>());4344private BitgetWsHandle(BitgetClientBuilder builder) {45this.builder = builder;46webSocket = initClient();47}4849private static void printLog(String msg, String type) {50System.out.println("[" + DateUtil.getUnixTime() + "] [" + type.toUpperCase() + "] " + msg);51}5253private WebSocket initClient() {54OkHttpClient client = new OkHttpClient.Builder()55.writeTimeout(60, TimeUnit.SECONDS)56.readTimeout(60, TimeUnit.SECONDS)57.connectTimeout(60, TimeUnit.SECONDS)58.build();5960Request request = new Request.Builder()61.url(builder.pushUrl)62.build();6364webSocket = client.newWebSocket(request, new BitgetWsListener(this));6566if (builder.isLogin) {67login();68}69printLog("start connect ....", "info");70while (!connectStatus) {71}7273return webSocket;74}7576public static BitgetClientBuilder builder() {77return new BitgetClientBuilder();78}7980@Override81public void sendMessage(WsBaseReq<?> req) {82printLog("send message:" + JSONObject.toJSONString(req), "info");83sendMessage(JSONObject.toJSONString(req));84}8586@Override87public void sendMessage(String message) {88printLog("start send message:" + message, "INFO");89webSocket.send(message);90}9192@Override93public void unsubscribe(List<SubscribeReq> channels) {94allSuribe.removeAll(channels);95channels.forEach(channel -> {96scribeMap.remove(channel);97});98sendMessage(new WsBaseReq<>(WS_OP_UNSUBSCRIBE, channels));99}100101@Override102public void subscribe(List<SubscribeReq> channels) {103if (CollectionUtils.isNotEmpty(channels)) {104for (SubscribeReq subscribeReq : channels) {105if (subscribeReq != null && StringUtils.isBlank(subscribeReq.getCoin())) {106subscribeReq.setCoin(subscribeReq.getInstId());107}108}109}110111allSuribe.addAll(channels);112sendMessage(new WsBaseReq<>(WS_OP_SUBSCRIBE, channels));113}114115@Override116public void subscribe(List<SubscribeReq> channels, SubscriptionListener listener) {117channels.forEach(channel -> {118scribeMap.put(channel, listener);119});120subscribe(channels);121}122123@Override124public void login() {125Validate.notNull(builder.apiKey, "apiKey is null");126Validate.notNull(builder.secretKey, "secretKey is null");127Validate.notNull(builder.passPhrase, "passphrase is null");128129List<WsLoginReq> args = buildArgs();130sendMessage(new WsBaseReq<>(WS_OP_LOGIN, args));131//休眠1s,等待登录结果132printLog("login in ......", "info");133while (!this.loginStatus) {134try {135Thread.sleep(10000);136args = buildArgs();137sendMessage(new WsBaseReq<>(WS_OP_LOGIN, args));138} catch (Exception e) {139e.printStackTrace();140}141}142printLog("login in ......end", "info");143}144145private List<WsLoginReq> buildArgs() {146String timestamp = Long.valueOf(Instant.now().getEpochSecond()).toString();147String sign = sha256_HMAC(timestamp, builder.secretKey);148if (SignTypeEnum.RSA == builder.signType) {149sign = ws_rsa(timestamp, builder.secretKey);150}151152WsLoginReq loginReq = WsLoginReq.builder().apiKey(builder.apiKey).passphrase(builder.passPhrase).timestamp(timestamp).sign(sign).build();153154List<WsLoginReq> args = new ArrayList<WsLoginReq>() {{155add(loginReq);156}};157return args;158}159160private void sleep(long s) {161try {162Thread.sleep(s);163} catch (Exception e) {164165}166}167168private String sha256_HMAC(String timeStamp, String secret) {169String hash = "";170try {171hash = SignatureUtils.wsGenerateSign(timeStamp, secret);172} catch (Exception e) {173throw new RuntimeException("sha256_HMAC error", e);174}175return hash;176}177178private String ws_rsa(String timeStamp, String secret) {179String hash = "";180try {181hash = SignatureUtils.wsGenerateRsaSignature(timeStamp, secret);182} catch (Exception e) {183throw new RuntimeException("sha256_HMAC error", e);184}185return hash;186}187188private final class BitgetWsListener extends WebSocketListener {189190ScheduledExecutorService service;191private BitgetWsClient bitgetWsClient;192193public BitgetWsListener(BitgetWsClient bitgetWsClient) {194this.bitgetWsClient = bitgetWsClient;195}196197@Override198public void onOpen(final WebSocket webSocket, final Response response) {199connectStatus = true;200reconnectStatus = false;201//连接成功后,设置定时器,每隔25s,自动向服务器发送心跳,保持与服务器连接202Runnable runnable = () -> {203// task to run goes here204bitgetWsClient.sendMessage("ping");205};206207service = Executors.newSingleThreadScheduledExecutor();208// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间209service.scheduleAtFixedRate(runnable, 25, 25, TimeUnit.SECONDS);210}211212@Override213public void onClosing(WebSocket webSocket, int code, String reason) {214System.out.println("Connection is about to disconnect!");215close();216if (!reconnectStatus) {217reConnect();218}219220}221222@Override223public void onClosed(final WebSocket webSocket, final int code, final String reason) {224System.out.println("Connection dropped!" + reason);225close();226if (!reconnectStatus) {227reConnect();228}229}230231@Override232public void onFailure(final WebSocket webSocket, final Throwable t, final Response response) {233t.printStackTrace();234close();235if (!reconnectStatus) {236237reConnect();238}239}240241// @Override242// public void onMessage(final WebSocket webSocket, final ByteString bytes) {243// final String s = uncompress(bytes.toByteArray());244// onMessage(webSocket,s);245// }246247@Override248public void onMessage(final WebSocket webSocket, final String message) {249try {250if (message.equals("pong")) {251printLog("Keep connected:" + message, "info");252return;253}254JSONObject jsonObject = JSONObject.parseObject(message);255if (jsonObject.containsKey("code") && !jsonObject.get("code").toString().equals("0")) {256printLog("code not is 0 msg:" + message, "error");257if (Objects.nonNull(builder.errorListener)) {258builder.errorListener.onReceive(message);259}260return;261}262263if (jsonObject.containsKey("event") && jsonObject.get("event").equals("login")) {264printLog("login msg:" + message, "info");265loginStatus = true;266return;267}268SubscriptionListener listener = null;269if (jsonObject.containsKey("data")) {270listener = getListener(jsonObject);271272//check sum273boolean checkSumFlag = checkSum(jsonObject);274if (!checkSumFlag) {275return;276}277278if (Objects.nonNull(listener)) {279listener.onReceive(message);280return;281}282if (Objects.nonNull(builder.listener)) {283builder.listener.onReceive(message);284return;285}286}287printLog("receive op msg:" + message, "info");288} catch (Exception e) {289printLog("receive error msg:" + message, "error");290}291}292293private boolean checkSum(JSONObject jsonObject) {294try {295if (!jsonObject.containsKey("arg") || !jsonObject.containsKey("action")) {296return true;297}298String arg = jsonObject.get("arg").toString();299String action = jsonObject.get("action").toString();300SubscribeReq subscribeReq = JSONObject.parseObject(arg, SubscribeReq.class);301302if (!StringUtils.equalsIgnoreCase(subscribeReq.getChannel(), "books")) {303return true;304}305JSONArray data = (JSONArray) jsonObject.get("data");306BitgetWsHandle.BookInfo bookInfo = JSONObject.parseObject(JSONObject.toJSONString(data.get(0)), BitgetWsHandle.BookInfo.class);307308if (StringUtils.equalsIgnoreCase(action, "snapshot")) {309allBook.put(subscribeReq, bookInfo);310return true;311}312if (StringUtils.equalsIgnoreCase(action, "update")) {313BookInfo all = allBook.get(subscribeReq);314boolean checkNum = all.merge(bookInfo).checkSum(Integer.parseInt(bookInfo.getChecksum()), 25);315316if (!checkNum) {317ArrayList<SubscribeReq> subList = new ArrayList<>();318subList.add(subscribeReq);319this.bitgetWsClient.subscribe(subList);320}321322return checkNum;323}324325} catch (Exception e) {326e.printStackTrace();327}328329330return true;331}332333private SubscriptionListener getListener(JSONObject jsonObject) {334try {335if (jsonObject.containsKey("arg")) {336SubscribeReq subscribeReq = JSONObject.parseObject(jsonObject.get("arg").toString(), SubscribeReq.class);337return scribeMap.get(subscribeReq);338}339} catch (Exception e) {340341}342return null;343344}345346private void close() {347loginStatus = false;348connectStatus = false;349webSocket.close(1000, "Long time no message was sent or received!");350webSocket = null;351}352353private void reConnect() {354reconnectStatus = true;355printLog("start reconnection ...", "info");356initClient();357if (CollectionUtils.isNotEmpty(allSuribe)) {358subscribe(new ArrayList<>(allSuribe));359}360}361362}363364public static class BitgetClientBuilder {365private String pushUrl;366private boolean isLogin;367private String apiKey;368private String secretKey;369private String passPhrase;370371private SignTypeEnum signType = SignTypeEnum.SHA256;372373private SubscriptionListener listener;374private SubscriptionListener errorListener;375376public BitgetClientBuilder listener(SubscriptionListener listener) {377this.listener = listener;378return this;379}380381public BitgetClientBuilder errorListener(SubscriptionListener errorListener) {382this.errorListener = errorListener;383return this;384}385386public BitgetClientBuilder pushUrl(String pushUrl) {387this.pushUrl = pushUrl;388return this;389}390391public BitgetClientBuilder isLogin(boolean isLogin) {392this.isLogin = isLogin;393return this;394}395396public BitgetClientBuilder apiKey(String apiKey) {397this.apiKey = apiKey;398return this;399}400401public BitgetClientBuilder secretKey(String secretKey) {402this.secretKey = secretKey;403return this;404}405406public BitgetClientBuilder passPhrase(String passPhrase) {407this.passPhrase = passPhrase;408return this;409}410411public BitgetClientBuilder signType(SignTypeEnum signType) {412this.signType = signType;413return this;414}415416public BitgetWsClient build() {417return new BitgetWsHandle(this);418}419420}421422@Data423static class BookInfo {424private List<String[]> asks;425private List<String[]> bids;426private String checksum;427private String ts;428429public BookInfo() {430}431432public BookInfo merge(BookInfo updateInfo) {433this.asks = merge(this.asks, updateInfo.getAsks(), false);434printLog("asks sort uniq:" + JSONObject.toJSONString(this.asks), "info");435this.bids = merge(this.bids, updateInfo.getBids(), true);436printLog("bids sort uniq:" + JSONObject.toJSONString(this.bids), "info");437return this;438}439440//isReverse: true->desc,false->asc441private List<String[]> merge(List<String[]> allList, List<String[]> updateList, boolean isReverse) {442Map<String, String[]> priceAndValue = allList.stream().collect(Collectors.toMap(o -> o[0], o -> o));443for (String[] update : updateList) {444if (new BigDecimal(update[1]).compareTo(BigDecimal.ZERO) == 0) {445priceAndValue.remove(update[0]);446continue;447}448priceAndValue.put(update[0], update);449450}451452List<String[]> newAllList = new ArrayList<>(priceAndValue.values());453if (isReverse) {454newAllList.sort((o1, o2) -> new BigDecimal(o2[0]).compareTo(new BigDecimal(o1[0])));455} else {456newAllList.sort(Comparator.comparing(o -> new BigDecimal(o[0])));457}458459return newAllList;460}461462public <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {463Map<Object, Boolean> map = new ConcurrentHashMap<>();464return t -> map.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;465}466467public boolean checkSum(int checkSum, int gear) {468StringBuilder sb = new StringBuilder();469for (int i = 0; i < gear; i++) {470if (i < this.getBids().size()) {471String[] bids = this.getBids().get(i);472sb.append(bids[0]).append(":").append(bids[1]).append(":");473}474475if (i < this.getAsks().size()) {476String[] asks = this.getAsks().get(i);477sb.append(asks[0]).append(":").append(asks[1]).append(":");478}479}480String s = sb.toString();481String str = s.substring(0, s.length() - 1);482483CRC32 crc32 = new CRC32();484crc32.update(str.getBytes());485int value = (int) crc32.getValue();486printLog("check val:" + str, "info");487printLog("start checknum mergeVal:" + value + ",checkVal:" + checkSum, "info");488return value == checkSum;489}490}491}492493494