Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
cantaro86
GitHub Repository: cantaro86/Financial-Models-Numerical-Methods
Path: blob/master/src/FMNM/NIG_pricer.py
1700 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Fri Nov 1 12:47:00 2019
5
6
@author: cantaro86
7
"""
8
9
from scipy import sparse
10
from scipy.sparse.linalg import splu
11
from time import time
12
import numpy as np
13
import scipy as scp
14
from scipy import signal
15
from scipy.integrate import quad
16
import scipy.stats as ss
17
import scipy.special as scps
18
19
import matplotlib.pyplot as plt
20
from matplotlib import cm
21
from FMNM.CF import cf_NIG
22
from FMNM.probabilities import Q1, Q2
23
from functools import partial
24
25
26
class NIG_pricer:
27
"""
28
Closed Formula.
29
Monte Carlo.
30
Finite-difference PIDE: Explicit-implicit scheme, with Brownian approximation
31
32
0 = dV/dt + (r -(1/2)sig^2 -w) dV/dx + (1/2)sig^2 d^V/dx^2
33
+ \int[ V(x+y) nu(dy) ] -(r+lam)V
34
"""
35
36
def __init__(self, Option_info, Process_info):
37
"""
38
Process_info: of type NIG_process. It contains the interest rate r
39
and the NIG parameters (sigma, theta, kappa)
40
41
Option_info: of type Option_param.
42
It contains (S0,K,T) i.e. current price, strike, maturity in years
43
"""
44
self.r = Process_info.r # interest rate
45
self.sigma = Process_info.sigma # NIG parameter
46
self.theta = Process_info.theta # NIG parameter
47
self.kappa = Process_info.kappa # NIG parameter
48
self.exp_RV = Process_info.exp_RV # function to generate exponential NIG Random Variables
49
50
self.S0 = Option_info.S0 # current price
51
self.K = Option_info.K # strike
52
self.T = Option_info.T # maturity in years
53
54
self.price = 0
55
self.S_vec = None
56
self.price_vec = None
57
self.mesh = None
58
self.exercise = Option_info.exercise
59
self.payoff = Option_info.payoff
60
61
def payoff_f(self, S):
62
if self.payoff == "call":
63
Payoff = np.maximum(S - self.K, 0)
64
elif self.payoff == "put":
65
Payoff = np.maximum(self.K - S, 0)
66
return Payoff
67
68
def Fourier_inversion(self):
69
"""
70
Price obtained by inversion of the characteristic function
71
"""
72
k = np.log(self.K / self.S0) # log moneyness
73
w = (
74
1 - np.sqrt(1 - 2 * self.theta * self.kappa - self.kappa * self.sigma**2)
75
) / self.kappa # martingale correction
76
77
cf_NIG_b = partial(
78
cf_NIG,
79
t=self.T,
80
mu=(self.r - w),
81
theta=self.theta,
82
sigma=self.sigma,
83
kappa=self.kappa,
84
)
85
86
if self.payoff == "call":
87
call = self.S0 * Q1(k, cf_NIG_b, np.inf) - self.K * np.exp(-self.r * self.T) * Q2(
88
k, cf_NIG_b, np.inf
89
) # pricing function
90
return call
91
elif self.payoff == "put":
92
put = self.K * np.exp(-self.r * self.T) * (1 - Q2(k, cf_NIG_b, np.inf)) - self.S0 * (
93
1 - Q1(k, cf_NIG_b, np.inf)
94
) # pricing function
95
return put
96
else:
97
raise ValueError("invalid type. Set 'call' or 'put'")
98
99
def MC(self, N, Err=False, Time=False):
100
"""
101
NIG Monte Carlo
102
Err = return Standard Error if True
103
Time = return execution time if True
104
"""
105
t_init = time()
106
107
S_T = self.exp_RV(self.S0, self.T, N)
108
V = scp.mean(np.exp(-self.r * self.T) * self.payoff_f(S_T))
109
110
if Err is True:
111
if Time is True:
112
elapsed = time() - t_init
113
return V, ss.sem(np.exp(-self.r * self.T) * self.payoff_f(S_T)), elapsed
114
else:
115
return V, ss.sem(np.exp(-self.r * self.T) * self.payoff_f(S_T))
116
else:
117
if Time is True:
118
elapsed = time() - t_init
119
return V, elapsed
120
else:
121
return V
122
123
def NIG_measure(self, x):
124
A = self.theta / (self.sigma**2)
125
B = np.sqrt(self.theta**2 + self.sigma**2 / self.kappa) / self.sigma**2
126
C = np.sqrt(self.theta**2 + self.sigma**2 / self.kappa) / (np.pi * self.sigma * np.sqrt(self.kappa))
127
return C / np.abs(x) * np.exp(A * (x)) * scps.kv(1, B * np.abs(x))
128
129
def PIDE_price(self, steps, Time=False):
130
"""
131
steps = tuple with number of space steps and time steps
132
payoff = "call" or "put"
133
exercise = "European" or "American"
134
Time = Boolean. Execution time.
135
"""
136
t_init = time()
137
138
Nspace = steps[0]
139
Ntime = steps[1]
140
141
S_max = 2000 * float(self.K)
142
S_min = float(self.K) / 2000
143
x_max = np.log(S_max)
144
x_min = np.log(S_min)
145
146
dev_X = np.sqrt(self.sigma**2 + self.theta**2 * self.kappa) # std dev NIG process
147
148
dx = (x_max - x_min) / (Nspace - 1)
149
extraP = int(np.floor(7 * dev_X / dx)) # extra points beyond the B.C.
150
x = np.linspace(x_min - extraP * dx, x_max + extraP * dx, Nspace + 2 * extraP) # space discretization
151
t, dt = np.linspace(0, self.T, Ntime, retstep=True) # time discretization
152
153
Payoff = self.payoff_f(np.exp(x))
154
offset = np.zeros(Nspace - 2)
155
V = np.zeros((Nspace + 2 * extraP, Ntime)) # grid initialization
156
157
if self.payoff == "call":
158
V[:, -1] = Payoff # terminal conditions
159
V[-extraP - 1 :, :] = np.exp(x[-extraP - 1 :]).reshape(extraP + 1, 1) * np.ones(
160
(extraP + 1, Ntime)
161
) - self.K * np.exp(-self.r * t[::-1]) * np.ones(
162
(extraP + 1, Ntime)
163
) # boundary condition
164
V[: extraP + 1, :] = 0
165
else:
166
V[:, -1] = Payoff
167
V[-extraP - 1 :, :] = 0
168
V[: extraP + 1, :] = self.K * np.exp(-self.r * t[::-1]) * np.ones((extraP + 1, Ntime))
169
170
eps = 1.5 * dx # the cutoff near 0
171
lam = (
172
quad(self.NIG_measure, -(extraP + 1.5) * dx, -eps)[0] + quad(self.NIG_measure, eps, (extraP + 1.5) * dx)[0]
173
) # approximated intensity
174
175
def int_w(y):
176
return (np.exp(y) - 1) * self.NIG_measure(y)
177
178
def int_s(y):
179
return y**2 * self.NIG_measure(y)
180
181
w = quad(int_w, -(extraP + 1.5) * dx, -eps)[0] + quad(int_w, eps, (extraP + 1.5) * dx)[0] # is the approx of w
182
sig2 = quad(int_s, -eps, eps, points=0)[0] # the small jumps variance
183
184
dxx = dx * dx
185
a = (dt / 2) * ((self.r - w - 0.5 * sig2) / dx - sig2 / dxx)
186
b = 1 + dt * (sig2 / dxx + self.r + lam)
187
c = -(dt / 2) * ((self.r - w - 0.5 * sig2) / dx + sig2 / dxx)
188
D = sparse.diags([a, b, c], [-1, 0, 1], shape=(Nspace - 2, Nspace - 2)).tocsc()
189
DD = splu(D)
190
191
nu = np.zeros(2 * extraP + 3) # Lévy measure vector
192
x_med = extraP + 1 # middle point in nu vector
193
x_nu = np.linspace(-(extraP + 1 + 0.5) * dx, (extraP + 1 + 0.5) * dx, 2 * (extraP + 2)) # integration domain
194
for i in range(len(nu)):
195
if (i == x_med) or (i == x_med - 1) or (i == x_med + 1):
196
continue
197
nu[i] = quad(self.NIG_measure, x_nu[i], x_nu[i + 1])[0]
198
199
if self.exercise == "European":
200
# Backward iteration
201
for i in range(Ntime - 2, -1, -1):
202
offset[0] = a * V[extraP, i]
203
offset[-1] = c * V[-1 - extraP, i]
204
V_jump = V[extraP + 1 : -extraP - 1, i + 1] + dt * signal.convolve(
205
V[:, i + 1], nu[::-1], mode="valid", method="auto"
206
)
207
V[extraP + 1 : -extraP - 1, i] = DD.solve(V_jump - offset)
208
elif self.exercise == "American":
209
for i in range(Ntime - 2, -1, -1):
210
offset[0] = a * V[extraP, i]
211
offset[-1] = c * V[-1 - extraP, i]
212
V_jump = V[extraP + 1 : -extraP - 1, i + 1] + dt * signal.convolve(
213
V[:, i + 1], nu[::-1], mode="valid", method="auto"
214
)
215
V[extraP + 1 : -extraP - 1, i] = np.maximum(DD.solve(V_jump - offset), Payoff[extraP + 1 : -extraP - 1])
216
217
X0 = np.log(self.S0) # current log-price
218
self.S_vec = np.exp(x[extraP + 1 : -extraP - 1]) # vector of S
219
self.price = np.interp(X0, x, V[:, 0])
220
self.price_vec = V[extraP + 1 : -extraP - 1, 0]
221
self.mesh = V[extraP + 1 : -extraP - 1, :]
222
223
if Time is True:
224
elapsed = time() - t_init
225
return self.price, elapsed
226
else:
227
return self.price
228
229
def plot(self, axis=None):
230
if type(self.S_vec) != np.ndarray or type(self.price_vec) != np.ndarray:
231
self.PIDE_price((5000, 4000))
232
233
plt.plot(self.S_vec, self.payoff_f(self.S_vec), color="blue", label="Payoff")
234
plt.plot(self.S_vec, self.price_vec, color="red", label="NIG curve")
235
if type(axis) == list:
236
plt.axis(axis)
237
plt.xlabel("S")
238
plt.ylabel("price")
239
plt.title("NIG price")
240
plt.legend(loc="best")
241
plt.show()
242
243
def mesh_plt(self):
244
if type(self.S_vec) != np.ndarray or type(self.mesh) != np.ndarray:
245
self.PDE_price((7000, 5000))
246
247
fig = plt.figure()
248
ax = fig.add_subplot(111, projection="3d")
249
250
X, Y = np.meshgrid(np.linspace(0, self.T, self.mesh.shape[1]), self.S_vec)
251
ax.plot_surface(Y, X, self.mesh, cmap=cm.ocean)
252
ax.set_title("NIG price surface")
253
ax.set_xlabel("S")
254
ax.set_ylabel("t")
255
ax.set_zlabel("V")
256
ax.view_init(30, -100) # this function rotates the 3d plot
257
plt.show()
258
259