Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
BitgetLimited
GitHub Repository: BitgetLimited/V3-bitget-api-sdk
Path: blob/master/bitget-golang-sdk-api/internal/common/bitgetwsclient.go
735 views
1
package common
2
3
import (
4
"bitget/config"
5
"bitget/constants"
6
"bitget/internal"
7
"bitget/internal/model"
8
"bitget/logging/applogger"
9
"fmt"
10
"github.com/gorilla/websocket"
11
"github.com/robfig/cron"
12
"sync"
13
"time"
14
)
15
16
type BitgetBaseWsClient struct {
17
NeedLogin bool
18
Connection bool
19
LoginStatus bool
20
Listener OnReceive
21
ErrorListener OnReceive
22
Ticker *time.Ticker
23
SendMutex *sync.Mutex
24
WebSocketClient *websocket.Conn
25
LastReceivedTime time.Time
26
AllSuribe *model.Set
27
Signer *Signer
28
ScribeMap map[model.SubscribeReq]OnReceive
29
}
30
31
// Init resets the client to its starting state and returns the receiver so
// the call can be chained onto new(BitgetBaseWsClient).
//
// It marks the client as not connected, allocates the subscription set, the
// subscription-to-listener map and the send mutex, builds the signer from
// config.SecretKey, starts the ticker, and stamps LastReceivedTime with the
// current time.
func (p *BitgetBaseWsClient) Init() *BitgetBaseWsClient {
	p.Connection = false

	// Plain allocations first.
	p.SendMutex = new(sync.Mutex)
	p.ScribeMap = map[model.SubscribeReq]OnReceive{}

	// Project-provided collaborators.
	p.AllSuribe = model.NewSet()
	p.Signer = new(Signer).Init(config.SecretKey)

	// Timing state consumed by tickerLoop.
	p.Ticker = time.NewTicker(constants.TimerIntervalSecond * time.Second)
	p.LastReceivedTime = time.Now()

	return p
}
42
43
// SetListener installs the default message callback and the error callback
// on the client, replacing whatever was set before.
func (p *BitgetBaseWsClient) SetListener(msgListener OnReceive, errorListener OnReceive) {
	p.Listener, p.ErrorListener = msgListener, errorListener
}
47
48
// Connect runs tickerLoop on the calling goroutine and, after it returns,
// calls ExecuterPing.
//
// NOTE(review): tickerLoop in this file is an endless loop, so this call
// blocks and the ExecuterPing call below is never reached. The order is left
// untouched because callers may depend on Connect blocking and may start the
// ping schedule themselves; confirm before reordering.
func (p *BitgetBaseWsClient) Connect() {
	p.tickerLoop()
	p.ExecuterPing()
}
53
54
// ConnectWebSocket dials config.WsUrl with the default gorilla dialer and
// stores the resulting connection on the client.
//
// On success WebSocketClient holds the new connection and Connection is true.
// On failure the error is logged, WebSocketClient is nil and Connection is
// false, so a failed reconnect is not reported as a live connection.
func (p *BitgetBaseWsClient) ConnectWebSocket() {
	applogger.Info("WebSocket connecting...")

	conn, _, err := websocket.DefaultDialer.Dial(config.WsUrl, nil)
	if err != nil {
		// Keep the field nil on failure (as the direct assignment from Dial
		// did) so Send and ReadLoop take their "no connection" paths.
		p.WebSocketClient = nil
		p.Connection = false
		// Use the package logger like every other error path in this file.
		applogger.Error("WebSocket connected error: %s", err)
		return
	}

	p.WebSocketClient = conn
	applogger.Info("WebSocket connected")
	p.Connection = true
}
65
66
func (p *BitgetBaseWsClient) Login() {
67
timesStamp := internal.TimesStampSec()
68
sign := p.Signer.Sign(constants.WsAuthMethod, constants.WsAuthPath, "", timesStamp)
69
if constants.RSA == config.SignType {
70
sign = p.Signer.SignByRSA(constants.WsAuthMethod, constants.WsAuthPath, "", timesStamp)
71
}
72
73
loginReq := model.WsLoginReq{
74
ApiKey: config.ApiKey,
75
Passphrase: config.PASSPHRASE,
76
Timestamp: timesStamp,
77
Sign: sign,
78
}
79
var args []interface{}
80
args = append(args, loginReq)
81
82
baseReq := model.WsBaseReq{
83
Op: constants.WsOpLogin,
84
Args: args,
85
}
86
p.SendByType(baseReq)
87
}
88
89
// StartReadLoop launches ReadLoop on its own goroutine and returns
// immediately.
func (p *BitgetBaseWsClient) StartReadLoop() {
	go p.ReadLoop()
}
92
93
// ExecuterPing creates a cron scheduler, registers ping under the spec
// "*/15 * * * * *" and starts the scheduler.
//
// If the job cannot be registered the error is logged and the scheduler is
// not started, instead of silently running a scheduler with no job.
func (p *BitgetBaseWsClient) ExecuterPing() {
	c := cron.New()
	if err := c.AddFunc("*/15 * * * * *", p.ping); err != nil {
		applogger.Error("WebSocket ping schedule error: %s", err)
		return
	}
	c.Start()
}
98
// ping sends the literal text frame "ping" through Send. ReadLoop treats a
// "pong" message as the matching keep-alive reply.
func (p *BitgetBaseWsClient) ping() {
	const keepAlive = "ping"
	p.Send(keepAlive)
}
101
102
// SendByType encodes req with internal.ToJson and sends the result as a text
// message via Send.
//
// If encoding fails the failure is logged and nothing is sent; previously the
// encode result was sent regardless of the error.
func (p *BitgetBaseWsClient) SendByType(req model.WsBaseReq) {
	payload, err := internal.ToJson(req)
	if err != nil {
		// Log only the operation name: the arguments may carry credentials.
		applogger.Error("WebSocket sent error: encode op=%v, error=%v", req.Op, err)
		return
	}
	p.Send(payload)
}
106
107
// Send writes data to the current connection as a single text message.
//
// When no connection is available the call logs an error and returns. Write
// failures are logged, not returned. The write itself is guarded by SendMutex
// so that concurrent callers do not interleave writes.
func (p *BitgetBaseWsClient) Send(data string) {
	// Read the field once so the nil check and the write use the same
	// connection: ConnectWebSocket reassigns the field (to nil when a dial
	// fails), and a second read after the check could observe that nil.
	conn := p.WebSocketClient
	if conn == nil {
		applogger.Error("WebSocket sent error: no connection available")
		return
	}

	applogger.Info("sendMessage:%s", data)

	p.SendMutex.Lock()
	err := conn.WriteMessage(websocket.TextMessage, []byte(data))
	p.SendMutex.Unlock()

	if err != nil {
		applogger.Error("WebSocket sent error: data=%s, error=%s", data, err)
	}
}
120
121
func (p *BitgetBaseWsClient) tickerLoop() {
122
applogger.Info("tickerLoop started")
123
for {
124
select {
125
case <-p.Ticker.C:
126
elapsedSecond := time.Now().Sub(p.LastReceivedTime).Seconds()
127
128
if elapsedSecond > constants.ReconnectWaitSecond {
129
applogger.Info("WebSocket reconnect...")
130
p.disconnectWebSocket()
131
p.ConnectWebSocket()
132
}
133
}
134
}
135
}
136
137
// disconnectWebSocket closes the current connection, if there is one, and
// marks the client as not connected. A close error is logged, not returned.
func (p *BitgetBaseWsClient) disconnectWebSocket() {
	if p.WebSocketClient == nil {
		return
	}

	// Logged through applogger like the rest of the connection lifecycle.
	applogger.Info("WebSocket disconnecting...")

	// The client is no longer connected once Close has been attempted,
	// whether or not Close reports an error; ConnectWebSocket sets the flag
	// again after a successful dial.
	p.Connection = false

	if err := p.WebSocketClient.Close(); err != nil {
		applogger.Error("WebSocket disconnect error: %s\n", err)
		return
	}

	applogger.Info("WebSocket disconnected")
}
151
152
// ReadLoop reads messages from the current connection forever and dispatches
// each one:
//
//   - "pong" is logged as a keep-alive reply;
//   - a message whose numeric "code" is non-zero goes to ErrorListener;
//   - a message whose "event" is "login" sets LoginStatus;
//   - a message with a "data" field goes to the listener registered for its
//     "arg" subscription, falling back to Listener;
//   - anything else goes to handleMessage.
//
// Every successfully read message refreshes LastReceivedTime.
//
// While there is no connection the loop sleeps between checks instead of
// spinning. After a read error it stops reading from that connection and
// waits until WebSocketClient has been replaced: read errors on a gorilla
// connection are permanent, and reading a failed connection repeatedly
// eventually panics.
func (p *BitgetBaseWsClient) ReadLoop() {
	retryDelay := constants.TimerIntervalSecond * time.Second

	for {
		conn := p.WebSocketClient
		if conn == nil {
			applogger.Info("Read error: no connection available")
			time.Sleep(retryDelay)
			continue
		}

		_, buf, err := conn.ReadMessage()
		if err != nil {
			applogger.Info("Read error: %s", err)
			// Wait for the reconnect path to swap in a different connection
			// (or nil) before trying again.
			for p.WebSocketClient == conn {
				time.Sleep(retryDelay)
			}
			continue
		}

		p.LastReceivedTime = time.Now()
		message := string(buf)

		// Pass the payload as an argument, never as the format string: it is
		// remote data and may contain '%'.
		applogger.Info("rev:%s", message)

		if message == "pong" {
			applogger.Info("Keep connected:%s", message)
			continue
		}

		jsonMap := internal.JSONToMap(message)

		// A non-numeric or missing code is not treated as an error frame
		// (an unchecked assertion here used to panic the read goroutine).
		if code, ok := jsonMap["code"].(float64); ok && int(code) != 0 {
			if p.ErrorListener != nil {
				p.ErrorListener(message)
			} else {
				applogger.Error("WebSocket error message without listener: %s", message)
			}
			continue
		}

		if event, ok := jsonMap["event"]; ok && event == "login" {
			applogger.Info("login msg:%s", message)
			p.LoginStatus = true
			continue
		}

		if _, ok := jsonMap["data"]; ok {
			// Only look up a per-subscription listener when "arg" really is
			// an object; otherwise use the default listener.
			listener := p.Listener
			if arg, isMap := jsonMap["arg"].(map[string]interface{}); isMap {
				listener = p.GetListener(arg)
			}
			if listener != nil {
				listener(message)
			} else {
				p.handleMessage(message)
			}
			continue
		}

		p.handleMessage(message)
	}
}
201
202
func (p *BitgetBaseWsClient) GetListener(argJson interface{}) OnReceive {
203
204
mapData := argJson.(map[string]interface{})
205
206
subscribeReq := model.SubscribeReq{
207
InstType: fmt.Sprintf("%v", mapData["instType"]),
208
Channel: fmt.Sprintf("%v", mapData["channel"]),
209
InstId: fmt.Sprintf("%v", mapData["instId"]),
210
}
211
212
v, e := p.ScribeMap[subscribeReq]
213
214
if !e {
215
return p.Listener
216
}
217
return v
218
}
219
220
// OnReceive is the callback type used for message and error listeners; it is
// invoked with the received WebSocket message as a string.
type OnReceive func(message string)
221
222
// handleMessage is the catch-all for messages ReadLoop does not route
// elsewhere: it prints the message to standard output prefixed with
// "default:".
func (p *BitgetBaseWsClient) handleMessage(msg string) {
	fmt.Printf("default:%s\n", msg)
}
225
226