Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sreenivasdoosa
GitHub Repository: sreenivasdoosa/sdoosa-algo-trade-python
Path: blob/master/src/utils/Utils.py
296 views
1
import math
2
import uuid
3
import time
4
import logging
5
import calendar
6
from datetime import datetime, timedelta
7
8
from config.Config import getHolidays
9
from models.Direction import Direction
10
from trademgmt.TradeState import TradeState
11
12
class Utils:
13
dateFormat = "%Y-%m-%d"
14
timeFormat = "%H:%M:%S"
15
dateTimeFormat = "%Y-%m-%d %H:%M:%S"
16
17
@staticmethod
18
def roundOff(price): # Round off to 2 decimal places
19
return round(price, 2)
20
21
@staticmethod
22
def roundToNSEPrice(price):
23
x = round(price, 2) * 20
24
y = math.ceil(x)
25
return y / 20
26
27
@staticmethod
28
def isMarketOpen():
29
if Utils.isTodayHoliday():
30
return False
31
now = datetime.now()
32
marketStartTime = Utils.getMarketStartTime()
33
marketEndTime = Utils.getMarketEndTime()
34
return now >= marketStartTime and now <= marketEndTime
35
36
@staticmethod
37
def isMarketClosedForTheDay():
38
# This method returns true if the current time is > marketEndTime
39
# Please note this will not return true if current time is < marketStartTime on a trading day
40
if Utils.isTodayHoliday():
41
return True
42
now = datetime.now()
43
marketEndTime = Utils.getMarketEndTime()
44
return now > marketEndTime
45
46
@staticmethod
47
def waitTillMarketOpens(context):
48
nowEpoch = Utils.getEpoch(datetime.now())
49
marketStartTimeEpoch = Utils.getEpoch(Utils.getMarketStartTime())
50
waitSeconds = marketStartTimeEpoch - nowEpoch
51
if waitSeconds > 0:
52
logging.info("%s: Waiting for %d seconds till market opens...", context, waitSeconds)
53
time.sleep(waitSeconds)
54
55
@staticmethod
56
def getEpoch(datetimeObj = None):
57
# This method converts given datetimeObj to epoch seconds
58
if datetimeObj == None:
59
datetimeObj = datetime.now()
60
epochSeconds = datetime.timestamp(datetimeObj)
61
return int(epochSeconds) # converting double to long
62
63
@staticmethod
64
def getMarketStartTime(dateTimeObj = None):
65
return Utils.getTimeOfDay(9, 15, 0, dateTimeObj)
66
67
@staticmethod
68
def getMarketEndTime(dateTimeObj = None):
69
return Utils.getTimeOfDay(15, 30, 0, dateTimeObj)
70
71
@staticmethod
72
def getTimeOfDay(hours, minutes, seconds, dateTimeObj = None):
73
if dateTimeObj == None:
74
dateTimeObj = datetime.now()
75
dateTimeObj = dateTimeObj.replace(hour=hours, minute=minutes, second=seconds, microsecond=0)
76
return dateTimeObj
77
78
@staticmethod
79
def getTimeOfToDay(hours, minutes, seconds):
80
return Utils.getTimeOfDay(hours, minutes, seconds, datetime.now())
81
82
@staticmethod
83
def getTodayDateStr():
84
return Utils.convertToDateStr(datetime.now())
85
86
@staticmethod
87
def convertToDateStr(datetimeObj):
88
return datetimeObj.strftime(Utils.dateFormat)
89
90
@staticmethod
91
def isHoliday(datetimeObj):
92
dayOfWeek = calendar.day_name[datetimeObj.weekday()]
93
if dayOfWeek == 'Saturday' or dayOfWeek == 'Sunday':
94
return True
95
96
dateStr = Utils.convertToDateStr(datetimeObj)
97
holidays = getHolidays()
98
if (dateStr in holidays):
99
return True
100
else:
101
return False
102
103
@staticmethod
104
def isTodayHoliday():
105
return Utils.isHoliday(datetime.now())
106
107
@staticmethod
108
def generateTradeID():
109
return str(uuid.uuid4())
110
111
@staticmethod
112
def calculateTradePnl(trade):
113
if trade.tradeState == TradeState.ACTIVE:
114
if trade.cmp > 0:
115
if trade.direction == Direction.LONG:
116
trade.pnl = Utils.roundOff(trade.filledQty * (trade.cmp - trade.entry))
117
else:
118
trade.pnl = Utils.roundOff(trade.filledQty * (trade.entry - trade.cmp))
119
else:
120
if trade.exit > 0:
121
if trade.direction == Direction.LONG:
122
trade.pnl = Utils.roundOff(trade.filledQty * (trade.exit - trade.entry))
123
else:
124
trade.pnl = Utils.roundOff(trade.filledQty * (trade.entry - trade.exit))
125
tradeValue = trade.entry * trade.filledQty
126
if tradeValue > 0:
127
trade.pnlPercentage = Utils.roundOff(trade.pnl * 100 / tradeValue)
128
return trade
129
130
@staticmethod
131
def prepareMonthlyExpiryFuturesSymbol(inputSymbol):
132
expiryDateTime = Utils.getMonthlyExpiryDayDate()
133
expiryDateMarketEndTime = Utils.getMarketEndTime(expiryDateTime)
134
now = datetime.now()
135
if now > expiryDateMarketEndTime:
136
# increasing today date by 20 days to get some day in next month passing to getMonthlyExpiryDayDate()
137
expiryDateTime = Utils.getMonthlyExpiryDayDate(now + timedelta(days=20))
138
year2Digits = str(expiryDateTime.year)[2:]
139
monthShort = calendar.month_name[expiryDateTime.month].upper()[0:3]
140
futureSymbol = inputSymbol + year2Digits + monthShort + 'FUT'
141
logging.info('prepareMonthlyExpiryFuturesSymbol[%s] = %s', inputSymbol, futureSymbol)
142
return futureSymbol
143
144
@staticmethod
145
def prepareWeeklyOptionsSymbol(inputSymbol, strike, optionType, numWeeksPlus = 0):
146
expiryDateTime = Utils.getWeeklyExpiryDayDate()
147
todayMarketStartTime = Utils.getMarketStartTime()
148
expiryDayMarketEndTime = Utils.getMarketEndTime(expiryDateTime)
149
if numWeeksPlus > 0:
150
expiryDateTime = expiryDateTime + timedelta(days=numWeeksPlus * 7)
151
expiryDateTime = Utils.getWeeklyExpiryDayDate(expiryDateTime)
152
if todayMarketStartTime > expiryDayMarketEndTime:
153
expiryDateTime = expiryDateTime + timedelta(days=6)
154
expiryDateTime = Utils.getWeeklyExpiryDayDate(expiryDateTime)
155
# Check if monthly and weekly expiry same
156
expiryDateTimeMonthly = Utils.getMonthlyExpiryDayDate()
157
weekAndMonthExpriySame = False
158
if expiryDateTime == expiryDateTimeMonthly:
159
weekAndMonthExpriySame = True
160
logging.info('Weekly and Monthly expiry is same for %s', expiryDateTime)
161
year2Digits = str(expiryDateTime.year)[2:]
162
optionSymbol = None
163
if weekAndMonthExpriySame == True:
164
monthShort = calendar.month_name[expiryDateTime.month].upper()[0:3]
165
optionSymbol = inputSymbol + str(year2Digits) + monthShort + str(strike) + optionType.upper()
166
else:
167
m = expiryDateTime.month
168
d = expiryDateTime.day
169
mStr = str(m)
170
if m == 10:
171
mStr = "O"
172
elif m == 11:
173
mStr = "N"
174
elif m == 12:
175
mStr = "D"
176
dStr = ("0" + str(d)) if d < 10 else str(d)
177
optionSymbol = inputSymbol + str(year2Digits) + mStr + dStr + str(strike) + optionType.upper()
178
logging.info('prepareWeeklyOptionsSymbol[%s, %d, %s, %d] = %s', inputSymbol, strike, optionType, numWeeksPlus, optionSymbol)
179
return optionSymbol
180
181
@staticmethod
182
def getMonthlyExpiryDayDate(datetimeObj = None):
183
if datetimeObj == None:
184
datetimeObj = datetime.now()
185
year = datetimeObj.year
186
month = datetimeObj.month
187
lastDay = calendar.monthrange(year, month)[1] # 2nd entry is the last day of the month
188
datetimeExpiryDay = datetime(year, month, lastDay)
189
while calendar.day_name[datetimeExpiryDay.weekday()] != 'Thursday':
190
datetimeExpiryDay = datetimeExpiryDay - timedelta(days=1)
191
while Utils.isHoliday(datetimeExpiryDay) == True:
192
datetimeExpiryDay = datetimeExpiryDay - timedelta(days=1)
193
194
datetimeExpiryDay = Utils.getTimeOfDay(0, 0, 0, datetimeExpiryDay)
195
return datetimeExpiryDay
196
197
@staticmethod
198
def getWeeklyExpiryDayDate(dateTimeObj = None):
199
if dateTimeObj == None:
200
dateTimeObj = datetime.now()
201
daysToAdd = 0
202
if dateTimeObj.weekday() >= 3:
203
daysToAdd = -1 * (dateTimeObj.weekday() - 3)
204
else:
205
daysToAdd = 3 - dateTimeObj.weekday()
206
datetimeExpiryDay = dateTimeObj + timedelta(days=daysToAdd)
207
while Utils.isHoliday(datetimeExpiryDay) == True:
208
datetimeExpiryDay = datetimeExpiryDay - timedelta(days=1)
209
210
datetimeExpiryDay = Utils.getTimeOfDay(0, 0, 0, datetimeExpiryDay)
211
return datetimeExpiryDay
212
213
@staticmethod
214
def isTodayWeeklyExpiryDay():
215
expiryDate = Utils.getWeeklyExpiryDayDate()
216
todayDate = Utils.getTimeOfToDay(0, 0, 0)
217
if expiryDate == todayDate:
218
return True
219
return False
220
221
@staticmethod
222
def isTodayOneDayBeforeWeeklyExpiryDay():
223
expiryDate = Utils.getWeeklyExpiryDayDate()
224
todayDate = Utils.getTimeOfToDay(0, 0, 0)
225
if expiryDate - timedelta(days=1) == todayDate:
226
return True
227
return False
228
229
@staticmethod
230
def getNearestStrikePrice(price, nearestMultiple = 50):
231
inputPrice = int(price)
232
remainder = int(inputPrice % nearestMultiple)
233
if remainder < int(nearestMultiple / 2):
234
return inputPrice - remainder
235
else:
236
return inputPrice + (nearestMultiple - remainder)
237
238