Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
angel-one
GitHub Repository: angel-one/smartapi-python
Path: blob/main/SmartApi/webSocket.py
410 views
1
2
import six
3
import sys
4
import time
5
import json
6
import struct
7
import logging
8
import threading
9
import base64
10
import zlib
11
from datetime import datetime
12
from twisted.internet import reactor, ssl
13
from twisted.python import log as twisted_log
14
from twisted.internet.protocol import ReconnectingClientFactory
15
from autobahn.twisted.websocket import WebSocketClientProtocol, \
16
WebSocketClientFactory, connectWS
17
18
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
19
20
class SmartSocketClientProtocol(WebSocketClientProtocol):
    """Client protocol that relays websocket events to hooks on its factory.

    Each event handler looks up the matching ``on_*`` attribute on
    ``self.factory`` and calls it only when that attribute is truthy.
    """

    def __init__(self, *args, **kwargs):
        """Hand every argument through to the base protocol unchanged."""
        super(SmartSocketClientProtocol, self).__init__(*args, **kwargs)

    def onConnect(self, response):  # noqa
        """Store this protocol on the factory, then fire ``on_connect``."""
        self.factory.ws = self

        hook = self.factory.on_connect
        if hook:
            hook(self, response)

    def onOpen(self):
        """Fire the factory's ``on_open`` hook with this protocol."""
        hook = self.factory.on_open
        if hook:
            hook(self)

    def onMessage(self, payload, is_binary):  # noqa
        """Forward a received text or binary message to ``on_message``."""
        hook = self.factory.on_message
        if hook:
            hook(self, payload, is_binary)

    def onClose(self, was_clean, code, reason):  # noqa
        """Fire ``on_error`` for an unclean close, then always ``on_close``."""
        if not was_clean and self.factory.on_error:
            self.factory.on_error(self, code, reason)

        # Looked up only after the error hook has run, as that hook may
        # have replaced the close hook.
        hook = self.factory.on_close
        if hook:
            hook(self, code, reason)
52
53
54
class SmartSocketClientFactory(WebSocketClientFactory, ReconnectingClientFactory):
    """Websocket client factory that retries connections and exposes hooks.

    The ``on_*`` attributes are optional callables; each is invoked only
    when it is truthy.
    """

    protocol = SmartSocketClientProtocol

    # Retry limits; ``WebSocket._create_connection`` overwrites both per instance.
    maxDelay = 5
    maxRetries = 10

    # Time of the most recent connection attempt (``None`` until the first).
    _last_connection_time = None

    def __init__(self, *args, **kwargs):
        """Reset debug flag, protocol alias and all hooks, then init the bases."""
        self.debug = False
        self.ws = None

        for hook_name in (
            "on_open",
            "on_error",
            "on_close",
            "on_message",
            "on_connect",
            "on_reconnect",
            "on_noreconnect",
        ):
            setattr(self, hook_name, None)

        super(SmartSocketClientFactory, self).__init__(*args, **kwargs)

    def startedConnecting(self, connector):  # noqa
        """Record the attempt time; log the very first attempt in debug mode."""
        is_first_attempt = not self._last_connection_time
        if is_first_attempt and self.debug:
            log.debug("Start WebSocket connection.")

        self._last_connection_time = time.time()

    def _fire_reconnect(self):
        """Call ``on_reconnect`` with the current retry count, if it is set."""
        if self.on_reconnect:
            self.on_reconnect(self.retries)

    def clientConnectionFailed(self, connector, reason):  # noqa
        """Handle a failed connect request: report, retry, check exhaustion."""
        if self.retries > 0:
            print("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay))))
            self._fire_reconnect()

        # The retry is scheduled whether or not this was the first failure.
        self.retry(connector)
        self.send_noreconnect()

    def clientConnectionLost(self, connector, reason):  # noqa
        """Handle a dropped live connection: notify, retry, check exhaustion."""
        if self.retries > 0:
            self._fire_reconnect()

        self.retry(connector)
        self.send_noreconnect()

    def send_noreconnect(self):
        """Fire ``on_noreconnect`` once the retry count exceeds ``maxRetries``."""
        limit = self.maxRetries
        if limit is None or self.retries <= limit:
            return

        if self.debug:
            log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries))

        if self.on_noreconnect:
            self.on_noreconnect()
116
117
class WebSocket(object):
    """Client for the SmartAPI streaming websocket feed.

    Assign callables to the public ``on_*`` attributes, then call
    :meth:`connect`. Each callback receives this ``WebSocket`` instance as
    its first argument.
    """

    # Segment codes; `_parse_binary` compares them with the low byte of the
    # instrument token.
    EXCHANGE_MAP = {
        "nse": 1,
        "nfo": 2,
        "cds": 3,
        "bse": 4,
        "bfo": 5,
        "bsecds": 6,
        "mcx": 7,
        "mcxsx": 8,
        "indices": 9,
    }

    # Default connection timeout
    CONNECT_TIMEOUT = 30
    # Default reconnect max delay.
    RECONNECT_MAX_DELAY = 60
    # Default reconnect attempts
    RECONNECT_MAX_TRIES = 50

    ROOT_URI = 'wss://wsfeeds.angelbroking.com/NestHtml5Mobile/socket/stream'

    # Flag to set if it's the first connect
    _is_first_connect = True

    # Minimum delay which should be set between retries. User can't set less than this
    _minimum_reconnect_max_delay = 5
    # Maximum number of retries user can set
    _maximum_reconnect_max_tries = 300

    feed_token = None
    client_code = None

    # Class-level defaults so is_connected()/close()/stop_retry() are safe to
    # call before connect() has created a factory or a connection.
    ws = None
    factory = None

    # The single heartbeat thread, started lazily by websocket_connection().
    _heartbeat_thread = None

    def __init__(self, FEED_TOKEN, CLIENT_CODE, debug=False, root=None, reconnect=True,
                 reconnect_max_tries=RECONNECT_MAX_TRIES,
                 reconnect_max_delay=RECONNECT_MAX_DELAY,
                 connect_timeout=CONNECT_TIMEOUT):
        """Store credentials and reconnect settings; nothing is connected yet.

        :param FEED_TOKEN: token sent as ``"token"`` in every request.
        :param CLIENT_CODE: sent as both ``"user"`` and ``"acctid"``.
        :param debug: copied to the factory to enable its debug logging.
        :param root: websocket URL; falls back to ``ROOT_URI`` when falsy.
        :param reconnect: accepted for compatibility; not read by this class.
        :param reconnect_max_tries: capped at ``_maximum_reconnect_max_tries``.
        :param reconnect_max_delay: raised to ``_minimum_reconnect_max_delay``
            when smaller.
        :param connect_timeout: timeout handed to ``connectWS``.
        """
        self.root = root or self.ROOT_URI
        self.feed_token = FEED_TOKEN
        self.client_code = CLIENT_CODE

        # Set max reconnect tries
        if reconnect_max_tries > self._maximum_reconnect_max_tries:
            log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format(
                val=self._maximum_reconnect_max_tries))
            self.reconnect_max_tries = self._maximum_reconnect_max_tries
        else:
            self.reconnect_max_tries = reconnect_max_tries

        # Set max reconnect delay
        if reconnect_max_delay < self._minimum_reconnect_max_delay:
            log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format(
                val=self._minimum_reconnect_max_delay))
            self.reconnect_max_delay = self._minimum_reconnect_max_delay
        else:
            self.reconnect_max_delay = reconnect_max_delay

        self.connect_timeout = connect_timeout

        # Debug enables logs
        self.debug = debug

        # Placeholders for callbacks.
        self.on_ticks = None
        self.on_open = None
        self.on_close = None
        self.on_error = None
        self.on_connect = None
        self.on_message = None
        self.on_reconnect = None
        self.on_noreconnect = None

    def _create_connection(self, url, **kwargs):
        """Create the client factory for ``url`` and wire its hooks to this object."""
        self.factory = SmartSocketClientFactory(url, **kwargs)

        # Alias for current websocket connection
        self.ws = self.factory.ws

        self.factory.debug = self.debug

        # Register private callbacks
        self.factory.on_open = self._on_open
        self.factory.on_error = self._on_error
        self.factory.on_close = self._on_close
        self.factory.on_message = self._on_message
        self.factory.on_connect = self._on_connect
        self.factory.on_reconnect = self._on_reconnect
        self.factory.on_noreconnect = self._on_noreconnect

        self.factory.maxDelay = self.reconnect_max_delay
        self.factory.maxRetries = self.reconnect_max_tries

    def connect(self, threaded=False, disable_ssl_verification=False, proxy=None):
        """Open the websocket connection and start the reactor if needed.

        :param threaded: run the reactor in a daemon thread instead of
            blocking the caller.
        :param disable_ssl_verification: when true, no SSL context factory is
            passed to ``connectWS``.
        :param proxy: forwarded to the client factory as its ``proxy``
            argument.
        """
        # Honour the URL chosen in __init__ rather than always using ROOT_URI.
        self._create_connection(self.root, proxy=proxy)

        context_factory = None
        if self.factory.isSecure and not disable_ssl_verification:
            context_factory = ssl.ClientContextFactory()

        connectWS(self.factory, contextFactory=context_factory, timeout=self.connect_timeout)

        # Keyword arguments for reactor.run
        opts = {}

        # Run when reactor is not running
        if not reactor.running:
            if threaded:
                # Signals are not allowed in non main thread by twisted so suppress it.
                opts["installSignalHandlers"] = False
                self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts)
                self.websocket_thread.daemon = True
                self.websocket_thread.start()
            else:
                reactor.run(**opts)

    def is_connected(self):
        """Return True when a connection exists and is in the open state."""
        return bool(self.ws and self.ws.state == self.ws.STATE_OPEN)

    def _close(self, code=None, reason=None):
        """Send a close frame if there is a current connection."""
        if self.ws:
            self.ws.sendClose(code, reason)

    def close(self, code=None, reason=None):
        """Close the WebSocket connection."""
        self.stop_retry()
        self._close(code, reason)

    def stop(self):
        """Stop the event loop. Should be used if main thread has to be closed in `on_close` method."""
        reactor.stop()

    def stop_retry(self):
        """Stop auto retry when it is in progress."""
        if self.factory:
            self.factory.stopTrying()

    def _on_reconnect(self, attempts_count):
        """Relay the factory's reconnect notification to ``on_reconnect``."""
        if self.on_reconnect:
            return self.on_reconnect(self, attempts_count)

    def _on_noreconnect(self):
        """Relay the retries-exhausted notification to ``on_noreconnect``."""
        if self.on_noreconnect:
            return self.on_noreconnect(self)

    def websocket_connection(self):
        """Send the ``"cn"`` request and make sure the heartbeat is running.

        :returns: an error string when ``client_code`` or ``feed_token`` is
            ``None``; otherwise ``None``.
        """
        if self.client_code is None or self.feed_token is None:
            return "client_code or feed_token or task is missing"

        request = {"task": "cn", "channel": "", "token": self.feed_token,
                   "user": self.client_code, "acctid": self.client_code}
        self.ws.sendMessage(six.b(json.dumps(request)))

        # Start at most one heartbeat thread. heartBeat() always reads the
        # current self.ws, so repeated calls (e.g. after a reconnect) must not
        # pile up additional never-ending threads.
        beat = self._heartbeat_thread
        if beat is None or not beat.is_alive():
            beat = threading.Thread(target=self.heartBeat, daemon=True)
            self._heartbeat_thread = beat
            beat.start()

    def send_request(self, token, task):
        """Send a ``task`` request for the scrips in ``token``.

        :param token: value sent as the request's ``"channel"``.
        :param task: one of ``"mw"``, ``"sfi"`` or ``"dp"``.
        :returns: ``True`` once the message is handed to the connection, or
            ``None`` (after printing a notice) when ``task`` is not accepted.
        :raises Exception: whatever sending raised, after the connection has
            been asked to close.
        """
        if task not in ("mw", "sfi", "dp"):
            print("The task entered is invalid, Please enter correct task(mw,sfi,dp) ")
            return None

        request = {"task": task, "channel": token, "token": self.feed_token,
                   "user": self.client_code, "acctid": self.client_code}
        try:
            self.ws.sendMessage(six.b(json.dumps(request)))
        except Exception as e:
            self._close(reason="Error while request sending: {}".format(str(e)))
            raise
        return True

    def _on_connect(self, ws, response):
        """Remember the new connection and call ``on_connect``."""
        self.ws = ws
        if self.on_connect:
            self.on_connect(self, response)

    def _on_close(self, ws, code, reason):
        """Call `on_close` callback when connection is closed."""
        log.debug("Connection closed: {} - {}".format(code, str(reason)))

        if self.on_close:
            self.on_close(self, code, reason)

    def _on_error(self, ws, code, reason):
        """Call `on_error` callback when connection throws an error."""
        log.debug("Connection error: {} - {}".format(code, str(reason)))

        if self.on_error:
            self.on_error(self, code, reason)

    def _on_message(self, ws, payload, is_binary):
        """Call `on_message`, then parse the payload and deliver ticks."""
        if self.on_message:
            self.on_message(self, payload, is_binary)

        # If the message is binary, parse it and send it to the callback.
        if self.on_ticks and is_binary and len(payload) > 4:
            self.on_ticks(self, self._parse_binary(payload))

        # Parse text messages
        if not is_binary:
            self._parse_text_message(payload)

    def _on_open(self, ws):
        """Mark the first connect as done and call ``on_open``.

        This handler must not call ``connect()``: the factory already
        re-establishes dropped connections itself, and ``connect()`` would
        build a second factory whose own open event lands here again,
        multiplying connections without bound.
        """
        self._is_first_connect = False

        if self.on_open:
            return self.on_open(self)

    def heartBeat(self):
        """Send an ``"hb"`` request every 60 seconds, forever.

        Runs in the daemon thread started by ``websocket_connection``.
        """
        # NOTE(review): this sends from a non-reactor thread; Twisted generally
        # expects reactor.callFromThread for that - confirm and migrate.
        while True:
            try:
                request = {"task": "hb", "channel": "", "token": self.feed_token,
                           "user": self.client_code, "acctid": self.client_code}
                self.ws.sendMessage(six.b(json.dumps(request)))
            except Exception:
                # Best effort: a failed beat (e.g. no connection yet) must not
                # end the loop.
                print("HeartBeats Failed")
            time.sleep(60)

    def _parse_text_message(self, payload):
        """Decode a base64 + zlib + JSON text message and pass it to ``on_ticks``.

        Payloads that cannot be decoded are dropped silently.
        """
        try:
            # b64decode accepts both bytes and ASCII text, so no manual decode.
            text = zlib.decompress(base64.b64decode(payload)).decode("utf-8")
            # Single quotes are rewritten to double quotes before parsing.
            # NOTE(review): this also rewrites apostrophes inside values.
            data = json.loads(text.replace("'", '"'))
            # Round trip keeps the original behaviour of key-sorted dicts.
            data = json.loads(json.dumps(data, sort_keys=True))
        except (ValueError, zlib.error):
            # binascii, unicode and JSON decode errors are all ValueErrors;
            # zlib.error is not, so it is listed explicitly.
            return

        if self.on_ticks:
            self.on_ticks(self, data)

    def _parse_binary(self, bin):
        """Parse binary data to a (list of) ticks structure.

        :param bin: raw frame; it is split with ``_split_packets`` first.
        :returns: one dict per packet.
        :raises struct.error: when a packet is shorter than the fields read.
        """
        packets = self._split_packets(bin)  # split data to individual ticks packet
        data = []

        for packet in packets:
            instrument_token = self._unpack_int(packet, 0, 4)
            segment = instrument_token & 0xff  # Retrieve segment constant from instrument_token

            divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0

            # All indices are not tradable
            tradable = segment != self.EXCHANGE_MAP["indices"]

            try:
                last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48))
            except Exception:
                last_trade_time = None

            try:
                timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64))
            except Exception:
                timestamp = None

            # The tick dict has to exist before its fields are filled in.
            d = {
                "tradable": tradable,
                "instrument_token": instrument_token,
            }
            d["last_trade_time"] = last_trade_time
            d["oi"] = self._unpack_int(packet, 48, 52)
            d["oi_day_high"] = self._unpack_int(packet, 52, 56)
            d["oi_day_low"] = self._unpack_int(packet, 56, 60)
            d["timestamp"] = timestamp

            # Market depth entries.
            depth = {
                "buy": [],
                "sell": []
            }

            # Compile the market depth lists: 12-byte entries from offset 64,
            # the first five go to "buy" and the rest to "sell".
            for i, p in enumerate(range(64, len(packet), 12)):
                depth["sell" if i >= 5 else "buy"].append({
                    "quantity": self._unpack_int(packet, p, p + 4),
                    "price": self._unpack_int(packet, p + 4, p + 8) / divisor,
                    "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H")
                })

            d["depth"] = depth

            data.append(d)

        return data

    def _unpack_int(self, bin, start, end, byte_format="I"):
        """Unpack ``bin[start:end]`` as a big-endian value of ``byte_format``."""
        return struct.unpack(">" + byte_format, bin[start:end])[0]

    def _split_packets(self, bin):
        """Split the data to individual packets of ticks."""
        # Ignore heartbeat data.
        if len(bin) < 2:
            return []

        number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H")
        packets = []

        # Each packet is prefixed with its own 2-byte length.
        j = 2
        for _ in range(number_of_packets):
            packet_length = self._unpack_int(bin, j, j + 2, byte_format="H")
            packets.append(bin[j + 2: j + 2 + packet_length])
            j = j + 2 + packet_length

        return packets
448
449