Path: blob/master/bitget-golang-sdk-api/internal/common/bitgetwsclient.go
735 views
// Package common contains the shared WebSocket client used by the Bitget SDK.
package common

import (
	"fmt"
	"sync"
	"time"

	"bitget/config"
	"bitget/constants"
	"bitget/internal"
	"bitget/internal/model"
	"bitget/logging/applogger"

	"github.com/gorilla/websocket"
	"github.com/robfig/cron"
)

// BitgetBaseWsClient holds the connection, listeners and bookkeeping state of
// a single WebSocket session.
//
// NOTE(review): WebSocketClient, Connection, LoginStatus and LastReceivedTime
// are written by the ticker loop and the read loop without synchronization;
// only writes to the socket are serialized (via SendMutex).
type BitgetBaseWsClient struct {
	NeedLogin        bool
	Connection       bool
	LoginStatus      bool
	Listener         OnReceive
	ErrorListener    OnReceive
	Ticker           *time.Ticker
	SendMutex        *sync.Mutex
	WebSocketClient  *websocket.Conn
	LastReceivedTime time.Time
	AllSuribe        *model.Set
	Signer           *Signer
	ScribeMap        map[model.SubscribeReq]OnReceive
}

// Init populates the client's internal state (subscription set, signer,
// per-subscription listener map, send mutex, ticker and last-received
// timestamp) and returns the receiver so calls can be chained. It does not
// open a connection.
func (p *BitgetBaseWsClient) Init() *BitgetBaseWsClient {
	p.Connection = false
	p.AllSuribe = model.NewSet()
	p.Signer = new(Signer).Init(config.SecretKey)
	p.ScribeMap = make(map[model.SubscribeReq]OnReceive)
	p.SendMutex = &sync.Mutex{}
	p.Ticker = time.NewTicker(constants.TimerIntervalSecond * time.Second)
	p.LastReceivedTime = time.Now()

	return p
}

// SetListener registers the default message listener and the listener that
// receives messages carrying a non-zero "code".
func (p *BitgetBaseWsClient) SetListener(msgListener OnReceive, errorListener OnReceive) {
	p.Listener = msgListener
	p.ErrorListener = errorListener
}

// Connect schedules the periodic ping and then runs the reconnect watchdog.
//
// It blocks for the lifetime of the process because tickerLoop never returns.
// The ping must therefore be scheduled first: in the previous order the call
// to ExecuterPing was unreachable.
func (p *BitgetBaseWsClient) Connect() {
	p.ExecuterPing()
	p.tickerLoop()
}

// ConnectWebSocket dials config.WsUrl and stores the resulting connection.
// On failure the error is logged, WebSocketClient is left nil (the value
// returned by Dial) and Connection is cleared.
func (p *BitgetBaseWsClient) ConnectWebSocket() {
	var err error
	applogger.Info("WebSocket connecting...")
	p.WebSocketClient, _, err = websocket.DefaultDialer.Dial(config.WsUrl, nil)
	if err != nil {
		p.Connection = false
		applogger.Error("WebSocket connected error: %s", err)
		return
	}
	applogger.Info("WebSocket connected")
	p.Connection = true
}

// Login signs the current timestamp with the configured scheme (RSA when
// config.SignType equals constants.RSA, the signer's default otherwise) and
// sends the login request over the socket.
func (p *BitgetBaseWsClient) Login() {
	timesStamp := internal.TimesStampSec()

	var sign string
	if constants.RSA == config.SignType {
		sign = p.Signer.SignByRSA(constants.WsAuthMethod, constants.WsAuthPath, "", timesStamp)
	} else {
		sign = p.Signer.Sign(constants.WsAuthMethod, constants.WsAuthPath, "", timesStamp)
	}

	loginReq := model.WsLoginReq{
		ApiKey:     config.ApiKey,
		Passphrase: config.PASSPHRASE,
		Timestamp:  timesStamp,
		Sign:       sign,
	}

	baseReq := model.WsBaseReq{
		Op:   constants.WsOpLogin,
		Args: []interface{}{loginReq},
	}
	p.SendByType(baseReq)
}

// StartReadLoop runs ReadLoop in its own goroutine.
func (p *BitgetBaseWsClient) StartReadLoop() {
	go p.ReadLoop()
}

// ExecuterPing schedules a "ping" text message using the cron spec
// "*/15 * * * * *". If the job cannot be registered the error is logged and
// the scheduler is not started.
func (p *BitgetBaseWsClient) ExecuterPing() {
	c := cron.New()
	if err := c.AddFunc("*/15 * * * * *", p.ping); err != nil {
		applogger.Error("WebSocket ping schedule error: %s", err)
		return
	}
	c.Start()
}

// ping sends the keep-alive text frame.
func (p *BitgetBaseWsClient) ping() {
	p.Send("ping")
}

// SendByType serializes req to JSON and sends it. A serialization failure is
// logged and nothing is sent.
func (p *BitgetBaseWsClient) SendByType(req model.WsBaseReq) {
	json, err := internal.ToJson(req)
	if err != nil {
		applogger.Error("WebSocket sent error: cannot encode request: %s", err)
		return
	}
	p.Send(json)
}

// Send writes data as a text message. Writes are serialized with SendMutex.
// If there is no connection, or the write fails, the error is logged.
func (p *BitgetBaseWsClient) Send(data string) {
	// Take one snapshot so the nil check and the write use the same connection
	// even if the watchdog swaps it in between.
	conn := p.WebSocketClient
	if conn == nil {
		applogger.Error("WebSocket sent error: no connection available")
		return
	}
	applogger.Info("sendMessage:%s", data)

	p.SendMutex.Lock()
	err := conn.WriteMessage(websocket.TextMessage, []byte(data))
	p.SendMutex.Unlock()

	if err != nil {
		applogger.Error("WebSocket sent error: data=%s, error=%s", data, err)
	}
}

// tickerLoop is the reconnect watchdog. On every tick it checks how long ago
// the last message was received and, when that exceeds
// constants.ReconnectWaitSecond, closes and re-dials the connection. It only
// returns if the ticker channel is closed.
func (p *BitgetBaseWsClient) tickerLoop() {
	applogger.Info("tickerLoop started")
	for range p.Ticker.C {
		if time.Since(p.LastReceivedTime).Seconds() <= constants.ReconnectWaitSecond {
			continue
		}

		applogger.Info("WebSocket reconnect...")
		p.disconnectWebSocket()
		p.ConnectWebSocket()
		// Give the new connection a full wait window. Without this reset every
		// following tick would tear the connection down again until the first
		// message happened to arrive.
		p.LastReceivedTime = time.Now()
	}
}

// disconnectWebSocket closes the current connection, if any, and clears the
// Connection flag. A close error is logged.
func (p *BitgetBaseWsClient) disconnectWebSocket() {
	if p.WebSocketClient == nil {
		return
	}

	applogger.Info("WebSocket disconnecting...")
	p.Connection = false
	if err := p.WebSocketClient.Close(); err != nil {
		applogger.Error("WebSocket disconnect error: %s", err)
		return
	}

	applogger.Info("WebSocket disconnected")
}

// ReadLoop reads messages forever and hands each one to dispatch.
//
// When no connection is available, or the current connection has already
// returned a read error, the loop sleeps and waits for the watchdog to install
// a new connection instead of spinning or re-reading a dead socket.
func (p *BitgetBaseWsClient) ReadLoop() {
	const retryDelay = time.Second

	// failed is the connection whose last read returned an error. Read errors
	// on a gorilla/websocket connection are permanent, so it is never read
	// again.
	var failed *websocket.Conn

	for {
		conn := p.WebSocketClient
		if conn == nil {
			applogger.Info("Read error: no connection available")
			time.Sleep(retryDelay)
			continue
		}
		if conn == failed {
			time.Sleep(retryDelay)
			continue
		}

		_, buf, err := conn.ReadMessage()
		if err != nil {
			applogger.Info("Read error: %s", err)
			failed = conn
			continue
		}

		p.LastReceivedTime = time.Now()
		p.dispatch(string(buf))
	}
}

// dispatch routes one received message:
//   - "pong" is logged and dropped;
//   - a numeric non-zero "code" goes to ErrorListener (logged if none is set);
//   - event "login" marks the session as logged in;
//   - a message with "data" goes to the listener resolved from its "arg";
//   - anything else goes to handleMessage.
func (p *BitgetBaseWsClient) dispatch(message string) {
	applogger.Info("rev:%s", message)

	if message == "pong" {
		applogger.Info("Keep connected:%s", message)
		return
	}

	jsonMap := internal.JSONToMap(message)

	if v, ok := jsonMap["code"]; ok {
		// Checked assertion: a non-numeric code must not crash the read loop.
		if code, isNum := v.(float64); isNum && int(code) != 0 {
			if p.ErrorListener != nil {
				p.ErrorListener(message)
			} else {
				applogger.Error("WebSocket error message without listener: %s", message)
			}
			return
		}
	}

	if v, ok := jsonMap["event"]; ok && v == "login" {
		applogger.Info("login msg:%s", message)
		p.LoginStatus = true
		return
	}

	if _, ok := jsonMap["data"]; ok {
		if listener := p.GetListener(jsonMap["arg"]); listener != nil {
			listener(message)
			return
		}
	}

	p.handleMessage(message)
}

// GetListener returns the listener registered in ScribeMap for the
// subscription described by argJson (keys "instType", "channel", "instId").
// It returns the default Listener when argJson is not a JSON object or no
// specific listener is registered; the result may be nil if no default
// listener was set.
func (p *BitgetBaseWsClient) GetListener(argJson interface{}) OnReceive {
	mapData, ok := argJson.(map[string]interface{})
	if !ok {
		return p.Listener
	}

	subscribeReq := model.SubscribeReq{
		InstType: fmt.Sprintf("%v", mapData["instType"]),
		Channel:  fmt.Sprintf("%v", mapData["channel"]),
		InstId:   fmt.Sprintf("%v", mapData["instId"]),
	}

	if listener, found := p.ScribeMap[subscribeReq]; found {
		return listener
	}
	return p.Listener
}

// OnReceive is the callback signature for received messages; it is given the
// raw message text.
type OnReceive func(message string)

// handleMessage is the fallback for messages no other rule claimed; it prints
// them to standard output.
func (p *BitgetBaseWsClient) handleMessage(msg string) {
	fmt.Println("default:" + msg)
}