Spark Labs
Introduction
Welcome to the Spark Labs Lab Guide! In this lab, you will walk through a hands-on scenario where you will learn how to leverage Apache Spark to:
Load data efficiently
Save data into a table
Clean and preprocess data using Spark's parallel processing capabilities
Train a simple Logistic Regression classification model on the processed data
Highlighting Spark Labs Benefits:
Before you dive into the lab, take a moment to see the benefits of using Spark Labs with watsonx.data, especially in comparison to using external notebooks that connect to watsonx.data.
No Dependency Management:
External Notebooks: When working in an external notebook, you would typically run your code with the dependencies installed on your own computer. This can lead to version mismatches, dependency conflicts, or other setup challenges.
watsonx.data: Since Spark Labs runs directly within the watsonx.data environment, you don’t need to worry about managing or troubleshooting dependencies. All the necessary packages and configurations are preconfigured on the server, ensuring consistency and saving you time.
Easier Configuration:
External Notebooks: Setting up an external environment often requires manual configuration, from installing the right versions of Spark and libraries to setting up paths and environment variables.
watsonx.data: With watsonx.data, the environment is preconfigured for you. You can jump straight into coding without spending time on setup, making the process much faster and smoother.
Integrated Development Environment:
External Notebooks: Using external notebooks can require managing different tools and platforms to run Spark, track results, and visualize data, which can be disjointed and time-consuming.
watsonx.data: In this environment, everything you need is integrated into a single platform. You can manage your notebooks, run Spark jobs, and view results all in real time, without switching between different tools.
Easy Collaboration:
External Notebooks: Collaboration can be cumbersome when working in local environments. Sharing notebooks may involve version control, syncing, or exporting and sending files manually.
watsonx.data: With notebooks stored directly on the server, collaboration is seamless. You can easily share your work with colleagues and stakeholders, ensuring that everyone is on the same page and able to provide feedback in real-time.
Spark Session Setup
The Traditional Setup Method (External Notebooks)
Before you can start using Apache Spark, you need to set up a Spark Session. This session is crucial as it acts as the entry point for all Spark functionality, allowing you to interact with Spark’s powerful distributed computing capabilities.
In external environments, the setup typically involves configuring several options like the application name, cluster manager, memory allocation, and more. For example:
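A minimal sketch of what such a manual setup can look like. The application name, master URL, and memory values are hypothetical placeholders, not values from this lab; the configuration keys themselves are standard Spark properties.

```python
# Hypothetical settings for a self-managed Spark environment; tune for your own cluster.
EXTERNAL_SPARK_CONF = {
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
    "spark.executor.cores": "2",
}


def build_external_session(app_name="spark-labs-external", master="local[*]",
                           conf=EXTERNAL_SPARK_CONF):
    """Create a SparkSession by hand, as you would outside watsonx.data."""
    # Imported here so the settings above can be inspected without a PySpark install.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).master(master)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling `build_external_session()` requires PySpark and a Java runtime on the local machine, which is exactly the kind of dependency the external approach leaves to you.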
However, when running within watsonx.data, this extensive configuration is unnecessary because the session is set up automatically. This saves time and avoids the need to sift through documentation.
Simplified Setup for watsonx.data (Spark Labs)
In this lab, the only configuration needed is the fs.s3a.path.style.access flag, which should be set to true. This is required because you are working with a MinIO S3-compatible object storage bucket, and this flag ensures that Spark can properly access it.
Additional Useful Flags:
There are a few other configuration options that can help fine-tune your Spark session, especially for logging:
ae.spark.driver.log.level: Sets the log level for the driver (e.g., "INFO")
ae.spark.executor.log.level: Sets the log level for the executor (e.g., "INFO")
These flags are optional but can be helpful for debugging or performance tuning.
To verify that the Spark session is running and see the current version of Spark, you can run:
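A hedged sketch of the simplified setup and the version check. The flag names are the ones listed in this section; the application name is a placeholder. In open-source Spark, Hadoop options are usually passed with a `spark.hadoop.` prefix, so whether the bare key is enough is an assumption to check against your environment.

```python
# Flags named in this lab; the two log-level entries are optional.
LAB_SPARK_CONF = {
    "fs.s3a.path.style.access": "true",     # needed for the MinIO bucket
    "ae.spark.driver.log.level": "INFO",    # optional
    "ae.spark.executor.log.level": "INFO",  # optional
}


def start_lab_session(app_name="spark-labs", conf=LAB_SPARK_CONF):
    """Get (or create) the session with only the lab's flags applied."""
    from pyspark.sql import SparkSession  # preinstalled in Spark Labs

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def spark_version(spark):
    """Return the Spark version string of a running session."""
    return spark.version
```

In a notebook cell this reduces to `spark = start_lab_session()` followed by `print(spark_version(spark))`.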
Viewing the Database and Creating the Schema
Watsonx.data follows a cascading naming scheme, structured in a hierarchical form: Catalog > Schema > Table. This topology helps organize the data in a way that is intuitive and easy to navigate.
Catalog: The top level in the architecture. Each catalog contains multiple Schemas. It acts as a container for related datasets.
Schema: The middle level. A Schema organizes multiple Tables within a catalog. It serves as a namespace for tables, often reflecting logical groupings of data.
Table: The bottom level, where actual data resides. Each Table follows a specific schema definition and contains the data, often in a structured format such as rows and columns.
Working with the Hierarchy Naming Scheme
In Watsonx.data, the hierarchy of Catalogs, Schemas, and Tables is accessed using dot notation. This makes it easy to specify and query data at various levels of the hierarchy.
For example, if you wanted to run queries on the Table exampleTable within the Schema exampleSchema, which is contained in the Catalog exampleCatalog, you would refer to it using the following syntax:
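The dot notation can be sketched as a small helper. The function names are illustrative, and `spark` is assumed to be an active session.

```python
def qualified_name(catalog, schema, table):
    """Join the three hierarchy levels with dots: Catalog.Schema.Table."""
    return f"{catalog}.{schema}.{table}"


def preview_table(spark, catalog, schema, table, limit=5):
    """Run a simple query against a fully qualified table name."""
    return spark.sql(f"SELECT * FROM {qualified_name(catalog, schema, table)} LIMIT {limit}")


print(qualified_name("exampleCatalog", "exampleSchema", "exampleTable"))
```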
Viewing Catalogs
To view the available Catalogs in watsonx.data, you can use the following approach.
Note that spark_catalog is the default catalog that comes with Spark. This manages the metadata of your session's databases and tables, including those stored in Spark's internal Hive-like catalog or other data sources that Spark supports.
Viewing Schemas within a Catalog
To view the Schemas within a catalog, you can use the following approach:
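Both listing steps, catalogs first and then the schemas inside one catalog, can be sketched with plain Spark SQL. This assumes a recent Spark version that supports `SHOW CATALOGS`; the helper names are illustrative.

```python
def list_catalogs(spark):
    """Return a DataFrame with one row per catalog visible to this session."""
    return spark.sql("SHOW CATALOGS")


def list_schemas(spark, catalog="iceberg_data"):
    """Return a DataFrame with one row per schema inside the given catalog."""
    return spark.sql(f"SHOW SCHEMAS IN {catalog}")
```

In a notebook you would typically call `.show()` on either result, for example `list_schemas(spark).show()`.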
Creating a New Schema
Before you proceed with data ingestion, you need to create a new Schema where the newly created Table will reside.
In this case, you will create a Schema called demo within the Catalog iceberg_data. Using dot notation, the full reference to this Schema is iceberg_data.demo.
To create the new Schema, run the SQL command below:
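A minimal sketch of the schema creation, assuming `spark` is the active session. `IF NOT EXISTS` is an addition that makes the cell safe to re-run.

```python
def create_schema(spark, catalog="iceberg_data", schema="demo"):
    """Create catalog.schema if it is not there yet and return its full name."""
    full_name = f"{catalog}.{schema}"
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {full_name}")
    return full_name
```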
Now double-check that the Schema was created correctly:
As you can see, there is now a schema called demo in the returned table, so the Schema creation was successful and you can move on to the next step.
Data Ingestion
Now that you have the needed Catalog and Schema set up, you can move on to ingesting data.
Overview of the Dataset
The dataset you will be using is a combined Heart Disease Dataset, which merges five popular heart disease datasets:
Cleveland
Hungarian
Switzerland
Long Beach VA
Statlog (Heart) Data Set
The dataset consists of 1190 rows, with 11 features (attributes) and 1 target (column to predict):
| Feature Name | Description | Data Type | Type |
|---|---|---|---|
| age | Age of the patient | int | Numeric |
| sex | Gender of the patient (1 = male, 0 = female) | int | Categorical (binary) |
| chest pain type | Type of chest pain (values: 1, 2, 3, 4) | int | Categorical (ordinal) |
| resting bp s | Resting blood pressure | int | Numeric |
| cholesterol | Serum cholesterol in mg/dl | int | Numeric |
| fasting blood sugar | Fasting blood sugar > 120 mg/dl (1 = true, 0 = false) | int | Categorical (binary) |
| resting ecg | Resting electrocardiographic results (values: 0, 1, 2) | int | Categorical (ordinal) |
| max heart rate | Maximum heart rate | int | Numeric |
| exercise angina | Exercise-induced angina (1 = yes, 0 = no) | int | Categorical (binary) |
| old peak | ST depression induced by exercise relative to rest (numeric value) | float | Numeric |
| ST slope | Slope of the peak exercise ST segment (values: 1, 2, 3) | int | Categorical (ordinal) |
| target | Presence or absence of heart disease (1 = disease present, 0 = no disease) | int | Categorical (binary) |
The data is stored in .csv format.
Importing Data from a CSV File
First you will set up the path to the file:
The dataset file is named heart_data.csv, so you append that name to the path and read the file into a dataframe.
You can show the top two rows of the dataframe by running the .show() method:
You can also show the schema of the dataframe by running the .printSchema() method:
One issue you need to address is that the columns in the imported dataframe are all of type string, which is not what you want. CSV files carry no type information, so unless Spark is asked to infer a schema, it reads every column as a string. You need the data types to match those defined in the table above, so you will have to cast the columns to their appropriate types.
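The read and cast steps can be sketched as follows. The column names and types are copied from the feature table above and are assumed to match the CSV header exactly; the file path is whatever you set up earlier and is passed in rather than guessed here.

```python
# Target Spark SQL types, taken from the feature table above.
COLUMN_TYPES = {
    "age": "int",
    "sex": "int",
    "chest pain type": "int",
    "resting bp s": "int",
    "cholesterol": "int",
    "fasting blood sugar": "int",
    "resting ecg": "int",
    "max heart rate": "int",
    "exercise angina": "int",
    "old peak": "float",
    "ST slope": "int",
    "target": "int",
}


def read_heart_csv(spark, path):
    """Read the CSV with a header row; every column arrives as a string."""
    return spark.read.option("header", "true").csv(path)


def cast_columns(df, column_types=COLUMN_TYPES):
    """Return a new DataFrame with each listed column cast to its target type."""
    return df.select(
        [df[name].cast(dtype).alias(name) for name, dtype in column_types.items()]
    )
```

Calling `cast_columns(df).printSchema()` afterwards should list the types from the table instead of `string`.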
Now you can print out the new transformed dataframe, and you see that the types are now correct:
Creating the Table
To create a table in an Iceberg-managed format, you can use the following SQL statement within Spark. This ensures that the table is created with the correct schema and is managed within your Iceberg catalog.
The following code creates a Table called heart_data in the demo Schema of the iceberg_data Catalog, ensuring that it only gets created if it doesn't already exist.
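One way to sketch this statement is to derive the column list from the transformed dataframe, so the table definition cannot drift from the data. `df.dtypes` is a standard DataFrame attribute that returns `(name, type)` pairs; the helper name is illustrative, and the backticks are there because some column names contain spaces.

```python
def create_table_sql(table, dtypes):
    """Build a CREATE TABLE statement from (column name, Spark SQL type) pairs."""
    columns = ", ".join(f"`{name}` {dtype}" for name, dtype in dtypes)
    return f"CREATE TABLE IF NOT EXISTS {table} ({columns}) USING iceberg"


# Two columns shown for brevity; in the lab you would pass df.dtypes instead.
print(create_table_sql("iceberg_data.demo.heart_data", [("age", "int"), ("old peak", "float")]))
```

The resulting string is then executed with `spark.sql(...)`.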
If the table was created successfully, the output will be an empty dataframe. This is because CREATE commands typically do not return any data.
Confirm that the table was created successfully:
To write the data from the transformed dataframe into the created Iceberg table, you can use the write method in Spark. The following steps outline how to insert the transformed dataframe into the heart_data table created in the Iceberg catalog.
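A minimal sketch of the write step. It uses `DataFrame.writeTo(...).append()` from Spark 3, which is one of several ways to write to an Iceberg table and may differ from the exact call used in the original notebook.

```python
HEART_TABLE = "iceberg_data.demo.heart_data"


def append_to_table(df, table=HEART_TABLE):
    """Append all rows of df to an existing table."""
    df.writeTo(table).append()


def preview_saved_rows(spark, table=HEART_TABLE, limit=5):
    """Query the saved table to confirm the rows arrived."""
    return spark.sql(f"SELECT * FROM {table} LIMIT {limit}")
```

Because `append()` adds rows every time it runs, re-running the cell inserts the data again.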
Now let's run a query on the saved Iceberg table within watsonx.data to see if the information was saved successfully:
And voilà! You have successfully ingested data into watsonx.data.
Data Cleaning
Data quality is crucial for the success of AI models, and a major part of machine learning involves cleaning the data. While the dataset contains only around 1,000 rows, in real-world applications, datasets can contain trillions of rows or tokens.
Traditional data processing frameworks, such as Hadoop MapReduce, read their input from disk and write intermediate results back to disk between processing stages. Each of these reads and writes adds significant time, especially when a job chains many transformations.
For example, if you had only 100MB of memory available but your dataset was 1GB (1,000MB), a disk-based approach would work as follows: it would load 100MB of data into memory, perform the transformations, write the results out, then load the next 100MB, and continue this process until the entire dataset has been processed.
Understanding the Value of Spark
Spark revolutionizes data processing by offering two key advantages:
In-Memory Computation: Unlike Hadoop MapReduce, which writes intermediate results to disk, Spark keeps working data in memory where possible. Depending on the workload, this can be 10x to 100x faster, since accessing memory is much quicker than reading from disk. This is especially advantageous for iterative algorithms, such as machine learning model training, which require repeated access to the data.
Parallelism: To maximize efficiency, Spark is designed to process data in parallel across distributed systems (i.e., across many nodes in a cluster). By splitting the dataset into smaller partitions and performing operations on them simultaneously, Spark avoids the bottleneck of sequential processing. This parallel processing allows Spark to handle very large datasets that do not fit into a single machine's memory by distributing the workload across many machines.
By leveraging these capabilities, Spark can process data much faster, saving both time and resources.
Viewing the Data
First you run a query to select the data from the heart_data table in the iceberg_data catalog on the server.
The quality of a model is heavily influenced by the data it is trained on. To ensure better performance, you need to carefully examine and clean the dataset.
For instance, a dataset may contain inherent biases. If certain groups or classes are overrepresented, the model could become biased toward them. Additionally, missing values in the dataset can disrupt the training process and lead to inaccurate predictions or lower model performance.
While there are many aspects to consider when cleaning a dataset, for the purposes of this lab, you will only focus on handling missing values.
In this lab, you will walk through each column of the dataset and check for any missing (null) values, which you will then address appropriately to ensure a cleaner and more reliable dataset for model training.
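A simple sketch of the per-column null check, using only DataFrame methods. It runs one small Spark job per column, which is fine for a dataset of this size; on very large tables a single aggregation over all columns would be the better choice.

```python
def null_counts(df):
    """Return {column name: number of null values} for every column of df."""
    return {name: df.filter(df[name].isNull()).count() for name in df.columns}
```

Columns with a count above zero are the ones that need attention.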
From the output, you can see that the column with null values is the chest pain type column.
Before you address this, there is another important point about Spark. Up until now, you have mainly been running queries to work with the data. However, Spark also comes with a rich set of built-in data processing methods that are simple to use and efficient. These methods are often more convenient than SQL queries, and because both go through the same Spark optimizer, they generally perform comparably, even on large datasets.
For the task of handling the null values in the chest pain type column, this lab will demonstrate how to approach this in two ways: first using SQL query syntax, and second using Spark's built-in data processing methods. This will give you the flexibility to choose the most suitable approach based on the context.
Cleaning the Data (SQL)
First you will need to see how many null values this column contains:
If it's a small number, then you can directly remove those rows
If it's a large number, then you can remove the column altogether
As you can see, there are only 5 rows with null values, so you will go with the first option and remove those rows altogether.
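The SQL route can be sketched with two statements: one that counts the null rows and one that selects everything else. This version filters with `SELECT` rather than deleting from the table, which is an assumption made here to keep the example non-destructive.

```python
TABLE = "iceberg_data.demo.heart_data"
COLUMN = "chest pain type"


def count_nulls_sql(table=TABLE, column=COLUMN):
    """SQL that counts the rows where the column is null."""
    return f"SELECT COUNT(*) AS null_rows FROM {table} WHERE `{column}` IS NULL"


def without_nulls_sql(table=TABLE, column=COLUMN):
    """SQL that returns only the rows where the column has a value."""
    return f"SELECT * FROM {table} WHERE `{column}` IS NOT NULL"


print(count_nulls_sql())
print(without_nulls_sql())
```

Each string is passed to `spark.sql(...)`; the second one gives you the cleaned dataframe.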
And that is it! Next comes the same task using Spark's built-in data processing methods.
Cleaning the Data (Spark Methods)
The steps will be the same, first seeing how many rows there are and how many contain null values:
Then dropping the null values:
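A sketch of both steps with DataFrame methods, assuming `df_server` holds the queried table. `dropna(subset=[...])` removes only the rows where the listed columns are null.

```python
def drop_null_rows(df, column="chest pain type"):
    """Return (cleaned DataFrame, total row count, rows with a null in column)."""
    total = df.count()
    missing = df.filter(df[column].isNull()).count()
    cleaned = df.dropna(subset=[column])
    return cleaned, total, missing
```

For example, `df_clean, total, missing = drop_null_rows(df_server)`.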
As you can see, Spark's built-in methods result in much shorter and cleaner code. These methods work directly on the df_server dataframe that you already defined, so you do not need to write a new query for each step. Keep in mind that Spark evaluates dataframes lazily: the data only stays in memory between actions if you cache it (for example, with df_server.cache()).
Additionally, Spark provides easy-to-use, built-in functions for common tasks like dropping null values, making the code simpler and more concise. This approach allows you to take full advantage of Spark’s capabilities while keeping the code efficient and readable.
Transforming and Encoding Features
Our classification model will be trained using a single vector that represents the data. The dataset contains two types of data: numerical and categorical.
Categorical data can be further divided into two types: binary and ordinal.
Binary features take values of either 0 or 1.
Ordinal features, on the other hand, represent categories that are ordered (e.g., 0, 1, ..., x). While these values are numeric, they should not be treated as continuous numbers because they represent distinct categories. Unlike numerical features, where you might identify relationships (e.g., identifying a threshold for resting heart rate indicating potential heart disease), ordinal features should not have relationships inferred between the numbers themselves.
For example, consider the ordinal feature height with values 0, 1, and 2, corresponding to short, average, and tall. If the model treats these codes as ordinary numbers, it assumes that the step from short to average has the same effect as the step from average to tall, which the codes do not guarantee. Instead, you can represent this data using one-hot encoding, where each category is represented as a binary vector: [1, 0, 0] for short, [0, 1, 0] for average, and [0, 0, 1] for tall. This ensures that the model sees these categories as mutually exclusive, not as points on a numeric scale.
The next step is to convert these ordinal categorical features into one-hot encodings for better training.
From the data representation table given above, you can identify the following features as ordinal categorical features:
Chest pain type
Resting ECG
ST slope
Since the binary features do not require any transformation, you will include them in the list of numerical features. These binary features already take values of 0 or 1, so they don't need further encoding.
You can look at the distribution of these ordinal categorical features by doing the following:
As seen above, the chest pain type feature contains values: 1, 2, 3, and 4. To represent this feature using one-hot encoding, you convert each value into a binary vector. For example, the value 4 will be represented as [0, 0, 0, 1], where each index corresponds to one of the four possible categories.
This way, the model will treat each category as mutually exclusive, avoiding any misleading relationships between the values.
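The idea can be illustrated in plain Python, independent of Spark. This is a conceptual sketch only: Spark's OneHotEncoder stores its output as vectors and, by default (`dropLast=True`), omits the last category, so its actual output can look different from these lists.

```python
def one_hot(value, categories):
    """Return a list with 1 at the position of value and 0 everywhere else."""
    return [1 if value == category else 0 for category in categories]


chest_pain_categories = [1, 2, 3, 4]
print(one_hot(4, chest_pain_categories))  # [0, 0, 0, 1]
print(one_hot(2, chest_pain_categories))  # [0, 1, 0, 0]
```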
Next you will be using Spark’s OneHotEncoder to transform the categorical features into one-hot encoded vectors. The OneHotEncoder is a part of the pyspark.ml.feature module, which provides useful data transformation methods tailored for machine learning workflows.
It is useful because it:
is optimized for distributed computing.
is a part of Spark’s MLlib (Machine Learning Library), which allows for seamless integration into machine learning pipelines.
is easy to use.
The output of the OneHotEncoder will have the encoded ordinal categorical features with the suffix _OneHotEncoder. These newly created columns represent the one-hot encoded version of each categorical feature, turning them into binary vectors.
Next, you will need to combine all the selected features — both the numerical columns and the one-hot encoded categorical columns — into a single vector column using Spark's VectorAssembler. This step is necessary because machine learning algorithms in Spark require the input features to be in a single vector format.
Splitting the Data for Training and Testing
Now you split the data into training data and testing data with an 80/20 split: 80% training and 20% testing.
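A minimal sketch of the split using `DataFrame.randomSplit`. The seed value is an arbitrary choice that makes the split repeatable. Note that `randomSplit` assigns rows randomly, so the resulting sizes are only approximately 80% and 20%.

```python
def split_train_test(df, train_fraction=0.8, seed=42):
    """Randomly split df into (train, test) using the given training fraction."""
    train_df, test_df = df.randomSplit([train_fraction, 1.0 - train_fraction], seed=seed)
    return train_df, test_df
```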
Creating a Spark Pipeline
Next you will set up a Spark pipeline to apply a sequence of data transformations.
Pipeline: A pipeline allows you to chain multiple stages of data processing into one object, making it easy to apply the same transformations to both training and test datasets.
stages: This list contains the transformation steps that will be applied in order. In this case, it includes the one-hot encoding of categorical features and the assembly of features into a single vector using VectorAssembler.
pipeline: The pipeline object is created and the stages are set. When applied, it will automatically execute the transformations in the correct order (first one-hot encoding, then vector assembly).
The benefit of using a pipeline is that it organizes the data processing steps into a streamlined, reusable workflow, ensuring consistency across datasets.
Now, you will fit the pipeline to the training data, learning the necessary transformations, and then apply the fitted pipeline to transform the test set, ensuring it undergoes the same preprocessing as the training data.
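The encoder, assembler, and pipeline described above can be sketched together. Column names come from the feature table, the `_OneHotEncoder` suffix and the `VectorAssembler_features` output name follow the naming used in this lab, and the helper names are illustrative.

```python
ORDINAL_COLS = ["chest pain type", "resting ecg", "ST slope"]
# Binary features are grouped with the numerical ones, as described earlier.
NUMERIC_COLS = ["age", "sex", "resting bp s", "cholesterol", "fasting blood sugar",
                "max heart rate", "exercise angina", "old peak"]
FEATURES_COL = "VectorAssembler_features"


def encoded_name(column):
    """Name of the one-hot encoded output column for a given input column."""
    return f"{column}_OneHotEncoder"


def build_pipeline(ordinal_cols=ORDINAL_COLS, numeric_cols=NUMERIC_COLS):
    """Chain one-hot encoding and vector assembly into a single Pipeline."""
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import OneHotEncoder, VectorAssembler

    encoded_cols = [encoded_name(c) for c in ordinal_cols]
    encoder = OneHotEncoder(inputCols=ordinal_cols, outputCols=encoded_cols)
    assembler = VectorAssembler(inputCols=numeric_cols + encoded_cols,
                                outputCol=FEATURES_COL)
    return Pipeline(stages=[encoder, assembler])


def fit_and_transform(train_df, test_df):
    """Fit on the training data only, then apply the same steps to both sets."""
    fitted = build_pipeline().fit(train_df)
    return fitted.transform(train_df), fitted.transform(test_df)
```

Fitting on the training set alone and reusing the fitted pipeline for the test set keeps the category mapping identical across both datasets.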
Now, you will check if the transformed data, pp_train_df, is correct.
Do not be alarmed by the output format. While you expected one vector with the desired values, some rows may appear as tuples because they are using a sparse vector format. This format helps save memory while still representing the same information.
In the sparse vector format:
Index 0: Represents the length of the vector.
Index 1: Lists the indices of the non-zero values.
Index 2: Contains the actual non-zero values.
This efficient representation allows Spark to handle large datasets more effectively by only storing non-zero values.
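To see that a sparse tuple carries the same information as a regular vector, here is a plain-Python sketch that expands one. The example values are made up for illustration and are not taken from the lab output.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a full list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# A vector of length 5 with non-zero values at positions 1 and 3.
print(to_dense(5, [1, 3], [1.0, 2.0]))  # [0.0, 1.0, 0.0, 2.0, 0.0]
```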
Model Training
Preparing data for model training
Before training the model, you need to ensure that the data is in the correct format. Specifically, you need to:
Select the features: Use the transformed features, which are represented by the VectorAssembler_features column.
Rename the target column: The column containing the labels should be named label (as required by many Spark MLlib models like LogisticRegression).
Training the Model
You will be using Logistic Regression as the classification model. If you are interested in learning more about Logistic Regression, I highly recommend checking out this article by IBM: Logistic Regression. However, going into detail is out of scope for this lab.
Steps:
Initialize the Model: You start by initializing a Logistic Regression model using LogisticRegression() from PySpark's MLlib.
Fit the Model: Next, you fit the model to the train_data dataset, where the model learns the relationship between the features and the label.
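The preparation and training steps can be sketched as two small helpers. The column names follow the ones used in this lab; the helper names are illustrative, and the model is created with default hyperparameters.

```python
FEATURES_COL = "VectorAssembler_features"


def prepare_for_training(df, features_col=FEATURES_COL, target_col="target"):
    """Keep only the feature vector and the target, renaming the target to 'label'."""
    return df.select(features_col, target_col).withColumnRenamed(target_col, "label")


def train_logistic_regression(train_data, features_col=FEATURES_COL):
    """Fit a Logistic Regression model on the prepared training data."""
    from pyspark.ml.classification import LogisticRegression

    lr = LogisticRegression(featuresCol=features_col, labelCol="label")
    return lr.fit(train_data)
```

After fitting, `model.summary.accuracy` reports the accuracy on the training data.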
You can see the performance of the model on the train set by running:
As you can see, the model performed well, with over 90 percent accuracy on the training data.
Testing the Model
Now that the model is trained and shows good performance on the training data, the next critical step is to evaluate how well the model generalizes. This means testing whether the model performs well on unseen data (the test set), which gives you an indication of how it will perform in real-world scenarios when dealing with new data.
To evaluate the performance of the model on the test data, you will use the MulticlassClassificationEvaluator from PySpark. This evaluator helps compute various metrics like accuracy, precision, recall, and F1 score. For this example, you will focus on accuracy.
Steps to Evaluate the Model:
Get Predictions from the test_data: Use the trained model to predict on the test data.
Evaluate Accuracy: Use the MulticlassClassificationEvaluator to compute the accuracy by comparing the predicted labels with the actual labels.
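The two evaluation steps can be sketched as follows, assuming `model` is the fitted Logistic Regression model and `test_data` has the same columns as the training data. The second helper is a plain-Python illustration of what the accuracy metric measures and is not part of the Spark API.

```python
def evaluate_accuracy(model, test_data):
    """Predict on the test data and return the fraction of correct predictions."""
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    predictions = model.transform(test_data)
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy"
    )
    return evaluator.evaluate(predictions)


def accuracy_from_pairs(pairs):
    """Accuracy for a list of (label, prediction) pairs: correct / total."""
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)


print(accuracy_from_pairs([(1, 1), (0, 1), (0, 0), (1, 1)]))  # 0.75
```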
And the model performs well on unseen data as well! With that, the lab is concluded. I hope you enjoyed this lab and learned a lot about the advantages of Spark and Spark Labs.
All the best!