Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
angel-one
GitHub Repository: angel-one/smartapi-python
Path: blob/main/SmartApi/smartApiWebsocket.py
410 views
1
# -*- coding: utf-8 -*-
2
"""
3
Created on Fri Apr 23 11:38:36 2021
4
5
@author: Sandip.Khairnar
6
"""
7
8
import websocket
9
import six
10
import base64
11
import zlib
12
import datetime
13
import time
14
import json
15
import threading
16
import ssl
17
18
class SmartWebSocket(object):
    """Client for the streaming feed served at ``ROOT_URI``.

    The class wraps a ``websocket.WebSocketApp``.  Once the socket opens it
    sends a ``"cn"`` request, starts a daemon thread that sends an ``"hb"``
    request every ``HB_INTERVAL`` seconds, and forwards decoded frames to
    ``_on_message``.  The ``_on_open`` / ``_on_message`` / ``_on_close`` /
    ``_on_error`` hooks are no-ops here and are meant to be overridden or
    reassigned by the user.
    """

    ROOT_URI = 'wss://wsfeeds.angelbroking.com/NestHtml5Mobile/socket/stream'
    HB_INTERVAL = 30            # seconds slept between two heartbeats
    HB_THREAD_FLAG = False      # True tells the heartbeat loop to stop
    WS_RECONNECT_FLAG = False   # True while a reconnect is in progress
    feed_token = None
    client_code = None
    ws = None
    # Kept as a class-level fallback for instances created without
    # __init__; every normally constructed instance gets its own dict.
    task_dict = {}

    # Tasks accepted by subscribe().
    _VALID_TASKS = ("mw", "sfi", "dp")
    # Substrings of an error message that trigger an automatic reconnect.
    _RECONNECT_MARKERS = (
        "timed",
        "Connection is already closed",
        "Connection to remote host was lost",
    )

    def __init__(self, FEED_TOKEN, CLIENT_CODE):
        """Store the credentials used in every request.

        Args:
            FEED_TOKEN: value sent as ``"token"`` in each request.
            CLIENT_CODE: value sent as ``"user"`` and ``"acctid"``.

        Raises:
            TypeError: if either argument is ``None``.  (Previously the
                method tried to *return* a message string, which Python
                itself rejects with a far less helpful ``TypeError``.)
        """
        if FEED_TOKEN is None or CLIENT_CODE is None:
            raise TypeError("client_code or feed_token is missing")
        self.root = self.ROOT_URI
        self.feed_token = FEED_TOKEN
        self.client_code = CLIENT_CODE
        # Per-instance registry of task -> channel; a shared class-level
        # dict would leak subscriptions between independent clients.
        self.task_dict = {}

    def _build_request(self, task, channel):
        """Return the request dict for *task* on *channel*."""
        return {"task": task, "channel": channel, "token": self.feed_token,
                "user": self.client_code, "acctid": self.client_code}

    def _send_request(self, request):
        """Serialize *request* as JSON and send it over the socket as bytes."""
        # json.dumps emits pure ASCII by default, so this produces the same
        # bytes as the six.b() call it replaces.
        self.ws.send(json.dumps(request).encode("utf-8"))

    def _close(self, reason=None):
        """Report *reason*, stop the heartbeat loop and close the socket.

        NOTE(review): subscribe()/resubscribe() always called this helper
        but it was never defined; closing the socket here is inferred from
        the name - confirm that this is the intended behaviour.
        """
        if reason:
            print(reason)
        self.HB_THREAD_FLAG = True
        if self.ws is not None:
            try:
                self.ws.close()
            except Exception:
                print("Closing the websocket failed")

    def _subscribe_on_open(self):
        """Send the initial ``"cn"`` request and start the heartbeat thread."""
        request = self._build_request("cn", "NONLM")
        print(request)
        self._send_request(request)

        heartbeat_thread = threading.Thread(target=self.run, args=())
        heartbeat_thread.daemon = True
        heartbeat_thread.start()

    def run(self):
        """Send a heartbeat every ``HB_INTERVAL`` seconds until stopped.

        The loop ends the next time it observes ``HB_THREAD_FLAG`` set.
        """
        while not self.HB_THREAD_FLAG:
            print(str(datetime.datetime.now()) + ' : Start task in the background')
            self.heartBeat()
            time.sleep(self.HB_INTERVAL)

    def subscribe(self, task, token):
        """Send a subscription request and remember it for reconnects.

        Args:
            task: one of ``"mw"``, ``"sfi"`` or ``"dp"``.
            token: value sent as the request's ``"channel"``.

        Returns:
            ``True`` when the request was sent, ``False`` when *task* is
            not one of the accepted values (nothing is sent or recorded).

        Raises:
            Exception: whatever the socket raised while sending; the
                connection is closed before the error is re-raised.
        """
        if task not in self._VALID_TASKS:
            print("The task entered is invalid, Please enter correct task(mw,sfi,dp) ")
            return False

        # Record only validated tasks so resubscribe() never replays junk.
        self.task_dict[task] = token
        try:
            self._send_request(self._build_request(task, token))
        except Exception as e:
            self._close(reason="Error while request sending: {}".format(str(e)))
            raise
        return True

    def resubscribe(self):
        """Re-send every subscription recorded by ``subscribe``.

        Returns:
            ``True`` once all recorded subscriptions have been sent.

        Raises:
            Exception: whatever the socket raised while sending; the
                connection is closed before the error is re-raised.
        """
        # Iterate over a snapshot so a concurrent subscribe() cannot
        # invalidate the iteration.
        for task, marketwatch in list(self.task_dict.items()):
            print(task, '->', marketwatch)
            try:
                self._send_request(self._build_request(task, marketwatch))
            except Exception as e:
                self._close(reason="Error while request sending: {}".format(str(e)))
                raise
        # Returning inside the loop would restore only the first entry.
        return True

    def heartBeat(self):
        """Send one ``"hb"`` request; failures are reported, not raised."""
        try:
            request = self._build_request("hb", "")
            print(request)
            self._send_request(request)
        except Exception:
            # Best effort: a failed heartbeat must not kill the thread.
            print("HeartBeat Sending Failed")

    def _parse_text_message(self, message):
        """Decode one frame and pass the result to ``_on_message``.

        The frame is base64-decoded, zlib-decompressed, decoded as UTF-8
        and parsed as JSON.  Frames that fail any of these steps, and
        frames whose decoded value is empty, are dropped silently.
        """
        try:
            text = zlib.decompress(base64.b64decode(message)).decode("utf-8")
            try:
                data = json.loads(text)
            except ValueError:
                # Fall back to the legacy single-quote rewrite only when the
                # payload is not valid JSON already; applying it always
                # corrupts valid payloads that contain apostrophes.
                data = json.loads(text.replace("'", '"'))
        except (ValueError, zlib.error):
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are
            # all ValueError subclasses; zlib.error is not.
            return

        # Round trip kept from the original: it yields dicts whose keys
        # are in sorted order.
        data = json.loads(json.dumps(data, sort_keys=True))
        if data:
            self._on_message(self.ws, data)

    def connect(self):
        """Create the ``WebSocketApp`` and run it until the socket closes.

        This call blocks inside ``run_forever``.
        """
        self.ws = websocket.WebSocketApp(self.ROOT_URI,
                                         on_message=self.__on_message,
                                         on_close=self.__on_close,
                                         on_open=self.__on_open,
                                         on_error=self.__on_error)

        # SECURITY NOTE(review): CERT_NONE disables TLS certificate
        # verification.  Kept as-is to avoid changing connection
        # behaviour; confirm whether verification can be enabled.
        self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

    def __on_message(self, ws, message):
        """Socket callback: decode *message* and dispatch it."""
        self._parse_text_message(message)

    def __on_open(self, ws):
        """Socket callback: start the session after the socket opens.

        After a reconnect the recorded subscriptions are replayed; on a
        first connect the user's ``_on_open`` hook is called instead.
        """
        print("__on_open################")
        self.HB_THREAD_FLAG = False
        self._subscribe_on_open()
        if self.WS_RECONNECT_FLAG:
            self.WS_RECONNECT_FLAG = False
            self.resubscribe()
        else:
            self._on_open(ws)

    def __on_close(self, ws, *close_args):
        """Socket callback: stop the heartbeat and notify ``_on_close``.

        ``*close_args`` absorbs the close status code and message that
        newer websocket-client releases pass to ``on_close``.
        """
        self.HB_THREAD_FLAG = True
        print("__on_close################")
        self._on_close(ws)

    def __on_error(self, ws, error):
        """Socket callback: reconnect on connection loss, else report.

        When the error text contains one of ``_RECONNECT_MARKERS`` the old
        socket is closed, its callbacks are detached and ``connect`` is
        called again.  Any other error is printed and passed to
        ``_on_error``.
        """
        reason = str(error)
        if any(marker in reason for marker in self._RECONNECT_MARKERS):
            self.WS_RECONNECT_FLAG = True
            self.HB_THREAD_FLAG = True

            if ws is not None:
                ws.close()
                ws.on_message = None
                ws.on_open = None
                # The original assigned ``ws.close = None``, which replaces
                # the close() method itself; detaching the on_close
                # callback matches the two lines above.
                ws.on_close = None

            # NOTE(review): connect() blocks, so this callback does not
            # return until the new connection ends.
            self.connect()
        else:
            print('Error info: %s' % (error))
            self._on_error(ws, error)

    def _on_message(self, ws, message):
        """Hook called with every decoded, non-empty message."""
        pass

    def _on_open(self, ws):
        """Hook called after the first successful connect."""
        pass

    def _on_close(self, ws):
        """Hook called when the socket closes."""
        pass

    def _on_error(self, ws, error):
        """Hook called for errors that do not trigger a reconnect."""
        pass
183