CloudPak-Outcomes
GitHub Repository: CloudPak-Outcomes/Outcomes-Projects
Path: blob/main/watsonx-data-l4-deploy/SparkLabs/spark-labs.ipynb

Spark Labs

Introduction

Welcome to the Spark Labs Lab Guide! In this lab, you will walk through a hands-on scenario where you will learn how to leverage Apache Spark to:

  • Load data efficiently

  • Save data into a table

  • Clean and preprocess data using Spark's parallel processing capabilities

  • Train a simple Logistic Regression classification model on the processed data

Highlighting Spark Labs Benefits:

Before you dive into the lab, take a moment to see the benefits of using Spark Labs with watsonx.data, especially in comparison to using external notebooks that connect to watsonx.data.

  1. No Dependency Management:

    • External Notebooks: When working in an external notebook, you would typically run your code with the dependencies installed on your own computer. This can lead to version mismatches, dependency conflicts, or other setup challenges.

    • watsonx.data: Since Spark Labs runs directly within the watsonx.data environment, you don’t need to worry about managing or troubleshooting dependencies. All the necessary packages and configurations are preconfigured on the server, ensuring consistency and saving you time.

  2. Easier Configuration:

    • External Notebooks: Setting up an external environment often requires manual configuration, from installing the right versions of Spark and libraries to setting up paths and environment variables.

    • watsonx.data: With watsonx.data, the environment is preconfigured for you. You can jump straight into coding without spending time on setup, making the process much faster and smoother.

  3. Integrated Development Environment:

    • External Notebooks: Using external notebooks can require managing different tools and platforms to run Spark, track results, and visualize data, which can be disjointed and time-consuming.

    • watsonx.data: In this environment, everything you need is integrated into a single platform. You can manage your notebooks, run Spark jobs, and view results all in real time, without switching between different tools.

  4. Easy Collaboration:

    • External Notebooks: Collaboration can be cumbersome when working in local environments. Sharing notebooks may involve version control, syncing, or exporting and sending files manually.

    • watsonx.data: With notebooks stored directly on the server, collaboration is seamless. You can easily share your work with colleagues and stakeholders, ensuring that everyone is on the same page and able to provide feedback in real-time.

Spark Session Setup

The Traditional Setup Method (External Notebooks)

Before you can start using Apache Spark, you need to set up a Spark Session. This session is crucial as it acts as the entry point for all Spark functionality, allowing you to interact with Spark’s powerful distributed computing capabilities.

In external environments, the setup typically involves configuring several options like the application name, cluster manager, memory allocation, and more. For example:

from pyspark import SparkConf

# Parentheses allow inline comments on each chained call
conf = (
    SparkConf()
    .setAppName("ComplexSparkSessionSetup")
    .setMaster("yarn")  # Using YARN as the cluster manager
    .set("spark.sql.warehouse.dir", "/user/hive/warehouse")  # Custom warehouse location
    .set("spark.sql.shuffle.partitions", "500")  # Increase shuffle partitions for performance
    .set("spark.executor.memory", "8g")  # Allocate 8GB of memory to each executor
    .set("spark.driver.memory", "4g")  # Allocate 4GB of memory to the driver
    .set("spark.executor.cores", "4")  # 4 cores per executor
    .set("spark.sql.parquet.compression.codec", "snappy")  # Use Snappy compression for Parquet files
    .set("spark.ui.port", "4041")  # Change the Spark UI port to avoid conflicts
)

However, when running within watsonx.data, this extensive configuration is unnecessary because it is set up automatically. This streamlines the setup process, saves time, and avoids the need to sift through documentation.

Simplified Setup for Watsonx.data (Spark Labs)

In this current lab, the only configuration needed is the fs.s3a.path.style.access flag, which should be set to true. This is required because you are working with a MinIO S3-compatible object storage bucket, and this flag ensures that Spark can properly access it.
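To see what the flag changes, the sketch below builds the two URL styles an S3 client can use for the same object. The endpoint host, bucket, and key are hypothetical placeholders, not values from this lab. Path-style addressing keeps the bucket in the URL path, which is the form a self-hosted MinIO endpoint typically expects, while virtual-hosted style moves the bucket into the hostname.

```python
def object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    """Build the URL for an object using path-style or virtual-hosted-style addressing."""
    scheme, host = endpoint.split("://", 1)
    if path_style:
        # Bucket is the first path segment: scheme://host/bucket/key
        return f"{scheme}://{host}/{bucket}/{key}"
    # Bucket becomes a subdomain: scheme://bucket.host/key
    return f"{scheme}://{bucket}.{host}/{key}"


# Hypothetical endpoint, bucket, and key, for illustration only
endpoint = "http://minio.example:9000"
path_url = object_url(endpoint, "lab-bucket", "heart_data.csv", path_style=True)
virtual_url = object_url(endpoint, "lab-bucket", "heart_data.csv", path_style=False)
print(path_url)
print(virtual_url)
```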

Additional Useful Flags:

There are a few other configuration options that can help fine-tune your Spark session, especially for logging:

  • ae.spark.driver.log.level: Sets the log level for the driver (e.g. "INFO")

  • ae.spark.executor.log.level: Sets the log level for the executor (e.g. "INFO")

These flags are optional but can be helpful for debugging or performance tuning.

from pyspark.sql import SparkSession
import warnings

warnings.filterwarnings('ignore')

# Start a session to read the preconfigured settings, then stop it
spark = SparkSession.builder.appName('sparky').getOrCreate()
conf = spark.sparkContext.getConf()
spark.stop()

# Add the lab-specific settings on top of the existing configuration
conf.setAll([
    ("fs.s3a.path.style.access", "true"),
    # ("ae.spark.driver.log.level", "INFO"),
    # ("ae.spark.executor.log.level", "INFO"),
])

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

To verify that the Spark session is running and to see the current version of Spark, you can run:

spark.version

Viewing the Database and Creating the Schema

Watsonx.data follows a cascading naming scheme, structured in a hierarchical form: Catalog > Schema > Table. This topology helps organize the data in a way that is intuitive and easy to navigate.

  • Catalog: The top level in the architecture. Each catalog contains multiple Schemas. It acts as a container for related datasets.

  • Schema: The middle level. A Schema organizes multiple Tables within a catalog. It serves as a namespace for tables, often reflecting logical groupings of data.

  • Table: The bottom level, where actual data resides. Each Table follows a specific schema definition and contains the data, often in a structured format such as rows and columns.

Working with the Hierarchical Naming Scheme

In Watsonx.data, the hierarchy of Catalogs, Schemas, and Tables is accessed using dot notation. This makes it easy to specify and query data at various levels of the hierarchy.

For example, if you wanted to run queries on the Table exampleTable within the Schema exampleSchema, which is contained in the Catalog exampleCatalog, you would refer to it using the following syntax:

exampleCatalog.exampleSchema.exampleTable
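As a small illustration, the hypothetical helper below (not part of Spark or watsonx.data) joins the three levels into a dotted name. It wraps any part containing characters other than letters, digits, or underscores in backticks, which is how Spark SQL quotes identifiers.

```python
import re


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Join catalog, schema, and table into a dotted identifier."""
    def quote(part: str) -> str:
        # Plain identifiers are left as-is; anything else is backtick-quoted
        if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", part):
            return part
        return "`" + part.replace("`", "``") + "`"

    return ".".join(quote(p) for p in (catalog, schema, table))


name = qualified_name("exampleCatalog", "exampleSchema", "exampleTable")
print(name)
```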

Viewing Catalogs

To view the available Catalogs in Watsonx.data, you can use the following approach.

Note that spark_catalog is the default catalog that comes with Spark. This manages the metadata of your session's databases and tables, including those stored in Spark's internal Hive-like catalog or other data sources that Spark supports.

# Note: the .show() method displays the output in a human-readable table format
spark.sql("""
    SHOW CATALOGS
""").show()

Viewing Schemas within a Catalog

To view the Schemas within a catalog, you can use the following approach:

spark.sql("""
    SHOW SCHEMAS IN iceberg_data
""").show()

Creating a New Schema

Before you proceed with data ingestion, you need to create a new Schema where the newly created Table will reside.

In this case, you will create a Schema called demo within the Catalog iceberg_data. Using dot notation, the full reference to this Schema is iceberg_data.demo.

To create the new Schema, run the SQL command below:

result = spark.sql("""
    CREATE SCHEMA IF NOT EXISTS iceberg_data.demo
""")
result.show()

Now you will double check to see if the Schema was created correctly:

spark.sql("""
    SHOW SCHEMAS IN iceberg_data
""").show()

As you can see there is now a schema called demo in the returned table, so the Schema creation was successful and you can move on to the next step.

Data Ingestion

Now that you have the needed Catalog and Schema set up, you can move on to ingesting data.

Overview of the Dataset

The dataset you will be using is a combined Heart Disease Dataset, built from five popular heart disease datasets:

  • Cleveland

  • Hungarian

  • Switzerland

  • Long Beach VA

  • Statlog (Heart) Data Set

The dataset consists of 1190 rows, with 11 features (attributes) and 1 target (column to predict):

| Feature Name | Description | Data Type | Type |
| --- | --- | --- | --- |
| age | Age of the patient | int | Numeric |
| sex | Gender of the patient (1 = male, 0 = female) | int | Categorical (binary) |
| chest pain type | Type of chest pain (values: 1, 2, 3, 4) | int | Categorical (ordinal) |
| resting bp s | Resting blood pressure | int | Numeric |
| cholesterol | Serum cholesterol in mg/dl | int | Numeric |
| fasting blood sugar | Fasting blood sugar > 120 mg/dl (1 = true, 0 = false) | int | Categorical (binary) |
| resting ecg | Resting electrocardiographic results (values: 0, 1, 2) | int | Categorical (ordinal) |
| max heart rate | Maximum heart rate | int | Numeric |
| exercise angina | Exercise-induced angina (1 = yes, 0 = no) | int | Categorical (binary) |
| oldpeak | Depression induced by exercise relative to rest (numeric value) | float | Numeric |
| ST slope | Slope of the peak exercise ST segment (values: 1, 2, 3) | int | Categorical (ordinal) |
| target | Presence or absence of heart disease (1 = disease present, 0 = no disease) | int | Categorical (binary) |

The data is stored in .csv format.

Importing Data from a CSV File

First you will set up the path to the file:

import os

loc = os.path.abspath('')
data_loc = f"{loc}/heart_data.csv"

The dataset file is named heart_data.csv, so you append that name to the path and read the file into a dataframe.

# Load Data
df = spark.read.csv(data_loc, header=True)

You can show the top two rows of the dataframe by running the .show() method:

df.show(2)

You can also show the schema of the dataframe by running the .printSchema() method:

df.printSchema()

One issue you need to address is that the columns in the currently imported dataframe are all of type string, which is not what you want (this is because csv data does not include types, therefore everything is assumed to be a string). You need the data types to match those defined in the table above, so you will have to cast the columns to their appropriate types.

from pyspark.sql.functions import col

# Convert the columns to the appropriate data types
df_transformed = (
    df.withColumn("age", col("age").cast("int"))
    .withColumn("sex", col("sex").cast("int"))
    .withColumn("chest pain type", col("chest pain type").cast("int"))
    .withColumn("resting bp s", col("resting bp s").cast("int"))
    .withColumn("cholesterol", col("cholesterol").cast("int"))
    .withColumn("fasting blood sugar", col("fasting blood sugar").cast("int"))
    .withColumn("resting ecg", col("resting ecg").cast("int"))
    .withColumn("max heart rate", col("max heart rate").cast("int"))
    .withColumn("exercise angina", col("exercise angina").cast("int"))
    .withColumn("oldpeak", col("oldpeak").cast("float"))
    .withColumn("target", col("target").cast("int"))
    .withColumn("ST slope", col("ST slope").cast("int"))
)

Now you can print out the new transformed dataframe, and you see that the types are now correct:

df_transformed.printSchema()
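The same idea can be seen without Spark. The sketch below uses Python's csv module on a two-row in-memory sample to show that every field arrives as a string until it is cast explicitly. The values are made up for illustration, and only three of the columns are included.

```python
import csv
import io

# Made-up sample rows, for illustration only
sample = io.StringIO("age,sex,oldpeak\n40,1,0.0\n49,0,1.5\n")

# Target type for each column, mirroring the casts applied above
column_types = {"age": int, "sex": int, "oldpeak": float}

raw_rows = list(csv.DictReader(sample))
typed_rows = [
    {name: column_types[name](value) for name, value in row.items()}
    for row in raw_rows
]

print(type(raw_rows[0]["age"]).__name__)    # str
print(type(typed_rows[0]["age"]).__name__)  # int
```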

Creating the Table

To create a table in an Iceberg-managed format, you can use the following SQL statement within Spark. This ensures that the table is created with the correct schema and is managed within your Iceberg catalog.

The following code creates a Table called heart_data in the demo Schema of the iceberg_data Catalog, ensuring that it only gets created if it doesn't already exist.

If the table was created successfully, the output will be an empty dataframe, because CREATE commands typically do not return any data.

spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg_data.demo.heart_data (
        age INT,
        sex INT,
        `chest pain type` INT,
        `resting bp s` INT,
        cholesterol INT,
        `fasting blood sugar` INT,
        `resting ecg` INT,
        `max heart rate` INT,
        `exercise angina` INT,
        oldpeak FLOAT,
        `ST slope` INT,
        target INT
    )
""").show()

Confirm that the table was created successfully:

spark.sql("""
    SHOW TABLES IN iceberg_data.demo
""").show()

To write the data from the transformed dataframe into the created Iceberg table, you can use the write method in Spark. The following steps outline how to insert the transformed dataframe into the heart_data table created in the Iceberg catalog.

df_transformed.write \
    .format("iceberg") \
    .mode("append") \
    .save("iceberg_data.demo.heart_data")

Now let's run a query on the saved Iceberg table within watsonx.data to see if the information was saved successfully:

spark.sql("""
    SELECT * FROM iceberg_data.demo.heart_data LIMIT 2
""").show()

And voilà! You have successfully ingested data into watsonx.data.

Data Cleaning

Data quality is crucial for the success of AI models, and a major part of machine learning involves cleaning the data. While the dataset contains only around 1,000 rows, in real-world applications, datasets can contain trillions of rows or tokens.

Traditional data processing frameworks, such as Hadoop MapReduce, typically read their input from disk and write intermediate results back to disk between processing steps. Each read and write adds significant latency, and multi-step transformations are applied one stage after another.

For example, if you had only 100MB of memory but your dataset was 1GB (1,000MB), such a system would work roughly as follows: it would load 100MB of data into memory, perform transformations, write the results out, then load the next 100MB, and continue this process until the entire dataset has been processed.

Understanding the Value of Spark

Spark revolutionizes data processing by offering two key advantages:

  • In-Memory Computation: Unlike Hadoop MapReduce, which writes intermediate results to disk, Spark keeps working data in memory where possible. Because accessing memory is much quicker than reading from disk, this can be 10x to 100x faster for some workloads. This is especially advantageous for iterative algorithms, such as machine learning model training, which require repeated access to the data.

  • Parallelism: To maximize efficiency, Spark is designed to process data in parallel across distributed systems (i.e., across many nodes in a cluster). By splitting the dataset into smaller partitions and performing operations on them simultaneously, Spark avoids the bottleneck of sequential processing. This parallel processing allows Spark to handle very large datasets that do not fit into a single machine's memory by distributing the workload across many machines.

By leveraging these capabilities, Spark can process data much faster, saving both time and resources.
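The partition-and-combine idea can be sketched in plain Python. This is only an analogy under simplifying assumptions: threads on one machine stand in for executors on a cluster, and the partition count and data are arbitrary. Each partition is processed independently and the partial results are combined at the end.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous chunks."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def count_missing(partition):
    """Work done independently on each partition."""
    return sum(1 for value in partition if value is None)


# Arbitrary example data: every 7th value is missing
data = [None if i % 7 == 0 else i for i in range(100)]
partitions = split_into_partitions(data, num_partitions=4)

# Process the partitions concurrently, then combine the partial counts
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_counts = list(pool.map(count_missing, partitions))

total_missing = sum(partial_counts)
print(partial_counts, total_missing)
```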

Viewing the Data

First you run a query to select the data from the heart_data table in the iceberg_data catalog on the server.

df_server = spark.sql("""
    SELECT * FROM iceberg_data.demo.heart_data
""")

The quality of a model is heavily influenced by the data it is trained on. To ensure better performance, you need to carefully examine and clean the dataset.

For instance, a dataset may contain inherent biases. If certain groups or classes are overrepresented, the model could become biased toward them. Additionally, missing values in the dataset can disrupt the training process and lead to inaccurate predictions or lower model performance.

While there are many aspects to consider when cleaning a dataset, for the purposes of this lab, you will only focus on handling missing values.

In this lab, you will walk through each column of the dataset and check for any missing (null) values, which you will then address appropriately to ensure a cleaner and more reliable dataset for model training.

from pyspark.sql.functions import col

# List of columns with null values (checked on df_server, the data read back from the table)
null_columns = [c for c in df_server.columns if df_server.filter(col(c).isNull()).count() > 0]
print("Columns with null values:", null_columns)

From the output, you can see that the column with null values is the chest pain type column.

Before you address this, there is another important point about Spark. Up until now, you have mainly been running queries to work with the data. However, Spark also comes with a rich set of built-in data processing methods that are not only simple to use but also highly efficient. These methods can often be more convenient and performant than SQL queries, especially when working with large datasets.

For the task of handling the null values in the chest pain type column, this lab will demonstrate how to approach this in two ways: first using SQL query syntax, and second using Spark's built-in data processing methods. This will give you the flexibility to choose the most suitable approach based on the context.

Cleaning the Data (SQL)

First you will need to see how many null values this column contains:

  • If it's a small number, then you can directly remove those rows

  • If it's a large number, then you can remove the column altogether

print("Total Samples:")
spark.sql("SELECT count(*) AS count FROM iceberg_data.demo.heart_data").show()

print("Null Samples:")
spark.sql("SELECT count(*) AS count FROM iceberg_data.demo.heart_data AS t WHERE t.`chest pain type` IS NULL").show()

As you can see, there are only 5 rows with null values, so you will go with the first option and remove those rows altogether.

df_server_cleaned = spark.sql("""
    SELECT * FROM iceberg_data.demo.heart_data AS t
    WHERE t.`chest pain type` IS NOT NULL
""")

And that is it! Next, you will see the same task done with Spark's built-in data processing methods.

Cleaning the Data (Spark Methods)

The steps will be the same, first seeing how many rows there are and how many contain null values:

print("Total Samples:")
print(df_server.count())

print("Null Samples:")
print(df_server.filter(df_server["`chest pain type`"].isNull()).count())

Then dropping the null values:

df_server = df_server.dropna()

As you can see, Spark's built-in methods result in much shorter and cleaner code. These methods operate on the df_server DataFrame you already defined, so there is no need to write a new query for each step. Note that Spark evaluates DataFrames lazily: df_server is kept in memory across actions only if you cache it explicitly (for example, with df_server.cache()); otherwise each action reads the table again.

Additionally, Spark provides easy-to-use, built-in functions for common tasks like dropping null values, making the code simpler and more concise. This approach allows you to take full advantage of Spark’s capabilities while keeping the code efficient and readable.

print(f"Current Length: {df_server.count()}")
print(f"Current Null Samples: {df_server.filter(df_server['`chest pain type`'].isNull()).count()}")
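For reference, the decision rule used in this section can be written down explicitly. The 5% cutoff below is an assumption chosen for illustration; the lab itself only distinguishes a small number of nulls from a large one. The row counts are the ones reported above.

```python
def null_strategy(total_rows: int, null_rows: int, max_null_fraction: float = 0.05) -> str:
    """Suggest how to handle nulls in one column (the threshold is an assumed value)."""
    if null_rows == 0:
        return "nothing to do"
    if null_rows / total_rows <= max_null_fraction:
        return "drop rows"
    return "drop column"


# 5 null rows out of 1190, as found in the chest pain type column
decision = null_strategy(total_rows=1190, null_rows=5)
print(decision)
```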

Transforming and Encoding Features

Our classification model will be trained using a single vector that represents the data. The dataset contains two types of data: numerical and categorical.

Categorical data can be further divided into two types: binary and ordinal.

  • Binary features take values of either 0 or 1.

  • Ordinal features, on the other hand, are stored as integer codes (e.g., 0, 1, ..., x). While these values are numeric, they should not be treated as continuous numbers because they represent distinct categories. Unlike numerical features, where the magnitude is meaningful (e.g., a threshold for resting heart rate may indicate potential heart disease), the distance between two category codes carries no meaning, so the model should not infer arithmetic relationships between them.

For example, consider the ordinal feature height with values 0, 1, and 2, corresponding to short, average, and tall. Although the categories have a natural order, the codes do not say that tall is "twice" average, so feeding them to the model as plain numbers would impose a spacing that the data does not support. Instead, you can represent this data using one-hot encoding, where each category is represented as a binary vector: [1, 0, 0] for short, [0, 1, 0] for average, and [0, 0, 1] for tall. This ensures that the model sees these categories as mutually exclusive, not as a numeric range.
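The height example can be reproduced with a few lines of plain Python. The function below is a minimal sketch for illustration and is unrelated to Spark's own encoder.

```python
def one_hot(value, categories):
    """Return a list with a 1 at the position of `value` and 0 elsewhere."""
    if value not in categories:
        raise ValueError(f"unknown category: {value!r}")
    return [1 if category == value else 0 for category in categories]


heights = ["short", "average", "tall"]
encoded = {h: one_hot(h, heights) for h in heights}
print(encoded)
```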

The next step is to convert these ordinal categorical features into one-hot encodings for better training.

From the data representation table given above, you can identify the following features as ordinal categorical features:

  • Chest pain type

  • Resting ECG

  • ST slope

Since the binary features do not require any transformation, you will include them in the list of numerical features. These binary features already take values of 0 or 1, so they don't need further encoding.

cat_cols = ["chest pain type", "resting ecg", "ST slope"]
# Everything that is neither categorical nor the target is treated as numerical
num_cols = [x for x in df_server.columns if x not in cat_cols and x != "target"]

print(f"Categorical Features: {cat_cols}")
print(f"Numerical Features: {num_cols}")

You can look at the distribution of these ordinal categorical features by doing the following:

for feat in cat_cols:
    print(f"Feature: {feat}")
    df_server.groupBy(feat).count().show()

As seen above, the chest pain type feature contains values: 1, 2, 3, and 4. To represent this feature using one-hot encoding, you convert each value into a binary vector. For example, the value 4 will be represented as [0, 0, 0, 1], where each index corresponds to one of the four possible categories.

This way, the model will treat each category as mutually exclusive, avoiding any misleading relationships between the values.

Next you will be using Spark’s OneHotEncoder to transform the categorical features into one-hot encoded vectors. The OneHotEncoder is a part of the pyspark.ml.feature module, which provides useful data transformation methods tailored for machine learning workflows.

It is useful because it:

  • is optimized for distributed computing.

  • is a part of Spark’s MLlib (Machine Learning Library), which allows for seamless integration into machine learning pipelines.

  • is easy to use.

from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = [
    OneHotEncoder(
        inputCols=cat_cols,
        outputCols=[f"{x}_OneHotEncoder" for x in cat_cols],
    )
]

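The output of the OneHotEncoder will have the encoded ordinal categorical features with the suffix _OneHotEncoder. These newly created columns represent the one-hot encoded version of each categorical feature, turning them into binary vectors. Note that by default Spark's OneHotEncoder drops the last category (dropLast=True) and treats the input values as category indices starting at 0, so the vectors Spark produces can differ in length and layout from the conceptual encoding described earlier.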

Next, you will need to combine all the selected features, both the numerical columns and the one-hot encoded categorical columns, into a single vector column using Spark's VectorAssembler. This step is necessary because machine learning algorithms in Spark require the input features to be in a single vector format.

from pyspark.ml.feature import VectorAssembler

# Numerical columns first, then the one-hot encoded categorical columns
assemblerInput = list(num_cols)
assemblerInput += [f"{x}_OneHotEncoder" for x in cat_cols]
print(f"Assembler inputs are: \n{assemblerInput}")

vector_assembler = VectorAssembler(
    inputCols=assemblerInput,
    outputCol="VectorAssembler_features"
)

Splitting the Data for Training and Testing

Now you split the data into training and testing sets with an 80/20 split: 80% training and 20% testing.

train_df, test_df = df_server.randomSplit([0.8, 0.2], seed=1234)

print(f"Train data length: {train_df.count()}")
print(f"Test data length: {test_df.count()}")
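One detail worth knowing is that randomSplit assigns each row to a split at random, so the resulting sizes are close to 80/20 rather than exact. The plain-Python sketch below imitates that behaviour with per-row sampling. It is an analogy, not Spark's actual implementation, and the row values are placeholders.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1190))  # placeholder rows, same count as the dataset
train_rows, test_rows = random_split(rows, train_fraction=0.8, seed=1234)
print(len(train_rows), len(test_rows))
```

Using a fixed seed makes the split reproducible, which is also why the lab passes seed=1234 to randomSplit.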

Creating a Spark Pipeline

Next you will set up a Spark pipeline to apply a sequence of data transformations.

  • Pipeline: A pipeline allows you to chain multiple stages of data processing into one object, making it easy to apply the same transformations to both training and test datasets.

  • stages: This list contains the transformation steps that will be applied in order. In this case, it includes the one-hot encoding of categorical features and the assembly of features into a single vector using VectorAssembler.

  • pipeline: The pipeline object is created and the stages are set. When applied, it will automatically execute the transformations in the correct order (first one-hot encoding, then vector assembly).

The benefit of using a pipeline is that it organizes the data processing steps into a streamlined, reusable workflow, ensuring consistency across datasets.

from pyspark.ml import Pipeline

stages = one_hot_encoder + [vector_assembler]
pipeline = Pipeline().setStages(stages)

Now, you will fit the pipeline to the training data, learning the necessary transformations, and then apply the fitted pipeline to transform the test set, ensuring it undergoes the same preprocessing as the training data.

%%time
fitted_pipeline = pipeline.fit(train_df)
pp_train_df = fitted_pipeline.transform(train_df)
pp_test_df = fitted_pipeline.transform(test_df)
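The fit-then-transform pattern can be illustrated with a toy stage in plain Python. The class below is hypothetical and far simpler than Spark's Pipeline: it learns the set of categories from the training values only and then applies that same mapping to any other values, which is what keeps training and test preprocessing consistent.

```python
class ToyOneHotStage:
    """Learns categories in fit() and encodes values in transform()."""

    def fit(self, values):
        # Learn the category order from the training data only
        self.categories_ = sorted(set(values))
        return self

    def transform(self, values):
        # Categories not seen during fit() encode as all zeros
        return [[1 if v == c else 0 for c in self.categories_] for v in values]


# Made-up category values, for illustration only
train_values = [1, 2, 4, 2, 3]
test_values = [4, 1]

stage = ToyOneHotStage().fit(train_values)
train_encoded = stage.transform(train_values)
test_encoded = stage.transform(test_values)
print(test_encoded)
```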

Now, you will check if the transformed data, pp_train_df, is correct.

Do not be alarmed by the output format. You might expect one dense vector per row, but some rows are displayed as tuples because Spark stores them in a sparse vector format. This format helps save memory while still representing the same information.

In the sparse vector format:

  • Index 0: Represents the length of the vector.

  • Index 1: Lists the indices of the non-zero values.

  • Index 2: Contains the actual non-zero values.

This efficient representation allows Spark to handle large datasets more effectively by only storing non-zero values.

pp_train_df.select('VectorAssembler_features').show(truncate=False)
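To make the three-part layout concrete, the helper below expands a (size, indices, values) triple into the equivalent dense list. The function name and the example triple are illustrative and not taken from the lab's output.

```python
def sparse_to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# Illustrative triple: a vector of length 6 with two non-zero entries
dense = sparse_to_dense(6, [0, 4], [52.0, 1.0])
print(dense)
```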

Model Training

Preparing data for model training

Before training the model, you need to ensure that the data is in the correct format. Specifically, you need to:

  • Select the features: Use the transformed features, which are represented by the VectorAssembler_features column.

  • Rename the target column: The column containing the labels should be named label (as required by many Spark MLlib models like LogisticRegression).

from pyspark.sql import functions as F

train_data = pp_train_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("target").alias("label"),
)
test_data = pp_test_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("target").alias("label"),
)

Training the Model

You will be using Logistic Regression as the classification model. If you are interested in learning more about Logistic Regression, this article by IBM is highly recommended: Logistic Regression. A detailed treatment is out of scope for this lab.

Steps:

  1. Initialize the Model: You start by initializing a Logistic Regression model using LogisticRegression() from PySpark's MLlib.

  2. Fit the Model: Next, you fit the model to the train_data dataset, where the model learns the relationship between the features and the label.

%%time
from pyspark.ml.classification import LogisticRegression

model = LogisticRegression().fit(train_data)
print(model)
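As a brief intuition for what the fitted model does, a logistic regression computes a weighted sum of the features and passes it through the logistic (sigmoid) function to obtain a probability, which is then compared against a threshold. The weights, intercept, and feature values below are made-up numbers, not the coefficients learned in this lab.

```python
import math


def predict_probability(features, weights, intercept):
    """Probability of the positive class under a logistic regression model."""
    score = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-score))


# Made-up parameters and input, for illustration only
weights = [0.8, -0.5]
intercept = -0.1
probability = predict_probability([1.0, 0.2], weights, intercept)

# Classify as positive when the probability reaches the 0.5 threshold
label = 1 if probability >= 0.5 else 0
print(round(probability, 3), label)
```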

You can see the performance of the model on the train set by running:

model.summary.areaUnderROC

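As you can see, the model performed well on the training data, with an area under the ROC curve (AUC) above 0.9. Note that areaUnderROC reports AUC rather than accuracy; the training accuracy is available as model.summary.accuracy.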

Testing the Model

Now that the model is trained and shows good performance on the training data, the next critical step is to evaluate how well the model generalizes. This means testing whether the model performs well on unseen data (the test set), which gives you an indication of how it will perform in real-world scenarios when dealing with new data.

To evaluate the performance of the model on the test data, you will use the MulticlassClassificationEvaluator from PySpark. This evaluator helps compute various metrics like accuracy, precision, recall, and F1 score. For this example, you will focus on accuracy.
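For intuition about the metrics mentioned above, the sketch below computes them by hand for a binary problem from a short list of labels and predictions. The data is invented for illustration; in the lab the evaluator computes accuracy for you from the predictions DataFrame.

```python
def binary_metrics(labels, predictions):
    """Accuracy, precision, recall, and F1 for the positive class (1)."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)  # true positives
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)  # false positives
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)  # false negatives
    correct = sum(1 for y, p in pairs if y == p)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": correct / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Invented labels and predictions, for illustration only
labels = [1, 0, 1, 1, 0, 0, 1, 0]
predictions = [1, 0, 0, 1, 0, 1, 1, 0]
metrics = binary_metrics(labels, predictions)
print(metrics)
```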

Steps to Evaluate the Model:

  1. Get Predictions from the test_data: Use the trained model to predict on the test data.

  2. Evaluate Accuracy: Use the MulticlassClassificationEvaluator to compute the accuracy by comparing the predicted labels with the actual labels.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions on the test data
predictions = model.transform(test_data)

# Use MulticlassClassificationEvaluator to calculate accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Compute accuracy
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy on test data: {accuracy:.4f}")

And the model performs well on unseen data as well! With that, the lab is concluded. I hope you enjoyed this lab and learned a lot about the advantages of Spark and Spark Labs.

All the best!