Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
angel-one
GitHub Repository: angel-one/smartapi-python
Path: blob/main/SmartApi/smartWebSocketV2.py
410 views
1
import struct
2
import time
3
import ssl
4
import json
5
import websocket
6
import os
7
import logging
8
import logzero
9
from logzero import logger
10
11
class SmartWebSocketV2(object):
    """
    SmartAPI Web Socket version 2 client.

    Wraps a ``websocket.WebSocketApp`` connection to ``ROOT_URI``, keeps a
    registry of the active subscriptions (``input_request_dict``) so they can
    be replayed by ``resubscribe()``, and decodes the little-endian binary
    tick packets into plain dictionaries.

    Callers customise behaviour by overriding (or assigning) the public
    ``on_open`` / ``on_data`` / ``on_message`` / ``on_control_message`` /
    ``on_error`` / ``on_close`` callbacks.
    """

    ROOT_URI = "wss://smartapisocket.angelone.in/smart-stream"
    HEART_BEAT_MESSAGE = "ping"
    HEART_BEAT_INTERVAL = 10  # Adjusted to 10s
    LITTLE_ENDIAN_BYTE_ORDER = "<"
    RESUBSCRIBE_FLAG = False

    # Available Actions
    SUBSCRIBE_ACTION = 1
    UNSUBSCRIBE_ACTION = 0

    # Possible Subscription Mode
    LTP_MODE = 1
    QUOTE = 2
    SNAP_QUOTE = 3
    DEPTH = 4

    # Exchange Type
    NSE_CM = 1
    NSE_FO = 2
    BSE_CM = 3
    BSE_FO = 4
    MCX_FO = 5
    NCX_FO = 7
    CDE_FO = 13

    # Subscription Mode Map
    SUBSCRIPTION_MODE_MAP = {
        1: "LTP",
        2: "QUOTE",
        3: "SNAP_QUOTE",
        4: "DEPTH"
    }

    # Maximum number of tokens accepted by a single DEPTH subscribe request.
    DEPTH_QUOTA_LIMIT = 50

    wsapp = None
    # Class-level fallback kept for backward compatibility; __init__ gives
    # every instance its own registry so instances never share subscriptions.
    input_request_dict = {}
    current_retry_attempt = 0

    def __init__(self, auth_token, api_key, client_code, feed_token, max_retry_attempt=1,
                 retry_strategy=0, retry_delay=10, retry_multiplier=2, retry_duration=60):
        """
        Initialise the SmartWebSocketV2 instance
        Parameters
        ------
        auth_token: string
            jwt auth token received from Login API
        api_key: string
            api key from Smart API account
        client_code: string
            angel one account id
        feed_token: string
            feed token received from Login API
        max_retry_attempt: integer
            number of reconnect attempts made by the error handler
        retry_strategy: integer
            0 -> wait ``retry_delay`` seconds before every attempt
            1 -> wait ``retry_delay * retry_multiplier ** (attempt - 1)`` seconds
        retry_delay: number
            base delay, in seconds, passed to ``time.sleep``
        retry_multiplier: number
            growth factor used by retry strategy 1
        retry_duration: number
            compared (multiplied by 60) against the seconds elapsed since the
            last pong, only to choose the log message once retries are exhausted

        Raises
        ------
        Exception
            if any of the four tokens is missing or empty
        """
        self.auth_token = auth_token
        self.api_key = api_key
        self.client_code = client_code
        self.feed_token = feed_token
        self.DISCONNECT_FLAG = True
        self.last_pong_timestamp = None
        # Initialised here so it can be read before the first ping arrives.
        self.last_ping_timestamp = None
        self.MAX_RETRY_ATTEMPT = max_retry_attempt
        self.retry_strategy = retry_strategy
        self.retry_delay = retry_delay
        self.retry_multiplier = retry_multiplier
        self.retry_duration = retry_duration
        # Per-instance registry: {mode: {exchangeType: [token, ...]}}
        self.input_request_dict = {}

        # Create a log folder based on the current date
        log_folder = time.strftime("%Y-%m-%d", time.localtime())
        log_folder_path = os.path.join("logs", log_folder)  # Construct the full path to the log folder
        os.makedirs(log_folder_path, exist_ok=True)  # Create the log folder if it doesn't exist
        log_path = os.path.join(log_folder_path, "app.log")  # Construct the full path to the log file
        logzero.logfile(log_path, loglevel=logging.INFO)  # Output logs to a date-wise log file

        if not self._sanity_check():
            logger.error("Invalid initialization parameters. Provide valid values for all the tokens.")
            raise Exception("Provide valid value for all the tokens")

    def _sanity_check(self):
        """Return True only when all four credentials are truthy."""
        return all([self.auth_token, self.api_key, self.client_code, self.feed_token])

    def _on_message(self, wsapp, message):
        """
        Route a raw message: the text "pong" goes to ``on_message``; anything
        else is parsed as a binary packet and sent to the control-message
        handler or to ``on_data``.

        NOTE(review): ``connect()`` does not register this handler with the
        WebSocketApp; it is only reachable when called explicitly.
        """
        logger.info(f"Received message: {message}")
        if message == "pong":
            self.on_message(wsapp, message)
            return

        parsed_message = self._parse_binary_data(message)
        # Check if it's a control message (e.g., heartbeat)
        if self._is_control_message(parsed_message):
            self._handle_control_message(parsed_message)
        else:
            self.on_data(wsapp, parsed_message)

    def _is_control_message(self, parsed_message):
        """Return True when the parsed message carries no "subscription_mode" key."""
        return "subscription_mode" not in parsed_message

    def _handle_control_message(self, parsed_message):
        """
        Dispatch a control message to the ping/pong handlers and then to the
        ``on_control_message`` callback.

        The mode is read with ``.get`` because this handler is reached exactly
        when "subscription_mode" is absent; indexing the key would always
        raise KeyError.
        """
        control_mode = parsed_message.get("subscription_mode")
        if control_mode == 0:
            self._on_pong(self.wsapp, "pong")
        elif control_mode == 1:
            self._on_ping(self.wsapp, "ping")
        # Invoke on_control_message callback with the control message data
        if hasattr(self, 'on_control_message'):
            self.on_control_message(self.wsapp, parsed_message)

    def _on_data(self, wsapp, data, data_type, continue_flag):
        """
        Parse frames whose ``data_type`` is 2 and forward the result to
        ``on_data``; every other frame type is ignored.
        """
        # presumably 2 is the websocket binary opcode -- confirm against websocket-client
        if data_type == 2:
            parsed_message = self._parse_binary_data(data)
            self.on_data(wsapp, parsed_message)

    def _on_open(self, wsapp):
        """Replay stored subscriptions when RESUBSCRIBE_FLAG is set, else call ``on_open``."""
        if self.RESUBSCRIBE_FLAG:
            self.resubscribe()
        else:
            self.on_open(wsapp)

    def _on_pong(self, wsapp, data):
        """Log the pong and record its arrival time when ``data`` equals HEART_BEAT_MESSAGE."""
        if data == self.HEART_BEAT_MESSAGE:
            timestamp = time.time()
            formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
            logger.info(f"In on pong function ==> {data}, Timestamp: {formatted_timestamp}")
            self.last_pong_timestamp = timestamp

    def _on_ping(self, wsapp, data):
        """Log the ping and record its arrival time in ``last_ping_timestamp``."""
        timestamp = time.time()
        formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
        logger.info(f"In on ping function ==> {data}, Timestamp: {formatted_timestamp}")
        self.last_ping_timestamp = timestamp

    def subscribe(self, correlation_id, mode, token_list):
        """
        This Function subscribe the price data for the given token
        Parameters
        ------
        correlation_id: string
            A 10 character alphanumeric ID client may provide which will be returned by the server in error response
            to indicate which request generated error response.
            Clients can use this optional ID for tracking purposes between request and corresponding error response.
        mode: integer
            It denotes the subscription type
            possible values -> 1, 2, 3 and 4
            1 -> LTP
            2 -> Quote
            3 -> Snap Quote
            4 -> Depth (exchangeType 1 only, at most DEPTH_QUOTA_LIMIT tokens per request)
        token_list: list of dict
            Sample Value ->
                [
                    { "exchangeType": 1, "tokens": ["10626", "5290"]},
                    {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]}
                ]
            exchangeType: integer
                possible values ->
                    1 -> nse_cm
                    2 -> nse_fo
                    3 -> bse_cm
                    4 -> bse_fo
                    5 -> mcx_fo
                    7 -> ncx_fo
                    13 -> cde_fo
            tokens: list of string

        Raises
        ------
        ValueError
            DEPTH mode requested for an exchange type other than 1
        Exception
            DEPTH mode requested for more than DEPTH_QUOTA_LIMIT tokens; any
            error raised while sending is logged and re-raised
        """
        try:
            request_data = {
                "correlationID": correlation_id,
                "action": self.SUBSCRIBE_ACTION,
                "params": {
                    "mode": mode,
                    "tokenList": token_list
                }
            }

            # Validate before touching the registry, so a rejected request is
            # never replayed by resubscribe().
            if mode == self.DEPTH:
                for token in token_list:
                    if token.get('exchangeType') != 1:
                        error_message = f"Invalid ExchangeType:{token.get('exchangeType')} Please check the exchange type and try again it support only 1 exchange type"
                        logger.error(error_message)
                        raise ValueError(error_message)

                total_tokens = sum(len(token["tokens"]) for token in token_list)
                quota_limit = self.DEPTH_QUOTA_LIMIT
                if total_tokens > quota_limit:
                    error_message = f"Quota exceeded: You can subscribe to a maximum of {quota_limit} tokens only."
                    logger.error(error_message)
                    raise Exception(error_message)

            mode_subscriptions = self.input_request_dict.setdefault(mode, {})
            for token in token_list:
                # Own list per exchange type: never alias (and later mutate)
                # the caller's list, and never store a token twice.
                stored_tokens = mode_subscriptions.setdefault(token['exchangeType'], [])
                for token_id in token["tokens"]:
                    if token_id not in stored_tokens:
                        stored_tokens.append(token_id)

            self.wsapp.send(json.dumps(request_data))
            self.RESUBSCRIBE_FLAG = True

        except Exception as e:
            logger.error(f"Error occurred during subscribe: {e}")
            raise

    def unsubscribe(self, correlation_id, mode, token_list):
        """
        This function unsubscribe the data for given token
        Parameters
        ------
        correlation_id: string
            A 10 character alphanumeric ID client may provide which will be returned by the server in error response
            to indicate which request generated error response.
            Clients can use this optional ID for tracking purposes between request and corresponding error response.
        mode: integer
            It denotes the subscription type
            possible values -> 1, 2, 3 and 4
            1 -> LTP
            2 -> Quote
            3 -> Snap Quote
            4 -> Depth
        token_list: list of dict
            Sample Value ->
                [
                    { "exchangeType": 1, "tokens": ["10626", "5290"]},
                    {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]}
                ]
            exchangeType: integer
                possible values ->
                    1 -> nse_cm
                    2 -> nse_fo
                    3 -> bse_cm
                    4 -> bse_fo
                    5 -> mcx_fo
                    7 -> ncx_fo
                    13 -> cde_fo
            tokens: list of string

        Raises
        ------
        Exception
            any error raised while updating the registry or sending is logged
            and re-raised
        """
        try:
            request_data = {
                "correlationID": correlation_id,
                "action": self.UNSUBSCRIBE_ACTION,
                "params": {
                    "mode": mode,
                    "tokenList": token_list
                }
            }

            # Drop the tokens from the registry so resubscribe() does not
            # bring them back. (Merging request_data into the registry, as was
            # done previously, corrupted it and broke resubscribe().)
            mode_subscriptions = self.input_request_dict.get(mode)
            if mode_subscriptions is not None:
                for token in token_list:
                    exchange_type = token['exchangeType']
                    stored_tokens = mode_subscriptions.get(exchange_type)
                    if stored_tokens is None:
                        continue
                    remaining = [t for t in stored_tokens if t not in token["tokens"]]
                    if remaining:
                        mode_subscriptions[exchange_type] = remaining
                    else:
                        del mode_subscriptions[exchange_type]
                if not mode_subscriptions:
                    del self.input_request_dict[mode]

            self.wsapp.send(json.dumps(request_data))
            self.RESUBSCRIBE_FLAG = True
        except Exception as e:
            logger.error(f"Error occurred during unsubscribe: {e}")
            raise

    def resubscribe(self):
        """
        Send one subscribe request per mode stored in ``input_request_dict``.

        The requests carry no correlationID. Errors are logged and re-raised.
        """
        try:
            for mode, exchange_map in self.input_request_dict.items():
                token_list = [
                    {'exchangeType': exchange_type, 'tokens': tokens}
                    for exchange_type, tokens in exchange_map.items()
                ]
                request_data = {
                    "action": self.SUBSCRIBE_ACTION,
                    "params": {
                        "mode": mode,
                        "tokenList": token_list
                    }
                }
                self.wsapp.send(json.dumps(request_data))
        except Exception as e:
            logger.error(f"Error occurred during resubscribe: {e}")
            raise

    def connect(self):
        """
        Make the web socket connection with the server

        Builds the WebSocketApp with the authentication headers and then calls
        ``run_forever``. Errors are logged and re-raised.
        """
        headers = {
            "Authorization": self.auth_token,
            "x-api-key": self.api_key,
            "x-client-code": self.client_code,
            "x-feed-token": self.feed_token
        }

        try:
            self.wsapp = websocket.WebSocketApp(self.ROOT_URI, header=headers, on_open=self._on_open,
                                                on_error=self._on_error, on_close=self._on_close,
                                                on_data=self._on_data,
                                                on_ping=self._on_ping,
                                                on_pong=self._on_pong)
            # NOTE(review): cert_reqs=ssl.CERT_NONE disables TLS certificate
            # verification while credentials are sent in the headers; kept
            # as-is for compatibility, but worth revisiting.
            self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=self.HEART_BEAT_INTERVAL)
        except Exception as e:
            logger.error(f"Error occurred during WebSocket connection: {e}")
            raise

    def close_connection(self):
        """
        Closes the connection

        Also clears RESUBSCRIBE_FLAG and sets DISCONNECT_FLAG.
        """
        self.RESUBSCRIBE_FLAG = False
        self.DISCONNECT_FLAG = True
        if self.wsapp:
            self.wsapp.close()

    def _on_error(self, wsapp, error):
        """
        Error handler: retry the connection up to MAX_RETRY_ATTEMPT times,
        then close the connection and report through ``on_error``.

        Raises
        ------
        Exception
            if ``retry_strategy`` is neither 0 nor 1
        """
        self.RESUBSCRIBE_FLAG = True
        if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT:
            logger.warning(f"Attempting to resubscribe/reconnect (Attempt {self.current_retry_attempt + 1})...")
            self.current_retry_attempt += 1
            if self.retry_strategy == 0:  # retry_strategy for simple
                time.sleep(self.retry_delay)
            elif self.retry_strategy == 1:  # retry_strategy for exponential
                delay = self.retry_delay * (self.retry_multiplier ** (self.current_retry_attempt - 1))
                time.sleep(delay)
            else:
                logger.error(f"Invalid retry strategy {self.retry_strategy}")
                raise Exception(f"Invalid retry strategy {self.retry_strategy}")
            try:
                # close_connection() clears RESUBSCRIBE_FLAG, so the reopened
                # socket goes through on_open() rather than resubscribe().
                self.close_connection()
                self.connect()
            except Exception as e:
                logger.error(f"Error occurred during resubscribe/reconnect: {e}")
                if hasattr(self, 'on_error'):
                    self.on_error("Reconnect Error", str(e) if str(e) else "Unknown error")
        else:
            self.close_connection()
            if hasattr(self, 'on_error'):
                self.on_error("Max retry attempt reached", "Connection closed")
            if self.retry_duration is not None and (self.last_pong_timestamp is not None and time.time() - self.last_pong_timestamp > self.retry_duration * 60):
                logger.warning("Connection closed due to inactivity.")
            else:
                logger.warning("Connection closed due to max retry attempts reached.")

    def _on_close(self, wsapp, close_status_code=None, close_msg=None):
        """
        Close handler; forwards to ``on_close(wsapp)``.

        The two optional arguments let the handler be invoked either with the
        socket alone or with a status code and message; both are ignored.
        """
        self.on_close(wsapp)

    def _parse_binary_data(self, binary_data):
        """
        Decode one binary packet into a dictionary.

        Always present: subscription_mode, exchange_type, token,
        exchange_timestamp. Further fields depend on the subscription mode
        byte (QUOTE / SNAP_QUOTE / DEPTH). Errors are logged and re-raised.
        """
        try:
            parsed_data = {
                "subscription_mode": self._unpack_data(binary_data, 0, 1, byte_format="B")[0],
                "exchange_type": self._unpack_data(binary_data, 1, 2, byte_format="B")[0],
                "token": SmartWebSocketV2._parse_token_value(binary_data[2:27]),
                "sequence_number": self._unpack_data(binary_data, 27, 35, byte_format="q")[0],
                "exchange_timestamp": self._unpack_data(binary_data, 35, 43, byte_format="q")[0],
                "last_traded_price": self._unpack_data(binary_data, 43, 51, byte_format="q")[0]
            }
            mode = parsed_data["subscription_mode"]
            parsed_data["subscription_mode_val"] = self.SUBSCRIPTION_MODE_MAP.get(mode)

            if mode in [self.QUOTE, self.SNAP_QUOTE]:
                parsed_data["last_traded_quantity"] = self._unpack_data(binary_data, 51, 59, byte_format="q")[0]
                parsed_data["average_traded_price"] = self._unpack_data(binary_data, 59, 67, byte_format="q")[0]
                parsed_data["volume_trade_for_the_day"] = self._unpack_data(binary_data, 67, 75, byte_format="q")[0]
                parsed_data["total_buy_quantity"] = self._unpack_data(binary_data, 75, 83, byte_format="d")[0]
                parsed_data["total_sell_quantity"] = self._unpack_data(binary_data, 83, 91, byte_format="d")[0]
                parsed_data["open_price_of_the_day"] = self._unpack_data(binary_data, 91, 99, byte_format="q")[0]
                parsed_data["high_price_of_the_day"] = self._unpack_data(binary_data, 99, 107, byte_format="q")[0]
                parsed_data["low_price_of_the_day"] = self._unpack_data(binary_data, 107, 115, byte_format="q")[0]
                parsed_data["closed_price"] = self._unpack_data(binary_data, 115, 123, byte_format="q")[0]

            if mode == self.SNAP_QUOTE:
                parsed_data["last_traded_timestamp"] = self._unpack_data(binary_data, 123, 131, byte_format="q")[0]
                parsed_data["open_interest"] = self._unpack_data(binary_data, 131, 139, byte_format="q")[0]
                parsed_data["open_interest_change_percentage"] = self._unpack_data(binary_data, 139, 147, byte_format="q")[0]
                parsed_data["upper_circuit_limit"] = self._unpack_data(binary_data, 347, 355, byte_format="q")[0]
                parsed_data["lower_circuit_limit"] = self._unpack_data(binary_data, 355, 363, byte_format="q")[0]
                parsed_data["52_week_high_price"] = self._unpack_data(binary_data, 363, 371, byte_format="q")[0]
                parsed_data["52_week_low_price"] = self._unpack_data(binary_data, 371, 379, byte_format="q")[0]
                best_5_buy_and_sell_data = self._parse_best_5_buy_and_sell_data(binary_data[147:347])
                # The swap is intentional: the helper files flag == 0 entries
                # under "best_5_buy_data", and the two keys are exchanged here.
                # NOTE(review): presumably flag 0 marks sell entries in the
                # feed -- confirm against the protocol documentation.
                parsed_data["best_5_buy_data"] = best_5_buy_and_sell_data["best_5_sell_data"]
                parsed_data["best_5_sell_data"] = best_5_buy_and_sell_data["best_5_buy_data"]

            if mode == self.DEPTH:
                parsed_data.pop("sequence_number", None)
                parsed_data.pop("last_traded_price", None)
                parsed_data.pop("subscription_mode_val", None)
                parsed_data["packet_received_time"] = self._unpack_data(binary_data, 35, 43, byte_format="q")[0]
                depth_data_start_index = 43
                depth_20_data = self._parse_depth_20_buy_and_sell_data(binary_data[depth_data_start_index:])
                parsed_data["depth_20_buy_data"] = depth_20_data["depth_20_buy_data"]
                parsed_data["depth_20_sell_data"] = depth_20_data["depth_20_sell_data"]

            return parsed_data
        except Exception as e:
            logger.error(f"Error occurred during binary data parsing: {e}")
            raise

    def _unpack_data(self, binary_data, start, end, byte_format="I"):
        """
        Unpack Binary Data to the integer according to the specified byte_format.
        This function returns the tuple
        """
        return struct.unpack(self.LITTLE_ENDIAN_BYTE_ORDER + byte_format, binary_data[start:end])

    @staticmethod
    def _parse_token_value(binary_packet):
        """Return the characters of ``binary_packet`` up to (not including) the first NUL byte."""
        characters = []
        for byte in binary_packet:
            if byte == 0:
                break
            characters.append(chr(byte))
        return "".join(characters)

    def _parse_best_5_buy_and_sell_data(self, binary_data):
        """
        Split ``binary_data`` into 20-byte entries and decode each into
        flag / quantity / price / "no of orders".

        Entries with flag == 0 are returned under "best_5_buy_data", all
        others under "best_5_sell_data"; ``_parse_binary_data`` exchanges the
        two keys when it stores the result.
        """
        packet_size = 20
        best_5_buy_data = []
        best_5_sell_data = []

        for offset in range(0, len(binary_data), packet_size):
            packet = binary_data[offset: offset + packet_size]
            each_data = {
                "flag": self._unpack_data(packet, 0, 2, byte_format="H")[0],
                "quantity": self._unpack_data(packet, 2, 10, byte_format="q")[0],
                "price": self._unpack_data(packet, 10, 18, byte_format="q")[0],
                "no of orders": self._unpack_data(packet, 18, 20, byte_format="H")[0]
            }

            if each_data["flag"] == 0:
                best_5_buy_data.append(each_data)
            else:
                best_5_sell_data.append(each_data)

        return {
            "best_5_buy_data": best_5_buy_data,
            "best_5_sell_data": best_5_sell_data
        }

    def _parse_depth_20_buy_and_sell_data(self, binary_data):
        """
        Decode 20 buy levels (starting at byte 0) and 20 sell levels (starting
        at byte 200); each level is 10 bytes: quantity, price, num_of_orders.
        """
        def read_level(start_idx):
            # One 10-byte level: int32 quantity, int32 price, int16 order count.
            return {
                "quantity": self._unpack_data(binary_data, start_idx, start_idx + 4, byte_format="i")[0],
                "price": self._unpack_data(binary_data, start_idx + 4, start_idx + 8, byte_format="i")[0],
                "num_of_orders": self._unpack_data(binary_data, start_idx + 8, start_idx + 10, byte_format="h")[0],
            }

        depth_20_buy_data = []
        depth_20_sell_data = []

        for i in range(20):
            depth_20_buy_data.append(read_level(i * 10))
            depth_20_sell_data.append(read_level(200 + i * 10))

        return {
            "depth_20_buy_data": depth_20_buy_data,
            "depth_20_sell_data": depth_20_sell_data
        }

    def on_message(self, wsapp, message):
        """Callback for text messages; no-op by default."""
        pass

    def on_data(self, wsapp, data):
        """Callback receiving each parsed data packet; no-op by default."""
        pass

    def on_control_message(self, wsapp, message):
        """Callback receiving each control message; no-op by default."""
        pass

    def on_close(self, wsapp):
        """Callback invoked when the connection closes; no-op by default."""
        pass

    def on_open(self, wsapp):
        """Callback invoked when the connection opens; no-op by default."""
        pass

    def on_error(self, error_type=None, error_message=None):
        """
        Callback invoked by the error handler with a short error type and a
        message; no-op by default. Both arguments are optional so the
        callback can still be called without arguments.
        """
        pass
494
495