GitHub Repository: cantaro86/Financial-Models-Numerical-Methods
Path: blob/master/src/FMNM/Processes.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 27 17:06:01 2019

@author: cantaro86
"""

import numpy as np
import scipy.stats as ss
from FMNM.probabilities import VG_pdf
from scipy.optimize import minimize
from statsmodels.tools.numdiff import approx_hess
import pandas as pd


class Diffusion_process:
    """
    Class for the diffusion process (geometric Brownian motion):
    r = risk free constant rate
    sig = constant diffusion coefficient
    mu = constant drift
    """

    def __init__(self, r=0.1, sig=0.2, mu=0.1):
        self.r = r
        self.mu = mu
        if sig <= 0:
            raise ValueError("sig must be positive")
        else:
            self.sig = sig

    def exp_RV(self, S0, T, N):
        """Simulates N terminal prices under the risk-neutral dynamics
        S_T = S0 * exp((r - sig^2/2) * T + sig * W_T)."""
        W = ss.norm.rvs((self.r - 0.5 * self.sig**2) * T, np.sqrt(T) * self.sig, N)
        S_T = S0 * np.exp(W)
        return S_T.reshape((N, 1))
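

# Example usage (illustrative sketch; parameter values are arbitrary): Monte Carlo
# check that exp_RV simulates risk-neutral prices, i.e. E[S_T] ≈ S0 * exp(r*T).
def _demo_diffusion_process():
    gbm = Diffusion_process(r=0.05, sig=0.2)
    S_T = gbm.exp_RV(S0=100.0, T=1.0, N=200_000)
    print("MC mean:", S_T.mean())  # should be close to 100 * exp(0.05) ≈ 105.13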


class Merton_process:
    """
    Class for the Merton jump-diffusion process:
    r = risk free constant rate
    sig = constant diffusion coefficient
    lam = jump activity
    muJ = jump mean
    sigJ = jump standard deviation
    """

    def __init__(self, r=0.1, sig=0.2, lam=0.8, muJ=0, sigJ=0.5):
        self.r = r
        self.lam = lam
        self.muJ = muJ
        if sig < 0 or sigJ < 0:
            raise ValueError("sig and sigJ must be non-negative")
        else:
            self.sig = sig
            self.sigJ = sigJ

        # moments of the unit-time log-return (cumulant-based)
        self.var = self.sig**2 + self.lam * self.sigJ**2 + self.lam * self.muJ**2
        self.skew = self.lam * (3 * self.sigJ**2 * self.muJ + self.muJ**3) / self.var ** (1.5)
        # excess kurtosis: lam * E[Y^4] / var^2, with E[Y^4] = muJ^4 + 6 muJ^2 sigJ^2 + 3 sigJ^4
        self.kurt = self.lam * (3 * self.sigJ**4 + 6 * self.sigJ**2 * self.muJ**2 + self.muJ**4) / self.var**2

    def exp_RV(self, S0, T, N):
        m = self.lam * (np.exp(self.muJ + (self.sigJ**2) / 2) - 1)  # martingale correction m
        W = ss.norm.rvs(0, 1, N)  # The normal RV vector
        P = ss.poisson.rvs(self.lam * T, size=N)  # Poisson random vector (number of jumps)
        Jumps = np.asarray([ss.norm.rvs(self.muJ, self.sigJ, ind).sum() for ind in P])  # Jumps vector
        S_T = S0 * np.exp(
            (self.r - 0.5 * self.sig**2 - m) * T + np.sqrt(T) * self.sig * W + Jumps
        )  # Martingale exponential Merton
        return S_T.reshape((N, 1))
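

# Example usage (illustrative sketch; parameter values are arbitrary): the sample
# variance of the simulated log-returns over T=1 should match the model moment
# self.var = sig^2 + lam * (muJ^2 + sigJ^2).
def _demo_merton_process():
    merton = Merton_process(r=0.05, sig=0.2, lam=0.8, muJ=0.0, sigJ=0.5)
    S_T = merton.exp_RV(S0=100.0, T=1.0, N=100_000)
    log_ret = np.log(S_T / 100.0)
    print("sample var:", log_ret.var(), " model var:", merton.var)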


class VG_process:
    """
    Class for the Variance Gamma process:
    r = risk free constant rate
    Using the Brownian subordination representation, the parameters are:
        theta = drift of the Brownian motion
        sigma = standard deviation of the Brownian motion
        kappa = variance of the Gamma subordinator (per unit time)
    """

    def __init__(self, r=0.1, sigma=0.2, theta=-0.1, kappa=0.1):
        self.r = r
        self.c = self.r
        self.theta = theta
        self.kappa = kappa
        if sigma < 0:
            raise ValueError("sigma must be positive")
        else:
            self.sigma = sigma

        # moments of the unit-time increment
        self.mean = self.c + self.theta
        self.var = self.sigma**2 + self.theta**2 * self.kappa
        self.skew = (2 * self.theta**3 * self.kappa**2 + 3 * self.sigma**2 * self.theta * self.kappa) / (
            self.var ** (1.5)
        )
        self.kurt = (
            3 * self.sigma**4 * self.kappa
            + 12 * self.sigma**2 * self.theta**2 * self.kappa**2
            + 6 * self.theta**4 * self.kappa**3
        ) / (self.var**2)

    def exp_RV(self, S0, T, N):
        w = -np.log(1 - self.theta * self.kappa - self.kappa / 2 * self.sigma**2) / self.kappa  # martingale correction w
        rho = 1 / self.kappa
        G = ss.gamma(rho * T).rvs(N) / rho  # The gamma RV
        Norm = ss.norm.rvs(0, 1, N)  # The normal RV
        VG = self.theta * G + self.sigma * np.sqrt(G) * Norm  # VG increment over [0, T]
        S_T = S0 * np.exp((self.r - w) * T + VG)  # Martingale exponential VG
        return S_T.reshape((N, 1))

    def path(self, T=1, N=10000, paths=1):
        """
        Creates Variance Gamma paths
        N = number of time points (time steps are N-1)
        paths = number of generated paths
        """
        dt = T / (N - 1)  # time interval
        X0 = np.zeros((paths, 1))
        G = ss.gamma(dt / self.kappa, scale=self.kappa).rvs(size=(paths, N - 1))  # The gamma RV
        Norm = ss.norm.rvs(loc=0, scale=1, size=(paths, N - 1))  # The normal RV
        increments = self.c * dt + self.theta * G + self.sigma * np.sqrt(G) * Norm
        X = np.concatenate((X0, increments), axis=1).cumsum(1)
        return X

    def fit_from_data(self, data, dt=1, method="Nelder-Mead"):
        """
        Fit the 4 parameters of the VG process using one of:
        "MM" (method of moments), "Nelder-Mead" or "L-BFGS-B" (maximum likelihood).

        data (array): datapoints
        dt (float): the time increment between datapoints

        Sets self.c, self.theta, self.sigma, self.kappa
        """
        X = data
        sigma_mm = np.std(X) / np.sqrt(dt)
        kappa_mm = dt * ss.kurtosis(X) / 3
        theta_mm = np.sqrt(dt) * ss.skew(X) * sigma_mm / (3 * kappa_mm)
        c_mm = np.mean(X) / dt - theta_mm

        def log_likely(x, data, T):
            return (-1) * np.sum(np.log(VG_pdf(data, T, x[0], x[1], x[2], x[3])))

        if method == "L-BFGS-B":
            if theta_mm < 0:
                result = minimize(
                    log_likely,
                    x0=[c_mm, theta_mm, sigma_mm, kappa_mm],
                    method="L-BFGS-B",
                    args=(X, dt),
                    tol=1e-8,
                    bounds=[[-0.5, 0.5], [-0.6, -1e-15], [1e-15, 1], [1e-15, 2]],
                )
            else:
                result = minimize(
                    log_likely,
                    x0=[c_mm, theta_mm, sigma_mm, kappa_mm],
                    method="L-BFGS-B",
                    args=(X, dt),
                    tol=1e-8,
                    bounds=[[-0.5, 0.5], [1e-15, 0.6], [1e-15, 1], [1e-15, 2]],
                )
            print(result.message)
        elif method == "Nelder-Mead":
            result = minimize(
                log_likely,
                x0=[c_mm, theta_mm, sigma_mm, kappa_mm],
                method="Nelder-Mead",
                args=(X, dt),
                options={"disp": False, "maxfev": 3000},
                tol=1e-8,
            )
            print(result.message)
        elif method == "MM":
            self.c, self.theta, self.sigma, self.kappa = (
                c_mm,
                theta_mm,
                sigma_mm,
                kappa_mm,
            )
            return
        else:
            raise ValueError("method must be 'MM', 'Nelder-Mead' or 'L-BFGS-B'")
        self.c, self.theta, self.sigma, self.kappa = result.x
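

# Example usage (illustrative sketch; parameter values are arbitrary): simulate
# a long VG path at daily resolution and recover the parameters by the method
# of moments.
def _demo_vg_fit():
    vg = VG_process(r=0.05, sigma=0.2, theta=-0.1, kappa=0.1)
    X = vg.path(T=40, N=40 * 252 + 1, paths=1)[0]  # ~40 years of daily points
    vg_fit = VG_process()
    vg_fit.fit_from_data(np.diff(X), dt=1 / 252, method="MM")
    print("fitted (c, theta, sigma, kappa):", (vg_fit.c, vg_fit.theta, vg_fit.sigma, vg_fit.kappa))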


class Heston_process:
    """
    Class for the Heston process:
    mu = drift coefficient
    rho = correlation between stock noise and variance noise
    theta = long term mean of the variance process
    sigma = volatility coefficient of the variance process
    kappa = mean reversion coefficient for the variance process
    """

    def __init__(self, mu=0.1, rho=0, sigma=0.2, theta=0.04, kappa=0.1):
        self.mu = mu
        if np.abs(rho) > 1:
            raise ValueError("|rho| must be <=1")
        self.rho = rho
        if theta < 0 or sigma < 0 or kappa < 0:
            raise ValueError("sigma, theta and kappa must be non-negative")
        else:
            self.theta = theta
            self.sigma = sigma
            self.kappa = kappa

    def path(self, S0, v0, N, T=1):
        """
        Produces one path of the Heston process.
        N = number of time points (there are N-1 time steps)
        T = Time in years
        Returns two arrays S (price) and v (variance).
        """

        MU = np.array([0, 0])
        COV = np.array([[1, self.rho], [self.rho, 1]])
        W = ss.multivariate_normal.rvs(mean=MU, cov=COV, size=N - 1)
        W_S = W[:, 0]  # Stock Brownian motion:     W_1
        W_v = W[:, 1]  # Variance Brownian motion:  W_2

        # Initialize vectors
        T_vec, dt = np.linspace(0, T, N, retstep=True)
        dt_sq = np.sqrt(dt)

        X0 = np.log(S0)
        v = np.zeros(N)
        v[0] = v0
        X = np.zeros(N)
        X[0] = X0

        # Generate paths (Euler scheme; the reflection keeps the variance positive)
        for t in range(0, N - 1):
            v_sq = np.sqrt(v[t])
            v[t + 1] = np.abs(v[t] + self.kappa * (self.theta - v[t]) * dt + self.sigma * v_sq * dt_sq * W_v[t])
            X[t + 1] = X[t] + (self.mu - 0.5 * v[t]) * dt + v_sq * dt_sq * W_S[t]

        return np.exp(X), v
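

# Example usage (illustrative sketch; parameter values are arbitrary): with a
# fast mean reversion, the time average of the simulated variance should be
# close to the long-term mean theta.
def _demo_heston_path():
    heston = Heston_process(mu=0.1, rho=-0.7, sigma=0.3, theta=0.04, kappa=2.0)
    S, v = heston.path(S0=100.0, v0=0.04, N=252 * 20, T=20.0)
    print("average variance:", v.mean(), " theta:", heston.theta)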


class NIG_process:
    """
    Class for the Normal Inverse Gaussian process:
    r = risk free constant rate
    Using the Brownian subordination representation, the parameters are:
        theta = drift of the Brownian motion
        sigma = standard deviation of the Brownian motion
        kappa = variance of the Inverse Gaussian subordinator (per unit time)
    """

    def __init__(self, r=0.1, sigma=0.2, theta=-0.1, kappa=0.1):
        self.r = r
        self.theta = theta
        if sigma < 0 or kappa < 0:
            raise ValueError("sigma and kappa must be non-negative")
        else:
            self.sigma = sigma
            self.kappa = kappa

        # moments of the unit-time increment
        self.var = self.sigma**2 + self.theta**2 * self.kappa
        self.skew = (3 * self.theta**3 * self.kappa**2 + 3 * self.sigma**2 * self.theta * self.kappa) / (
            self.var ** (1.5)
        )
        self.kurt = (
            3 * self.sigma**4 * self.kappa
            + 18 * self.sigma**2 * self.theta**2 * self.kappa**2
            + 15 * self.theta**4 * self.kappa**3
        ) / (self.var**2)

    def exp_RV(self, S0, T, N):
        lam = T**2 / self.kappa  # scale for the IG process
        mu_s = T / lam  # scaled mean
        w = (1 - np.sqrt(1 - 2 * self.theta * self.kappa - self.kappa * self.sigma**2)) / self.kappa  # martingale correction w
        IG = ss.invgauss.rvs(mu=mu_s, scale=lam, size=N)  # The IG RV
        Norm = ss.norm.rvs(0, 1, N)  # The normal RV
        X = self.theta * IG + self.sigma * np.sqrt(IG) * Norm  # NIG random vector
        S_T = S0 * np.exp((self.r - w) * T + X)  # exponential dynamics
        return S_T.reshape((N, 1))
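

# Example usage (illustrative sketch; parameter values are arbitrary): martingale
# check, E[S_T] ≈ S0 * exp(r*T) thanks to the correction term w.
def _demo_nig_process():
    nig = NIG_process(r=0.05, sigma=0.2, theta=-0.1, kappa=0.1)
    S_T = nig.exp_RV(S0=100.0, T=1.0, N=200_000)
    print("MC mean:", S_T.mean(), " theory:", 100.0 * np.exp(0.05))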


class GARCH:
    """
    Class for the GARCH(1,1) process. Variance process:

        V(t) = omega + alpha R^2(t-1) + beta V(t-1)

    VL: unconditional variance >= 0
    alpha: coefficient > 0
    beta: coefficient > 0
    gamma = 1 - alpha - beta
    omega = gamma * VL
    """

    def __init__(self, VL=0.04, alpha=0.08, beta=0.9):
        if VL < 0 or alpha <= 0 or beta <= 0:
            raise ValueError("VL>=0, alpha>0 and beta>0")
        else:
            self.VL = VL
            self.alpha = alpha
            self.beta = beta
        self.gamma = 1 - self.alpha - self.beta
        self.omega = self.gamma * self.VL

    def path(self, N=1000):
        """
        Generates a path with N points.
        Returns the return process R and the variance process var
        """
        eps = ss.norm.rvs(loc=0, scale=1, size=N)
        R = np.zeros_like(eps)
        var = np.zeros_like(eps)
        for i in range(N):
            # at i=0, R[-1] and var[-1] are still zero, so var[0] = omega
            var[i] = self.omega + self.alpha * R[i - 1] ** 2 + self.beta * var[i - 1]
            R[i] = np.sqrt(var[i]) * eps[i]
        return R, var

    def fit_from_data(self, data, disp=True):
        """
        MLE estimator for the GARCH(1,1) parameters
        """
        # Automatic re-scaling:
        #  1. the solver has problems with positive derivative in linesearch.
        #  2. the log has overflows using small values
        n = np.floor(np.log10(np.abs(data.mean())))
        R = data / 10**n

        # initial guesses
        a0 = 0.05
        b0 = 0.9
        g0 = 1 - a0 - b0
        w0 = g0 * np.var(R)

        # bounds and constraint
        bounds = ((0, None), (0, 1), (0, 1))

        def sum_small_1(x):
            return 1 - x[1] - x[2]

        cons = {"fun": sum_small_1, "type": "ineq"}

        def log_likely(x):
            var = R[0] ** 2  # initial variance
            N = len(R)
            log_lik = 0
            for i in range(1, N):
                var = x[0] + x[1] * R[i - 1] ** 2 + x[2] * var  # variance update
                log_lik += -np.log(var) - (R[i] ** 2 / var)
            return (-1) * log_lik

        result = minimize(
            log_likely,
            x0=[w0, a0, b0],
            method="SLSQP",
            bounds=bounds,
            constraints=cons,
            tol=1e-8,
            options={"maxiter": 150},
        )
        print(result.message)
        self.omega = result.x[0] * 10 ** (2 * n)  # undo the re-scaling
        self.alpha, self.beta = result.x[1:]
        self.gamma = 1 - self.alpha - self.beta
        self.VL = self.omega / self.gamma

        if disp is True:
            hess = approx_hess(result.x, log_likely)  # hessian by finite differences
            se = np.sqrt(np.diag(np.linalg.inv(hess)))  # standard error
            cv = ss.norm.ppf(1.0 - 0.05 / 2.0)  # alpha=0.05
            p_val = ss.norm.sf(np.abs(result.x / se))  # survival function

            df = pd.DataFrame(index=["omega", "alpha", "beta"])
            df["Params"] = result.x
            df["SE"] = se
            df["P-val"] = p_val
            df["95% CI lower"] = result.x - cv * se
            df["95% CI upper"] = result.x + cv * se
            df.loc["omega", ["Params", "SE", "95% CI lower", "95% CI upper"]] *= 10 ** (2 * n)
            print(df)

    def log_likelihood(self, R, last_var=True):
        """
        Computes the log-likelihood and optionally returns the last value
        of the variance
        """
        var = R[0] ** 2  # initial variance
        N = len(R)
        log_lik = 0
        log_2pi = np.log(2 * np.pi)
        for i in range(1, N):
            var = self.omega + self.alpha * R[i - 1] ** 2 + self.beta * var  # variance update
            log_lik += 0.5 * (-log_2pi - np.log(var) - (R[i] ** 2 / var))
        if last_var is True:
            return log_lik, var
        else:
            return log_lik

    def generate_var(self, R, R0, var0):
        """
        Generates the variance process.
        R (array): return array
        R0: initial value of the returns
        var0: initial value of the variance
        """
        N = len(R)
        var = np.zeros(N)
        var[0] = self.omega + self.alpha * (R0**2) + self.beta * var0
        for i in range(1, N):
            var[i] = self.omega + self.alpha * R[i - 1] ** 2 + self.beta * var[i - 1]
        return var
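

# Example usage (illustrative sketch; parameter values are arbitrary): simulate
# a GARCH(1,1) return series and re-estimate the parameters by MLE. The fit
# loops over the data in Python, so a few thousand points keep it quick.
def _demo_garch_fit():
    garch = GARCH(VL=0.04, alpha=0.08, beta=0.9)
    R, _ = garch.path(N=5000)
    garch_fit = GARCH()
    garch_fit.fit_from_data(R, disp=True)
    print("fitted (omega, alpha, beta):", (garch_fit.omega, garch_fit.alpha, garch_fit.beta))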


class OU_process:
    """
    Class for the OU (Ornstein-Uhlenbeck) process:
    theta = long term mean
    sigma = diffusion coefficient
    kappa = mean reversion coefficient
    """

    def __init__(self, sigma=0.2, theta=-0.1, kappa=0.1):
        self.theta = theta
        if sigma < 0 or kappa < 0:
            raise ValueError("sigma and kappa must be non-negative")
        else:
            self.sigma = sigma
            self.kappa = kappa

    def path(self, X0=0, T=1, N=10000, paths=1):
        """
        Produces a matrix of OU process: X[N, paths]
        X0 = starting point
        N = number of time points (there are N-1 time steps)
        T = Time in years
        paths = number of paths
        """

        dt = T / (N - 1)
        X = np.zeros((N, paths))
        X[0, :] = X0
        W = ss.norm.rvs(loc=0, scale=1, size=(N - 1, paths))

        # exact simulation: conditional std of the process over one step of size dt
        std_dt = np.sqrt(self.sigma**2 / (2 * self.kappa) * (1 - np.exp(-2 * self.kappa * dt)))
        for t in range(0, N - 1):
            X[t + 1, :] = self.theta + np.exp(-self.kappa * dt) * (X[t, :] - self.theta) + std_dt * W[t, :]

        return X
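

# Example usage (illustrative sketch; parameter values are arbitrary): after many
# mean-reversion times the process is approximately stationary, with mean theta
# and standard deviation sigma / sqrt(2 * kappa).
def _demo_ou_process():
    ou = OU_process(sigma=0.2, theta=1.0, kappa=5.0)
    X = ou.path(X0=0.0, T=10, N=5000, paths=500)
    print("terminal mean:", X[-1, :].mean(), " theta:", ou.theta)
    print("terminal std:", X[-1, :].std(), " stationary std:", ou.sigma / np.sqrt(2 * ou.kappa))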