GitHub Repository: ethen8181/machine-learning
Path: blob/master/big_data/spark_pca.ipynb
Kernel: Python 3
# code for loading the format for the notebook
import os

# path : store the current path to convert back to it later
path = os.getcwd()
os.chdir(os.path.join('..', 'notebook_format'))
from formats import load_style
load_style(plot_style = False)
os.chdir(path)

# 1. magic for inline plot
# 2. magic to print version
# 3. magic so that the notebook will reload external python modules
# 4. magic to enable retina (high resolution) plots
# https://gist.github.com/minrk/3301035
%matplotlib inline
%load_ext watermark
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = 'retina'

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA

# create the SparkSession class,
# which is the entry point into all functionality in Spark
# The .master part sets it to run on all cores on local, note
# that we should leave out the .master part if we're actually
# running the job on a cluster, or else we won't be actually
# using the cluster
spark = (SparkSession.
    builder.
    master('local[*]').
    appName('PCA').
    config(conf = SparkConf()).
    getOrCreate())

%watermark -a 'Ethen' -d -t -v -p numpy,pandas,matplotlib,sklearn,pyspark
Ethen 2017-10-07 14:50:59

CPython 3.5.2
IPython 6.1.0

numpy 1.13.3
pandas 0.20.3
matplotlib 2.0.0
sklearn 0.19.0
pyspark 2.2.0

Spark PCA

This is simply an API walkthrough; for more details on PCA itself, consider referring to the following documentation.

# load the data and convert it to a pandas DataFrame,
# then use that to create the spark DataFrame
iris = load_iris()
X = iris['data']
y = iris['target']
data = pd.DataFrame(X, columns = iris.feature_names)
dataset = spark.createDataFrame(data, iris.feature_names)
dataset.show(6)
+-----------------+----------------+-----------------+----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|
+-----------------+----------------+-----------------+----------------+
|              5.1|             3.5|              1.4|             0.2|
|              4.9|             3.0|              1.4|             0.2|
|              4.7|             3.2|              1.3|             0.2|
|              4.6|             3.1|              1.5|             0.2|
|              5.0|             3.6|              1.4|             0.2|
|              5.4|             3.9|              1.7|             0.4|
+-----------------+----------------+-----------------+----------------+
only showing top 6 rows

Next, since ML models in Spark expect all input features in a single vector column, we use the VectorAssembler to combine a given list of columns into one such column.

# specify the input columns' name and
# the combined output column's name
assembler = VectorAssembler(
    inputCols = iris.feature_names,
    outputCol = 'features')

# use it to transform the dataset and select just
# the output column
df = assembler.transform(dataset).select('features')
df.show(6)
+-----------------+
|         features|
+-----------------+
|[5.1,3.5,1.4,0.2]|
|[4.9,3.0,1.4,0.2]|
|[4.7,3.2,1.3,0.2]|
|[4.6,3.1,1.5,0.2]|
|[5.0,3.6,1.4,0.2]|
|[5.4,3.9,1.7,0.4]|
+-----------------+
only showing top 6 rows
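Each entry in the assembled features column is a Spark ML vector rather than a plain list. A quick way to confirm this (a small sketch, not part of the original notebook) is to pull out the first row and inspect it:

# grab the first Row and look at the assembled vector,
# which for the dense iris data comes back as a pyspark.ml.linalg DenseVector
first_row = df.first()
print(first_row['features'])        # e.g. [5.1,3.5,1.4,0.2]
print(type(first_row['features']))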

Next, we standardize the features. Notice that we only need to specify the assembled column as the input feature.

scaler = StandardScaler(
    inputCol = 'features',
    outputCol = 'scaledFeatures',
    withMean = True,
    withStd = True
).fit(df)

# when we transform the dataframe, the old
# feature will still remain in it
df_scaled = scaler.transform(df)
df_scaled.show(6)
+-----------------+--------------------+
|         features|      scaledFeatures|
+-----------------+--------------------+
|[5.1,3.5,1.4,0.2]|[-0.8976738791967...|
|[4.9,3.0,1.4,0.2]|[-1.1392004834649...|
|[4.7,3.2,1.3,0.2]|[-1.3807270877331...|
|[4.6,3.1,1.5,0.2]|[-1.5014903898672...|
|[5.0,3.6,1.4,0.2]|[-1.0184371813308...|
|[5.4,3.9,1.7,0.4]|[-0.5353839727944...|
+-----------------+--------------------+
only showing top 6 rows
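As a quick sanity check (a small sketch, not part of the original notebook), the fitted StandardScalerModel exposes the statistics it used, and we can also collect the scaled column to verify it is roughly zero mean and unit standard deviation:

# statistics stored by the fitted scaler
print(scaler.mean)  # per-feature means of the original data
print(scaler.std)   # per-feature standard deviations

# verify the scaled features directly (this collects the data to the driver,
# which is fine for a dataset as small as iris)
scaled = np.array([row['scaledFeatures'] for row in df_scaled.collect()])
print(scaled.mean(axis = 0))           # approximately all zeros
print(scaled.std(axis = 0, ddof = 1))  # approximately all ones (Spark uses the sample std)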

After the preprocessing step, we fit the PCA model.

n_components = 2
pca = PCA(
    k = n_components,
    inputCol = 'scaledFeatures',
    outputCol = 'pcaFeatures'
).fit(df_scaled)

df_pca = pca.transform(df_scaled)
print('Explained Variance Ratio', pca.explainedVariance.toArray())
df_pca.show(6)
Explained Variance Ratio [ 0.72770452  0.23030523]

+-----------------+--------------------+--------------------+
|         features|      scaledFeatures|         pcaFeatures|
+-----------------+--------------------+--------------------+
|[5.1,3.5,1.4,0.2]|[-0.8976738791967...|[2.25698063306802...|
|[4.9,3.0,1.4,0.2]|[-1.1392004834649...|[2.07945911889540...|
|[4.7,3.2,1.3,0.2]|[-1.3807270877331...|[2.36004408158420...|
|[4.6,3.1,1.5,0.2]|[-1.5014903898672...|[2.29650366000388...|
|[5.0,3.6,1.4,0.2]|[-1.0184371813308...|[2.38080158645274...|
|[5.4,3.9,1.7,0.4]|[-0.5353839727944...|[2.06362347633723...|
+-----------------+--------------------+--------------------+
only showing top 6 rows
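The two retained components already explain roughly 96% of the variance (0.728 + 0.230). If we instead wanted to pick k based on a variance threshold, one simple approach (a sketch, not part of the original notebook) is to fit with the full number of components and inspect the cumulative explained variance:

# fit with all four components and look at the cumulative explained variance ratio,
# then pick the smallest k whose cumulative ratio exceeds our threshold, e.g. 0.95
pca_full = PCA(k = 4, inputCol = 'scaledFeatures', outputCol = 'pcaFeaturesFull').fit(df_scaled)
print(np.cumsum(pca_full.explainedVariance.toArray()))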

Notice that, unlike scikit-learn, Spark ML models are first fitted by calling .fit on the dataframe and then applied by calling .transform on it. The transformation returns the result as a new column, whose name is specified by the outputCol argument of the model class.
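For comparison, the equivalent scikit-learn workflow on the same iris array (a rough sketch, not part of the original notebook) returns plain numpy arrays rather than appending columns to a dataframe:

# alias the scikit-learn classes so they don't shadow the pyspark ones imported above
from sklearn.preprocessing import StandardScaler as SkStandardScaler
from sklearn.decomposition import PCA as SkPCA

# fit_transform returns the transformed data directly as a numpy array
X_scaled_sk = SkStandardScaler().fit_transform(X)
X_pca_sk = SkPCA(n_components = 2).fit_transform(X_scaled_sk)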

We can convert it back to a numpy array by extracting the pcaFeatures column from each row and using collect to bring the entire dataset back to a single machine.

# not sure if this is the best way to do it
X_pca = df_pca.rdd.map(lambda row: row.pcaFeatures).collect()
X_pca = np.array(X_pca)
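An alternative that stays on the DataFrame API instead of dropping down to the RDD (a sketch, not part of the original notebook, and assuming the result fits in driver memory) is to select and collect just the one column:

# collect only the pcaFeatures column and convert each vector to a numpy row
rows = df_pca.select('pcaFeatures').collect()
X_pca_alt = np.array([row['pcaFeatures'].toArray() for row in rows])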
# change default style figure and font size
plt.style.use('fivethirtyeight')
plt.rcParams['figure.figsize'] = 8, 6
plt.rcParams['font.size'] = 12


def plot_iris_pca(X_pca, y):
    """a scatter plot of the 2-dimensional iris data"""
    markers = 's', 'x', 'o'
    colors = list(plt.rcParams['axes.prop_cycle'])
    target = np.unique(y)
    for idx, (t, m) in enumerate(zip(target, markers)):
        subset = X_pca[y == t]
        plt.scatter(subset[:, 0], subset[:, 1],
                    s = 50, c = colors[idx]['color'],
                    label = t, marker = m)

    plt.xlabel('PC 1')
    plt.ylabel('PC 2')
    plt.legend(loc = 'lower left')
    plt.tight_layout()
    plt.show()
plot_iris_pca(X_pca, y)
[Figure: scatter plot of the iris samples projected onto PC 1 and PC 2, one marker and color per class]
# stop the current sparkSession
spark.stop()