Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
angel-one
GitHub Repository: angel-one/smartapi-python
Path: blob/main/SmartApi/smartWebSocketOrderUpdate.py
410 views
1
import ssl
2
import websocket
3
import time
4
import logging
5
from logzero import logger
6
import logzero
7
import os
8
9
class SmartWebSocketOrderUpdate(object):
    """WebSocket client for the order-update feed at ``WEBSOCKET_URI``.

    ``connect()`` opens the socket with the credentials given to the
    constructor and dispatches events to the ``on_*`` callbacks, which can be
    overridden. When the connection closes, ``on_close`` triggers
    ``retry_connect()`` unless the close was requested through
    ``close_connection()``.

    NOTE(review): ``current_retry_attempt`` is never reset, so
    ``MAX_CONNECTION_RETRY_ATTEMPTS`` caps the total number of reconnects over
    the lifetime of the instance, not the number of consecutive failures.
    """

    WEBSOCKET_URI = "wss://tns.angelone.in/smart-order-update"
    HEARTBEAT_MESSAGE = "ping"  # Heartbeat message to maintain Socket connection.
    HEARTBEAT_INTERVAL_SECONDS = 10  # Interval for sending heartbeat messages to keep the connection alive.
    MAX_CONNECTION_RETRY_ATTEMPTS = 2  # Max retry attempts to establish Socket connection in case of failure.
    RETRY_DELAY_SECONDS = 10  # Delay between retry attempts when reconnecting to Socket in case of failure.

    wsapp = None  # Socket connection instance
    last_pong_timestamp = None  # Timestamp of the last received pong message
    current_retry_attempt = 0  # Current retry attempt count
    _closed_by_user = False  # True once close_connection() was called; suppresses auto-reconnect

    def __init__(self, auth_token, api_key, client_code, feed_token):
        """Store the credentials and route logzero output to a dated log file.

        Args:
            auth_token: Value sent as the ``Authorization`` header.
            api_key: Value sent as the ``x-api-key`` header.
            client_code: Value sent as the ``x-client-code`` header.
            feed_token: Value sent as the ``x-feed-token`` header.
        """
        self.auth_token = auth_token
        self.api_key = api_key
        self.client_code = client_code
        self.feed_token = feed_token
        # Create a log folder based on the current date
        log_folder = time.strftime("%Y-%m-%d", time.localtime())
        log_folder_path = os.path.join("logs", log_folder)  # Construct the full path to the log folder
        os.makedirs(log_folder_path, exist_ok=True)  # Create the log folder if it doesn't exist
        log_path = os.path.join(log_folder_path, "app.log")  # Construct the full path to the log file
        logzero.logfile(log_path, loglevel=logging.INFO)  # Output logs to a date-wise log file

    @staticmethod
    def _format_timestamp(timestamp):
        """Return *timestamp* (epoch seconds) as local ``dd-mm-yy HH:MM:SS``."""
        return time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))

    def on_message(self, wsapp, message):
        """Log a received message; override to process order updates."""
        logger.info("Received message: %s", message)

    def on_data(self, wsapp, message, data_type, continue_flag):
        """Forward every received data frame to ``on_message``."""
        self.on_message(wsapp, message)

    def on_open(self, wsapp):
        """Log that the connection has been opened."""
        logger.info("Connection opened")

    def on_error(self, wsapp, error):
        """Log an error reported for the connection."""
        logger.error("Error: %s", error)

    def on_close(self, wsapp, close_status_code, close_msg):
        """Log the close and attempt to reconnect.

        ``retry_connect()`` returns immediately when the close was requested
        through ``close_connection()``.
        """
        logger.info("Connection closed")
        self.retry_connect()

    def on_ping(self, wsapp, data):
        """Log a received ping together with the local receive time."""
        formatted_timestamp = self._format_timestamp(time.time())
        logger.info("In on ping function ==> %s, Timestamp: %s", data, formatted_timestamp)

    def on_pong(self, wsapp, data):
        """Record heartbeat pongs; forward any other payload to ``on_data``.

        NOTE(review): if the installed websocket-client delivers pong payloads
        as ``bytes``, the comparison with the ``str`` HEARTBEAT_MESSAGE never
        matches and ``last_pong_timestamp`` is never updated - confirm.
        """
        if data == self.HEARTBEAT_MESSAGE:
            timestamp = time.time()
            formatted_timestamp = self._format_timestamp(timestamp)
            logger.info("In on pong function ==> %s, Timestamp: %s", data, formatted_timestamp)
            self.last_pong_timestamp = timestamp
        else:
            # Anything that is not the heartbeat payload is treated as data.
            self.on_data(wsapp, data, websocket.ABNF.OPCODE_BINARY, False)

    def check_connection_status(self):
        """Close the socket when no pong arrived for two heartbeat intervals.

        Nothing happens until a first pong has been recorded. The socket is
        closed without marking the close as user-requested, so ``on_close``
        still attempts to reconnect.
        """
        if self.last_pong_timestamp is None:
            return
        silence = time.time() - self.last_pong_timestamp
        if silence > 2 * self.HEARTBEAT_INTERVAL_SECONDS:
            self._close_socket()

    def connect(self):
        """Open the WebSocket and run it until the connection ends.

        Any exception raised while creating or running the socket is logged
        and followed by ``retry_connect()``.
        """
        headers = {
            "Authorization": self.auth_token,
            "x-api-key": self.api_key,
            "x-client-code": self.client_code,
            "x-feed-token": self.feed_token
        }
        # An explicit connect() supersedes an earlier close_connection().
        self._closed_by_user = False
        try:
            self.wsapp = websocket.WebSocketApp(self.WEBSOCKET_URI, header=headers, on_open=self.on_open,
                                                on_error=self.on_error, on_close=self.on_close,
                                                on_data=self.on_data, on_ping=self.on_ping, on_pong=self.on_pong)
            # NOTE(review): ssl.CERT_NONE disables TLS certificate verification
            # while the auth headers above are being sent. Kept unchanged for
            # backward compatibility - confirm whether this is intentional.
            self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=self.HEARTBEAT_INTERVAL_SECONDS)
        except Exception as e:
            logger.error("Error connecting to WebSocket: %s", e)
            self.retry_connect()

    def retry_connect(self):
        """Reconnect after ``RETRY_DELAY_SECONDS`` while attempts remain.

        Does nothing once ``close_connection()`` has been called; call
        ``connect()`` to start a new session in that case.
        """
        if self._closed_by_user:
            # The close was deliberate: reconnecting would undo it.
            return
        if self.current_retry_attempt < self.MAX_CONNECTION_RETRY_ATTEMPTS:
            logger.info("Retrying connection (Attempt %s)...", self.current_retry_attempt + 1)
            time.sleep(self.RETRY_DELAY_SECONDS)
            if self._closed_by_user:
                # close_connection() was called while waiting to retry.
                return
            self.current_retry_attempt += 1
            self.connect()
        else:
            logger.warning("Max retry attempts reached.")

    def close_connection(self):
        """Close the socket for good; no automatic reconnect follows."""
        # Set the flag first so the on_close callback triggered by the close
        # already sees it.
        self._closed_by_user = True
        self._close_socket()

    def _close_socket(self):
        """Close the underlying socket if one has been created."""
        if self.wsapp:
            self.wsapp.close()
94
95