Path: blob/master/src/FMNM/NIG_pricer.py
1700 views
#!/usr/bin/env python31# -*- coding: utf-8 -*-2"""3Created on Fri Nov 1 12:47:00 201945@author: cantaro866"""78from scipy import sparse9from scipy.sparse.linalg import splu10from time import time11import numpy as np12import scipy as scp13from scipy import signal14from scipy.integrate import quad15import scipy.stats as ss16import scipy.special as scps1718import matplotlib.pyplot as plt19from matplotlib import cm20from FMNM.CF import cf_NIG21from FMNM.probabilities import Q1, Q222from functools import partial232425class NIG_pricer:26"""27Closed Formula.28Monte Carlo.29Finite-difference PIDE: Explicit-implicit scheme, with Brownian approximation30310 = dV/dt + (r -(1/2)sig^2 -w) dV/dx + (1/2)sig^2 d^V/dx^232+ \int[ V(x+y) nu(dy) ] -(r+lam)V33"""3435def __init__(self, Option_info, Process_info):36"""37Process_info: of type NIG_process. It contains the interest rate r38and the NIG parameters (sigma, theta, kappa)3940Option_info: of type Option_param.41It contains (S0,K,T) i.e. current price, strike, maturity in years42"""43self.r = Process_info.r # interest rate44self.sigma = Process_info.sigma # NIG parameter45self.theta = Process_info.theta # NIG parameter46self.kappa = Process_info.kappa # NIG parameter47self.exp_RV = Process_info.exp_RV # function to generate exponential NIG Random Variables4849self.S0 = Option_info.S0 # current price50self.K = Option_info.K # strike51self.T = Option_info.T # maturity in years5253self.price = 054self.S_vec = None55self.price_vec = None56self.mesh = None57self.exercise = Option_info.exercise58self.payoff = Option_info.payoff5960def payoff_f(self, S):61if self.payoff == "call":62Payoff = np.maximum(S - self.K, 0)63elif self.payoff == "put":64Payoff = np.maximum(self.K - S, 0)65return Payoff6667def Fourier_inversion(self):68"""69Price obtained by inversion of the characteristic function70"""71k = np.log(self.K / self.S0) # log moneyness72w = (731 - np.sqrt(1 - 2 * self.theta * self.kappa - self.kappa * self.sigma**2)74) / self.kappa # martingale correction7576cf_NIG_b = partial(77cf_NIG,78t=self.T,79mu=(self.r - w),80theta=self.theta,81sigma=self.sigma,82kappa=self.kappa,83)8485if self.payoff == "call":86call = self.S0 * Q1(k, cf_NIG_b, np.inf) - self.K * np.exp(-self.r * self.T) * Q2(87k, cf_NIG_b, np.inf88) # pricing function89return call90elif self.payoff == "put":91put = self.K * np.exp(-self.r * self.T) * (1 - Q2(k, cf_NIG_b, np.inf)) - self.S0 * (921 - Q1(k, cf_NIG_b, np.inf)93) # pricing function94return put95else:96raise ValueError("invalid type. Set 'call' or 'put'")9798def MC(self, N, Err=False, Time=False):99"""100NIG Monte Carlo101Err = return Standard Error if True102Time = return execution time if True103"""104t_init = time()105106S_T = self.exp_RV(self.S0, self.T, N)107V = scp.mean(np.exp(-self.r * self.T) * self.payoff_f(S_T))108109if Err is True:110if Time is True:111elapsed = time() - t_init112return V, ss.sem(np.exp(-self.r * self.T) * self.payoff_f(S_T)), elapsed113else:114return V, ss.sem(np.exp(-self.r * self.T) * self.payoff_f(S_T))115else:116if Time is True:117elapsed = time() - t_init118return V, elapsed119else:120return V121122def NIG_measure(self, x):123A = self.theta / (self.sigma**2)124B = np.sqrt(self.theta**2 + self.sigma**2 / self.kappa) / self.sigma**2125C = np.sqrt(self.theta**2 + self.sigma**2 / self.kappa) / (np.pi * self.sigma * np.sqrt(self.kappa))126return C / np.abs(x) * np.exp(A * (x)) * scps.kv(1, B * np.abs(x))127128def PIDE_price(self, steps, Time=False):129"""130steps = tuple with number of space steps and time steps131payoff = "call" or "put"132exercise = "European" or "American"133Time = Boolean. Execution time.134"""135t_init = time()136137Nspace = steps[0]138Ntime = steps[1]139140S_max = 2000 * float(self.K)141S_min = float(self.K) / 2000142x_max = np.log(S_max)143x_min = np.log(S_min)144145dev_X = np.sqrt(self.sigma**2 + self.theta**2 * self.kappa) # std dev NIG process146147dx = (x_max - x_min) / (Nspace - 1)148extraP = int(np.floor(7 * dev_X / dx)) # extra points beyond the B.C.149x = np.linspace(x_min - extraP * dx, x_max + extraP * dx, Nspace + 2 * extraP) # space discretization150t, dt = np.linspace(0, self.T, Ntime, retstep=True) # time discretization151152Payoff = self.payoff_f(np.exp(x))153offset = np.zeros(Nspace - 2)154V = np.zeros((Nspace + 2 * extraP, Ntime)) # grid initialization155156if self.payoff == "call":157V[:, -1] = Payoff # terminal conditions158V[-extraP - 1 :, :] = np.exp(x[-extraP - 1 :]).reshape(extraP + 1, 1) * np.ones(159(extraP + 1, Ntime)160) - self.K * np.exp(-self.r * t[::-1]) * np.ones(161(extraP + 1, Ntime)162) # boundary condition163V[: extraP + 1, :] = 0164else:165V[:, -1] = Payoff166V[-extraP - 1 :, :] = 0167V[: extraP + 1, :] = self.K * np.exp(-self.r * t[::-1]) * np.ones((extraP + 1, Ntime))168169eps = 1.5 * dx # the cutoff near 0170lam = (171quad(self.NIG_measure, -(extraP + 1.5) * dx, -eps)[0] + quad(self.NIG_measure, eps, (extraP + 1.5) * dx)[0]172) # approximated intensity173174def int_w(y):175return (np.exp(y) - 1) * self.NIG_measure(y)176177def int_s(y):178return y**2 * self.NIG_measure(y)179180w = quad(int_w, -(extraP + 1.5) * dx, -eps)[0] + quad(int_w, eps, (extraP + 1.5) * dx)[0] # is the approx of w181sig2 = quad(int_s, -eps, eps, points=0)[0] # the small jumps variance182183dxx = dx * dx184a = (dt / 2) * ((self.r - w - 0.5 * sig2) / dx - sig2 / dxx)185b = 1 + dt * (sig2 / dxx + self.r + lam)186c = -(dt / 2) * ((self.r - w - 0.5 * sig2) / dx + sig2 / dxx)187D = sparse.diags([a, b, c], [-1, 0, 1], shape=(Nspace - 2, Nspace - 2)).tocsc()188DD = splu(D)189190nu = np.zeros(2 * extraP + 3) # Lévy measure vector191x_med = extraP + 1 # middle point in nu vector192x_nu = np.linspace(-(extraP + 1 + 0.5) * dx, (extraP + 1 + 0.5) * dx, 2 * (extraP + 2)) # integration domain193for i in range(len(nu)):194if (i == x_med) or (i == x_med - 1) or (i == x_med + 1):195continue196nu[i] = quad(self.NIG_measure, x_nu[i], x_nu[i + 1])[0]197198if self.exercise == "European":199# Backward iteration200for i in range(Ntime - 2, -1, -1):201offset[0] = a * V[extraP, i]202offset[-1] = c * V[-1 - extraP, i]203V_jump = V[extraP + 1 : -extraP - 1, i + 1] + dt * signal.convolve(204V[:, i + 1], nu[::-1], mode="valid", method="auto"205)206V[extraP + 1 : -extraP - 1, i] = DD.solve(V_jump - offset)207elif self.exercise == "American":208for i in range(Ntime - 2, -1, -1):209offset[0] = a * V[extraP, i]210offset[-1] = c * V[-1 - extraP, i]211V_jump = V[extraP + 1 : -extraP - 1, i + 1] + dt * signal.convolve(212V[:, i + 1], nu[::-1], mode="valid", method="auto"213)214V[extraP + 1 : -extraP - 1, i] = np.maximum(DD.solve(V_jump - offset), Payoff[extraP + 1 : -extraP - 1])215216X0 = np.log(self.S0) # current log-price217self.S_vec = np.exp(x[extraP + 1 : -extraP - 1]) # vector of S218self.price = np.interp(X0, x, V[:, 0])219self.price_vec = V[extraP + 1 : -extraP - 1, 0]220self.mesh = V[extraP + 1 : -extraP - 1, :]221222if Time is True:223elapsed = time() - t_init224return self.price, elapsed225else:226return self.price227228def plot(self, axis=None):229if type(self.S_vec) != np.ndarray or type(self.price_vec) != np.ndarray:230self.PIDE_price((5000, 4000))231232plt.plot(self.S_vec, self.payoff_f(self.S_vec), color="blue", label="Payoff")233plt.plot(self.S_vec, self.price_vec, color="red", label="NIG curve")234if type(axis) == list:235plt.axis(axis)236plt.xlabel("S")237plt.ylabel("price")238plt.title("NIG price")239plt.legend(loc="best")240plt.show()241242def mesh_plt(self):243if type(self.S_vec) != np.ndarray or type(self.mesh) != np.ndarray:244self.PDE_price((7000, 5000))245246fig = plt.figure()247ax = fig.add_subplot(111, projection="3d")248249X, Y = np.meshgrid(np.linspace(0, self.T, self.mesh.shape[1]), self.S_vec)250ax.plot_surface(Y, X, self.mesh, cmap=cm.ocean)251ax.set_title("NIG price surface")252ax.set_xlabel("S")253ax.set_ylabel("t")254ax.set_zlabel("V")255ax.view_init(30, -100) # this function rotates the 3d plot256plt.show()257258259