
Backtesting with zipline - Pipeline API with Custom Data

This notebook requires the conda environment backtest. Please see the installation instructions for running the latest Docker image or for alternative ways to set up your environment.

Imports & Settings

import warnings
warnings.filterwarnings('ignore')
from pathlib import Path
from time import time

import numpy as np
import pandas as pd
import pandas_datareader.data as web

from logbook import Logger, StderrHandler, INFO, WARNING

from zipline import run_algorithm
from zipline.api import (attach_pipeline, pipeline_output,
                         date_rules, time_rules, record,
                         schedule_function, commission, slippage,
                         set_slippage, set_commission, set_max_leverage,
                         order_target, order_target_percent,
                         get_open_orders, cancel_order)
from zipline.data import bundles
from zipline.utils.run_algo import load_extensions
from zipline.pipeline import Pipeline, CustomFactor
from zipline.pipeline.data import Column, DataSet
from zipline.pipeline.domain import US_EQUITIES
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.loaders.frame import DataFrameLoader

from trading_calendars import get_calendar

import pyfolio as pf
from pyfolio.plotting import plot_rolling_returns, plot_rolling_sharpe
from pyfolio.timeseries import forecast_cone_bootstrap

from alphalens.tears import (create_returns_tear_sheet,
                             create_summary_tear_sheet,
                             create_full_tear_sheet)
from alphalens.performance import mean_return_by_quantile
from alphalens.plotting import plot_quantile_returns_bar
from alphalens.utils import get_clean_factor_and_forward_returns, rate_of_return

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')
np.random.seed(42)
idx = pd.IndexSlice
results_path = Path('results')

Alphalens Analysis

DATA_STORE = Path('..', 'data', 'assets.h5')
def get_trade_prices(tickers):
    prices = (pd.read_hdf(DATA_STORE, 'quandl/wiki/prices')
              .swaplevel()
              .sort_index())
    prices.index.names = ['symbol', 'date']
    prices = prices.loc[idx[tickers, '2015':'2018'], 'adj_open']
    return (prices
            .unstack('symbol')
            .sort_index()
            .shift(-1)
            .tz_localize('UTC'))
predictions = (pd.read_hdf(results_path / 'test_preds.h5', 'predictions')
               .iloc[:, :3]
               .mean(1)
               .to_frame('prediction'))
factor = (predictions
          .unstack('symbol')
          .asfreq('D')
          .dropna(how='all')
          .stack()
          .tz_localize('UTC', level='date')
          .sort_index())
tickers = factor.index.get_level_values('symbol').unique()
trade_prices = get_trade_prices(tickers)
factor_data = get_clean_factor_and_forward_returns(factor=factor,
                                                   prices=trade_prices,
                                                   quantiles=5,
                                                   max_loss=0.3,
                                                   periods=(1, 5, 10, 21)).sort_index()
factor_data.info()
Dropped 3.6% entries from factor data: 3.6% in forward returns computation and 0.0% in binning phase (set max_loss=0 to see potentially suppressed Exceptions).
max_loss is 30.0%, not exceeded: OK!
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 721920 entries, (2015-01-02 00:00:00+00:00, A) to (2017-11-29 00:00:00+00:00, ZION)
Data columns (total 6 columns):
1D                 721920 non-null float64
5D                 721920 non-null float64
10D                721920 non-null float64
21D                721920 non-null float64
factor             721920 non-null float32
factor_quantile    721920 non-null int64
dtypes: float32(1), float64(4), int64(1)
memory usage: 33.1+ MB
create_summary_tear_sheet(factor_data)
[Summary tear sheet output: quantiles statistics, returns analysis, information analysis, and turnover analysis tables, followed by the quantile return plots]
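
The summary tear sheet bundles all of these tables; the quantile spread can also be inspected on its own with the mean_return_by_quantile, rate_of_return, and plot_quantile_returns_bar helpers imported above. A minimal sketch (not part of the original notebook):

mean_ret_by_q, std_err_by_q = mean_return_by_quantile(factor_data)

# convert the cumulative multi-day returns to one-period rates so the
# 1D, 5D, 10D and 21D columns are comparable
mean_rate_by_q = mean_ret_by_q.apply(rate_of_return,
                                     axis=0,
                                     base_period=mean_ret_by_q.columns[0])

plot_quantile_returns_bar(mean_rate_by_q)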

Load zipline extensions

We only need this in the notebook so zipline can find the bundle.

load_extensions(default=True, extensions=[], strict=True, environ=None)
log_handler = StderrHandler(format_string='[{record.time:%Y-%m-%d %H:%M:%S.%f}]: ' +
                            '{record.level_name}: {record.func_name}: {record.message}',
                            level=WARNING)
log_handler.push_application()
log = Logger('Algorithm')

Algo Params

N_LONGS = 25
N_SHORTS = 25
MIN_POSITIONS = 10

Load Data

Quandl Wiki Bundle

bundle_data = bundles.load('quandl')
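
Optionally, we can sanity-check the ingested bundle by resolving a sample ticker through its asset_finder; 'AAPL' below is just an illustration:

# Optional sanity check: resolve a sample ticker to its zipline Equity;
# the sid shown depends on the bundle ingestion
print(bundle_data.asset_finder.lookup_symbol('AAPL', as_of_date=None))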

ML Predictions

def load_predictions(bundle):
    predictions = (pd.read_hdf(results_path / 'test_preds.h5', 'predictions')
                   .iloc[:, :3]
                   .mean(1)
                   .to_frame('prediction'))
    tickers = predictions.index.get_level_values('symbol').unique().tolist()

    assets = bundle.asset_finder.lookup_symbols(tickers, as_of_date=None)
    predicted_sids = pd.Int64Index([asset.sid for asset in assets])
    ticker_map = dict(zip(tickers, predicted_sids))

    return (predictions
            .unstack('symbol')
            .rename(columns=ticker_map)
            .prediction
            .tz_localize('UTC')), assets
predictions, assets = load_predictions(bundle_data)
predictions.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 756 entries, 2014-11-28 to 2017-11-29
Columns: 995 entries, 0 to 3188
dtypes: float32(995)
memory usage: 2.9 MB

Define Custom Dataset

class SignalData(DataSet):
    predictions = Column(dtype=float)
    domain = US_EQUITIES

Define Pipeline Loaders

signal_loader = {SignalData.predictions:
                 DataFrameLoader(SignalData.predictions, predictions)}
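
Each Column of a custom DataSet needs its own loader entry, so a second signal would simply add a parallel Column/DataFrameLoader pair. A sketch with a hypothetical confidence score (confidence_df is an assumed DataFrame with the same dates x sids layout as predictions, not defined in this notebook):

class TwoSignalData(DataSet):
    predictions = Column(dtype=float)
    confidence = Column(dtype=float)  # hypothetical second signal
    domain = US_EQUITIES

# one DataFrameLoader per Column; confidence_df is assumed, not defined here
two_signal_loader = {TwoSignalData.predictions: DataFrameLoader(TwoSignalData.predictions, predictions),
                     TwoSignalData.confidence : DataFrameLoader(TwoSignalData.confidence, confidence_df)}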

Pipeline Setup

Custom ML Factor

class MLSignal(CustomFactor):
    """Converting signals to a Factor so we can rank and filter in the Pipeline"""
    inputs = [SignalData.predictions]
    window_length = 1

    def compute(self, today, assets, out, predictions):
        out[:] = predictions
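
With window_length=1, compute simply passes the latest prediction through. Smoothing the signal over several days only requires a longer window and an aggregation; a sketch (not used in the backtest below):

class SmoothedMLSignal(CustomFactor):
    """Sketch: average the prediction over a trailing five-day window"""
    inputs = [SignalData.predictions]
    window_length = 5

    def compute(self, today, assets, out, predictions):
        # predictions arrives with shape (window_length, n_assets)
        out[:] = np.nanmean(predictions, axis=0)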

Create Pipeline

def compute_signals():
    signals = MLSignal()
    return Pipeline(columns={
        'longs' : signals.top(N_LONGS, mask=signals > 0),
        'shorts': signals.bottom(N_SHORTS, mask=signals < 0)},
        screen=StaticAssets(assets))

Initialize Algorithm

def initialize(context):
    """Called once at the start of the algorithm."""
    context.longs = context.shorts = None
    set_slippage(slippage.FixedSlippage(spread=0.00))
    # set_commission(commission.PerShare(cost=0.001, min_trade_cost=0))

    schedule_function(rebalance,
                      date_rules.every_day(),
                      time_rules.market_open(hours=1, minutes=30))
    schedule_function(record_vars,
                      date_rules.every_day(),
                      time_rules.market_close())

    pipeline = compute_signals()
    attach_pipeline(pipeline, 'signals')
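
The FixedSlippage(spread=0) setting together with the commented-out commission line means this backtest ignores trading costs. To gauge their impact, a variant could switch on zipline's built-in models with illustrative (not calibrated) values:

def initialize_with_costs(context):
    """Sketch: same setup as initialize(), but with illustrative trading costs"""
    context.longs = context.shorts = None
    set_slippage(slippage.VolumeShareSlippage(volume_limit=0.025, price_impact=0.1))
    set_commission(commission.PerShare(cost=0.001, min_trade_cost=1))
    schedule_function(rebalance,
                      date_rules.every_day(),
                      time_rules.market_open(hours=1, minutes=30))
    schedule_function(record_vars,
                      date_rules.every_day(),
                      time_rules.market_close())
    attach_pipeline(compute_signals(), 'signals')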

Get daily Pipeline results

def before_trading_start(context, data):
    """Called every day before market open."""
    output = pipeline_output('signals')
    longs = output.longs.astype(int)
    shorts = output.shorts.astype(int)
    holdings = set(context.portfolio.positions.keys())
    if longs.sum() > MIN_POSITIONS and shorts.sum() > MIN_POSITIONS:
        context.longs = longs[longs != 0].index
        context.shorts = shorts[shorts != 0].index
        context.divest = holdings - set(context.longs) - set(context.shorts)
    else:
        context.longs = context.shorts = pd.Index([])
        context.divest = holdings

Define Rebalancing Logic

def rebalance(context, data):
    """Execute orders according to schedule_function() date & time rules."""
    for symbol, open_orders in get_open_orders().items():
        for open_order in open_orders:
            cancel_order(open_order)

    for stock in context.divest:
        order_target(stock, target=0)

    # log.warning('{} {:,.0f}'.format(len(context.portfolio.positions),
    #                                 context.portfolio.portfolio_value))
    if not (context.longs.empty and context.shorts.empty):
        for stock in context.shorts:
            order_target_percent(stock, -1 / len(context.shorts))
        for stock in context.longs:
            order_target_percent(stock, 1 / len(context.longs))

Record Data Points

def record_vars(context, data):
    """Record variables at the end of each day for later plotting."""
    record(leverage=context.account.leverage,
           longs=context.longs,
           shorts=context.shorts)

Run Algorithm

dates = predictions.index.get_level_values('date')
start_date, end_date = dates.min(), dates.max()
print('Start: {}\nEnd: {}'.format(start_date.date(), end_date.date()))
Start: 2014-11-28
End: 2017-11-29
start = time()
results = run_algorithm(start=start_date,
                        end=end_date,
                        initialize=initialize,
                        before_trading_start=before_trading_start,
                        capital_base=1e5,
                        data_frequency='daily',
                        bundle='quandl',
                        custom_loader=signal_loader)  # need to modify zipline

print('Duration: {:.2f}s'.format(time() - start))
[2020-06-22 14:59:13.911299]: WARNING: _load_cached_data: Refusing to download new benchmark data because a download succeeded at 2020-06-22 14:53:37.126521+00:00.
Duration: 48.13s
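
The record() calls in record_vars surface the recorded series as columns of the results DataFrame returned by run_algorithm, so the account leverage, for example, can be plotted directly (a quick sketch):

# Sketch: recorded variables become columns of the results DataFrame
results.leverage.plot(figsize=(14, 4), title='Account Leverage')
plt.tight_layout()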

PyFolio Analysis

returns, positions, transactions = pf.utils.extract_rets_pos_txn_from_zipline(results)
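
Before the full tear sheet, pyfolio's perf_stats offers a compact numeric summary of the return series (a sketch; the function lives in pyfolio.timeseries):

from pyfolio.timeseries import perf_stats

# Sketch: headline statistics such as annual return, Sharpe ratio and max drawdown
print(perf_stats(returns))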
benchmark = web.DataReader('SP500', 'fred', '2014', '2018').squeeze()
benchmark = benchmark.pct_change().tz_localize('UTC')

Custom Plots

LIVE_DATE = '2016-11-30'
fig, axes = plt.subplots(ncols=2, figsize=(16, 5))
plot_rolling_returns(returns,
                     factor_returns=benchmark,
                     live_start_date=LIVE_DATE,
                     logy=False,
                     cone_std=2,
                     legend_loc='best',
                     volatility_match=False,
                     cone_function=forecast_cone_bootstrap,
                     ax=axes[0])
plot_rolling_sharpe(returns, ax=axes[1], rolling_window=63)
axes[0].set_title('Cumulative Returns - In and Out-of-Sample')
axes[1].set_title('Rolling Sharpe Ratio (3 Months)')
sns.despine()
fig.tight_layout()
fig.savefig((results_path / 'pyfolio_out_of_sample').as_posix(), dpi=300)
[Figure: cumulative returns in- and out-of-sample (left); rolling 3-month Sharpe ratio (right)]

Tear Sheets

pf.create_full_tear_sheet(returns,
                          positions=positions,
                          transactions=transactions,
                          benchmark_rets=benchmark,
                          live_start_date=LIVE_DATE,
                          round_trips=True)
[Pyfolio full tear sheet: performance statistics tables and plots]