GitHub Repository: jupyter-naas/awesome-notebooks
Path: blob/master/Dask/Dask_parallelize_operations_on_multiple_csvs.ipynb
Kernel: Python 3

[Image: Dask logo]

Dask - Parallelize operations on multiple CSVs

Give Feedback | Bug report

Tags: #csv #pandas #snippet #read #dataframe #parallel #parallelize #dask #operations

Last update: 2023-04-12 (Created: 2022-04-13)

Description: This notebook demonstrates how to use Dask to efficiently process and analyze multiple CSV files in parallel.

Input

Imports

import os

Import Graphviz (install if not present)

try:
    import graphviz
except ModuleNotFoundError:
    !pip install --user graphviz
    import graphviz

Import Dask (install if not present)

try:
    import dask.dataframe as dd
except ModuleNotFoundError:
    !python -m pip install "dask[complete]"
    import dask.dataframe as dd

Variable

folder_path = "nycflights"
%env FOLDER_PATH=$folder_path

Download the dataset if it does not exist

%%bash
[[ -f "$FOLDER_PATH/nycflights.csv" ]] || (mkdir -p "$FOLDER_PATH" && wget -O "$FOLDER_PATH/nycflights.csv" https://github.com/vaibhavwalvekar/NYC-Flights-2013-Dataset-Analysis/raw/master/flights.csv)
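
If wget is unavailable in the environment, the same download can be done from Python. A minimal sketch using only the standard library (the csv_path variable is introduced here for illustration):

import os
import urllib.request

# Hypothetical pure-Python equivalent of the bash cell above.
csv_path = os.path.join(folder_path, "nycflights.csv")
if not os.path.exists(csv_path):
    os.makedirs(folder_path, exist_ok=True)
    urllib.request.urlretrieve(
        "https://github.com/vaibhavwalvekar/NYC-Flights-2013-Dataset-Analysis/raw/master/flights.csv",
        csv_path,
    )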

Model

Read the CSV files from the path

# When the actual data types of some columns cannot be inferred from the
# first few rows, they need to be specified manually;
# this is where the dtype parameter comes in.
df = dd.read_csv(
    os.path.join(folder_path, "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={
        "TailNum": str,
        "CRSElapsedTime": float,
        "Cancelled": bool,
        "dep_delay": float,
    },
)
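
Before running any heavy computation, it can be worth sanity-checking what read_csv produced. A sketch of a few cheap inspection calls (all standard Dask DataFrame attributes and methods):

# df is lazy: no CSV data has been loaded yet.
print(df.npartitions)  # how many partitions Dask split the files into
print(df.dtypes)       # column types, including the ones specified above
df.head()              # materializes only the first few rows of one partition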

Output

Calculate the max of a column

# No operation is actually performed until .compute() is called
df["dep_delay"].max().compute()
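
When several aggregations are needed, they can be passed to dask.compute together so the underlying CSV reads are shared rather than repeated. A minimal sketch (the mean is an illustrative addition, not part of the original notebook):

import dask

# One pass over the data produces both results.
max_delay, mean_delay = dask.compute(
    df["dep_delay"].max(),
    df["dep_delay"].mean(),
)
print(max_delay, mean_delay)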

Visualize the parallel execution of the operation

# The underlying task graph can be viewed to understand
# how the parallel execution takes place
df.dep_delay.max().visualize(rankdir="LR", size="12, 12!")
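
visualize can also write the graph to disk instead of rendering it inline, which helps when the graph is large. A sketch (the filename is illustrative):

# Writes the task graph as an image file; this requires the system-level
# graphviz binaries in addition to the Python package installed above.
df.dep_delay.max().visualize(filename="dep_delay_max.png", rankdir="LR")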

Comparison

Pandas

import pandas as pd
import glob
%%time
# The equivalent operation performed using pandas
all_files = glob.glob(os.path.join(folder_path, "*.csv"))
dfs = []
for file in all_files:
    dfs.append(pd.read_csv(file, parse_dates={"Date": [0, 1, 2]}))
df = pd.concat(dfs, axis=0)
df.dep_delay.max()
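
Outside IPython, the %%time magic is not available; roughly the same measurement can be taken with the standard library. A sketch (df_pd is a hypothetical name used here to avoid overwriting the Dask DataFrame):

import glob
import os
import time

import pandas as pd

start = time.perf_counter()
all_files = glob.glob(os.path.join(folder_path, "*.csv"))
df_pd = pd.concat(
    (pd.read_csv(f, parse_dates={"Date": [0, 1, 2]}) for f in all_files),
    axis=0,
)
print(df_pd.dep_delay.max())
print(f"pandas: {time.perf_counter() - start:.2f}s")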

Dask

%%time
# The entire operation, performed again using Dask
df = dd.read_csv(
    os.path.join(folder_path, "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={
        "TailNum": str,
        "CRSElapsedTime": float,
        "Cancelled": bool,
        "dep_delay": float,
    },
)
df["dep_delay"].max().compute()

# Dask performs noticeably better than pandas on this workload;
# the performance benefits grow with larger datasets,
# especially once the size of the data exceeds available memory.
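
The same computation can also be run on Dask's distributed scheduler, which adds a live diagnostic dashboard. A minimal local-cluster sketch (the distributed package is included in the "dask[complete]" install above):

from dask.distributed import Client

# Start a local cluster; subsequent .compute() calls use it by default.
client = Client()
print(client.dashboard_link)  # URL of the live task dashboard

df["dep_delay"].max().compute()

client.close()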