
Dask DataFrame


In this tutorial, you learn:

  • Basic concepts and features of Dask DataFrames
  • Applications of Dask DataFrames
  • Interacting with Dask DataFrames
  • Built-in operations with Dask DataFrames
  • Dask DataFrames Best Practices

Prerequisites

Concepts                             Importance    Notes
Familiarity with Pandas DataFrame    Necessary
Dask Overview                        Necessary
  • Time to learn: 40 minutes

Introduction

Dask DataFrame is composed of pandas DataFrames

Image credit: Dask Contributors

pandas is a very popular tool for working with tabular datasets, but the dataset needs to fit into memory.

pandas works best with smaller datasets; if your dataset is too large, you'll run into an out-of-memory error. A general rule of thumb for pandas is:

“Have 5 to 10 times as much RAM as the size of your dataset”
Wes McKinney (2017) in 10 things I hate about pandas

Dask DataFrame can be used to overcome these pandas limitations and work with larger-than-memory datasets.

What is Dask DataFrame?

  • A Dask DataFrame is a parallel DataFrame composed of smaller pandas DataFrames (also known as partitions).

  • Dask DataFrames look and feel like pandas DataFrames on the surface.

  • Dask DataFrames divide the data into manageable partitions that can be processed in parallel, across multiple cores or machines.

  • Similar to Dask Arrays, Dask DataFrames are lazy!

    Unlike pandas, operations on Dask DataFrames are not computed until you explicitly request them (e.g. by calling .compute()); a minimal sketch of this lazy behavior follows this list.
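For instance, here is a minimal sketch of this lazy behavior (the toy DataFrame below is made up for illustration):

import pandas as pd
import dask.dataframe as dd

# A small pandas DataFrame, split into two Dask partitions
pdf = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [10, 20, 30, 40]})
lazy_ddf = dd.from_pandas(pdf, npartitions=2)

# This only builds a task graph; nothing is computed yet
lazy_mean = (lazy_ddf.x + lazy_ddf.y).mean()
print(lazy_mean)            # a lazy Dask Scalar, not a number

# .compute() triggers the actual computation
print(lazy_mean.compute())  # 27.5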

When to use Dask DataFrame and when to avoid it?

Dask DataFrames are used in situations where pandas fails or has poor performance due to data size.

Dask DataFrame is a good choice when doing parallelizable computations.
Some examples are:

  • Element-wise operations such as df.x + df.y
  • Row-wise filtering such as df[df.x>0]
  • Common aggregations such as df.x.max()
  • Dropping duplicates such as df.x.drop_duplicates()

However, Dask is not great for operations that require shuffling or re-indexing (see the sketch at the end of this section).
Some examples are:

  • Set index: df.set_index(df.x)
WARNING: Although Dask DataFrame has a very similar interface to the pandas DataFrame (as we will see in this tutorial), it does NOT yet cover the entire pandas interface.

See the Dask DataFrame API documentation for a comprehensive list of available functions.
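To make the distinction concrete, here is a small sketch (with a made-up toy DataFrame) contrasting cheap, partition-wise operations with an expensive shuffling operation:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({'x': [1, -2, 3, 3], 'y': [10, 20, 30, 40]}),
    npartitions=2,
)

# Cheap: each partition can be processed independently
total = ddf.x + ddf.y               # element-wise
positive = ddf[ddf.x > 0]           # row-wise filtering
maximum = ddf.x.max()               # common aggregation
unique_x = ddf.x.drop_duplicates()

# Expensive: re-indexing moves rows between partitions (a "shuffle"),
# which is communication heavy on a cluster
by_x = ddf.set_index('x')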


Tutorial Dataset

In this tutorial, we are going to use the NOAA Global Historical Climatology Network Daily (GHCN-D) dataset.
GHCN-D is a publicly available dataset that includes daily climate records from more than 100,000 surface stations around the world.
This is an example of a real dataset that is used by NCAR scientists for their research. The raw GHCN-D dataset for all stations is available through NOAA Climate Data Online.

To learn more about the GHCN-D dataset, please see NOAA's GHCNd documentation.

Download the data

For this example, we are going to look through a subset of data from the GHCN-D dataset.

First, we look at daily observations from Denver International Airport; next, we look at selected stations across the US.

To access the preprocessed dataset for this tutorial, please run the following script:

!./get_data.sh
Downloading https://docs.google.com/uc?export=download&id=14doSRn8hT14QYtjZz28GKv14JgdIsbFF
tar (child): data.tar.gz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
tar: Child returned status 2
tar: Error is not recoverable: exiting now
rm: cannot remove 'data.tar.gz': No such file or directory
Downloading https://docs.google.com/uc?export=download&id=15rCwQUxxpH6angDhpXzlvbe1nGetYHrf
USC00057936.csv
USC00058204.csv
USC00058429.csv
USC00059243.csv
USC00068138.csv
USC00080211.csv
USC00084731.csv
USC00088824.csv
USC00098703.csv
USC00100010.csv
USC00100470.csv
USC00105275.csv
USC00106152.csv
USC00107264.csv
USC00108137.csv
USC00110338.csv
USC00112140.csv
USC00112193.csv
USC00112348.csv
USC00112483.csv
USC00113335.csv
USC00114108.csv
USC00114442.csv
USC00114823.csv
USC00115079.csv
USC00115326.csv
USC00115712.csv
USC00115768.csv
USC00115833.csv
USC00115901.csv
USC00115943.csv
USC00116446.csv
USW00003017.csv
Downloading https://docs.google.com/uc?export=download&id=1Tbuom1KMCwHjy7-eexEQcOXSr51i6mae

gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now

This script should save the preprocessed GHCN-D data in the ../data directory.


Pandas DataFrame Basics

Let’s start with an example using pandas DataFrame.

First, let's read in the comma-separated GHCN-D dataset for one station at Denver International Airport (DIA), CO (site ID: USW00003017).

To see the list of all available GHCN-D sites and their coordinates and IDs, please see this link.

import os
import pandas as pd

# DIA ghcnd id
site = 'USW00003017'
data_dir = '../data/'


df = pd.read_csv(os.path.join(data_dir, site+'.csv'), parse_dates=['DATE'], index_col=0)
# Display the top five rows of the dataframe
df.head()

Question: What variables are available?

df.columns
Index(['ID', 'YEAR', 'MONTH', 'DAY', 'TMAX', 'TMAX_FLAGS', 'TMIN', 'TMIN_FLAGS', 'PRCP', 'PRCP_FLAGS', 'TAVG', 'TAVG_FLAGS', 'SNOW', 'SNOW_FLAGS', 'SNWD', 'SNWD_FLAGS', 'AWND', 'AWND_FLAGS', 'FMTM', 'FMTM_FLAGS', 'PGTM', 'PGTM_FLAGS', 'WDF2', 'WDF2_FLAGS', 'WDF5', 'WDF5_FLAGS', 'WSF2', 'WSF2_FLAGS', 'WSF5', 'WSF5_FLAGS', 'WT01', 'WT01_FLAGS', 'WT02', 'WT02_FLAGS', 'WT08', 'WT08_FLAGS', 'WT16', 'WT16_FLAGS', 'WT17', 'WT17_FLAGS', 'WT18', 'WT18_FLAGS', 'WT03', 'WT03_FLAGS', 'WT05', 'WT05_FLAGS', 'WT19', 'WT19_FLAGS', 'WT10', 'WT10_FLAGS', 'WT09', 'WT09_FLAGS', 'WT06', 'WT06_FLAGS', 'WT07', 'WT07_FLAGS', 'WT11', 'WT11_FLAGS', 'WT13', 'WT13_FLAGS', 'WT21', 'WT21_FLAGS', 'WT14', 'WT14_FLAGS', 'WT15', 'WT15_FLAGS', 'WT22', 'WT22_FLAGS', 'WT04', 'WT04_FLAGS', 'WV03', 'WV03_FLAGS', 'TSUN', 'TSUN_FLAGS', 'WV01', 'WV01_FLAGS', 'WESD', 'WESD_FLAGS', 'ADPT', 'ADPT_FLAGS', 'ASLP', 'ASLP_FLAGS', 'ASTP', 'ASTP_FLAGS', 'AWBT', 'AWBT_FLAGS', 'RHAV', 'RHAV_FLAGS', 'RHMN', 'RHMN_FLAGS', 'RHMX', 'RHMX_FLAGS', 'PSUN', 'PSUN_FLAGS', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'STATE', 'STATION'], dtype='object')

The description and units of the dataset are available here.

Operations on pandas DataFrame

pandas DataFrames have several features that give us the flexibility to do different calculations and analyses on our dataset. Let's check some out:

Simple Analysis

For example:

  • When was the coldest day at this station during December of last year?
# use python slicing notation inside .loc
# use idxmin() to find the index of the minimum value
df.loc['2022-12-01':'2022-12-31'].TMIN.idxmin()
Timestamp('2022-12-22 00:00:00')
# Here we can easily plot the data using pandas' built-in matplotlib integration
# -- .loc for value-based indexing
df.loc['2022-12-01':'2022-12-31'].SNWD.plot(ylabel= 'Daily Average Snow Depth [mm]')
<Axes: xlabel='DATE', ylabel='Daily Average Snow Depth [mm]'>
<Figure size 640x480 with 1 Axes>
  • How many snow days do we have each year at this station?

pandas groupby is used to group the data according to categories.

# 1- First select days with snow > 0
# 2- Create a "groupby object" based on the selected columns
# 3- use .size() to compute the size of each group
# 4- sort the values descending 

# we count days where SNOW>0, and sort them and show top 5 years:
df[df['SNOW']>0].groupby('YEAR').size().sort_values(ascending=False).head()
YEAR
2015    36
2019    34
2014    32
2008    32
2007    31
dtype: int64

Or for a more complex analysis:

For example, we have heard that this could be Denver’s first January in 13 years with no 60-degree days.

News article showing that this January is abnormally warm

Below, we show all January days since 2010 with a high temperature above 60°F (15.55°C, i.e. TMAX > 155.5 in the dataset's tenths-of-a-degree-Celsius units):

df[(df['MONTH']==1) & (df['YEAR']>=2010) & (df['TMAX']>155.5)].groupby(['YEAR']).size()
YEAR
2011     1
2012     6
2013     4
2014     3
2015     6
2016     1
2017     4
2018     5
2019     3
2020     2
2021     2
2022     3
dtype: int64

This is great! But how big is this dataset for one station?

First, let’s check the file size:

!ls -lh ../data/USW00003017.csv
-rw-r--r-- 1 runner docker 3.6M Feb  5  2023 ../data/USW00003017.csv

Similar to the previous tutorial, we can use the following function to find the size of a variable in memory.

# Define function to display variable size in MB
import sys
def var_size(in_var):
    result = sys.getsizeof(in_var) / 1e6
    print(f"Size of variable: {result:.2f} MB")
var_size(df)
Size of variable: 29.39 MB

Remember the rule above?

“Have 5 to 10 times as much RAM as the size of your dataset”
Wes McKinney (2017) in 10 things I hate about pandas

So far, we have read in and analyzed data for one station. We have a total of more than 118,000 stations around the world and more than 4,500 stations in Colorado alone!

What if we want to look at the larger dataset?

Scaling up to a larger dataset

Let’s start by reading data from selected stations. The downloaded data for this example includes the climatology observations from 66 selected sites in Colorado.

With pandas, we can concatenate DataFrames to load data spread across multiple files:

!du -csh ../data/*.csv |tail -n1
286M	total

Using pandas.concat with a loop over the files, we can combine multiple files into a single DataFrame:

%%time
import glob
co_sites = glob.glob(os.path.join(data_dir, '*.csv'))
df = pd.concat(pd.read_csv(f, index_col=0, parse_dates=['DATE']) for f in co_sites)
CPU times: user 6.38 s, sys: 1.42 s, total: 7.8 s
Wall time: 17.4 s
  • How many stations have we read in?
print ("Concatenated data for", len(df.ID.unique()), "unique sites.")
Concatenated data for 33 unique sites.

Now that we have concatenated the data for all sites into one DataFrame, we can do a similar analysis on it:

  • Which site has recorded the most snow days in a year?
%%time
# ~90s on 4GB RAM
snowy_days = df[df['SNOW']>0].groupby(['ID','YEAR']).size()

print ('This site has the highest number of snow days in a year : ')
snowy_days.agg(['idxmax','max'])
This site has the highest number of snow days in a year : 
CPU times: user 218 ms, sys: 0 ns, total: 218 ms
Wall time: 479 ms
idxmax    (USC00114823, 2009)
max                        97
dtype: object

Exercise: Which Colorado site recorded the most snow days in 2023?

Dask allows us to conceptualize all of these files as a single dataframe!

# Let's do a little cleanup
del df, snowy_days

Computations on Dask DataFrame

Create a “LocalCluster” Client with Dask

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
client

☝️ Click the Dashboard link above.

👈 Or click the “Search” 🔍 button in the dask-labextension dashboard.

Dask DataFrame read_csv to read multiple files

The dask.dataframe.read_csv function can be used in conjunction with glob to read multiple CSV files at once.

Remember, we can read one file with pandas.read_csv. To read multiple files with pandas, we have to read them one by one and concatenate them with pd.concat. With dask.dataframe.read_csv, however, we can read many files in a single call.

Overall, Dask is designed to perform I/O in parallel and is more performant than pandas for operations with multiple files or large files.

%%time
import dask
import dask.dataframe as dd

ddf = dd.read_csv(co_sites, parse_dates=['DATE'])
ddf
ddf.TMAX.mean()
<dask_expr.expr.Scalar: expr=ArrowStringConversion(frame=FromMapProjectable(6491613))['TMAX'].mean(), dtype=float64>

Notice that the representation of the DataFrame object contains no data, just column names and dtypes. Why?

Lazy Evaluation

Similar to Dask Arrays, Dask DataFrames are lazy: the data has not yet been read into the DataFrame (lazy evaluation).
Dask just constructs the task graph of the computation and only evaluates it when necessary.

So how does Dask know the name and dtype of each column?

Dask reads the start of the first file and infers the column names and dtypes from that sample.

Unlike pandas.read_csv, which reads the whole file before inferring data types, dask.dataframe.read_csv only reads a sample from the beginning of the file (or the first file if using a glob). The inferred column names and dtypes are then enforced when reading each partition (Dask can get these inferences wrong if the early rows contain missing or misleading data).
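If the inferred dtypes turn out to be wrong, one common workaround (shown here as a sketch; the column name and sample size are illustrative assumptions) is to tell read_csv the dtypes explicitly, or to increase the number of bytes it samples:

ddf = dd.read_csv(
    co_sites,
    parse_dates=['DATE'],
    dtype={'WT01': 'float64'},  # force a dtype for a column that may be empty early on
    sample=1_000_000,           # sample more bytes when inferring dtypes
)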

Let’s take a look at the start of our dataframe:

ddf.head()

NOTE: Whenever we operate on our DataFrame, we read through all of our CSV data so that we don't fill up RAM. Dask deletes intermediate results (like the full pandas DataFrame for each file) as soon as possible. This lets you handle datasets that are larger than memory, but it also means that repeated computations will have to load all of the data each time.

The same data manipulations we did with the pandas DataFrame can be done with Dask DataFrames.
For example, let's again find the site with the highest number of snow days in a year:

%%time
print ('This site has the highest number of snow days in a year : ')
snowy_days = ddf[ddf['SNOW']>0].groupby(['ID','YEAR']).size()
snowy_days.compute().agg(['idxmax','max'])
This site has the highest number of snow days in a year : 
CPU times: user 585 ms, sys: 85.3 ms, total: 670 ms
Wall time: 5.63 s
idxmax    (USC00114823, 2009)
max                        97
dtype: object

Nice, but what did Dask do?

# Requires ipywidgets

snowy_days.dask

You can also view the underlying task graph using .visualize():

#graph is too large
snowy_days.visualize()

Use .compute wisely!

Share intermediate results

For most operations, dask.dataframe hashes the arguments, so duplicate computations can be shared and computed only once.

For example, let's compute the mean and standard deviation of the daily maximum temperature on all snow days.

snowy_days = ddf[ddf['SNOW']>0]
mean_tmax = snowy_days.TMAX.mean()
std_tmax = snowy_days.TMAX.std()
%%time

mean_tmax_result = mean_tmax.compute()
std_tmax_result = std_tmax.compute()
CPU times: user 701 ms, sys: 120 ms, total: 821 ms
Wall time: 9.37 s

But if we pass both arguments in a single .compute, we can share the intermediate results:

%%time
mean_tmax_result, std_tmax_result = dask.compute(mean_tmax, std_tmax)
CPU times: user 406 ms, sys: 53.1 ms, total: 459 ms
Wall time: 5.38 s

Here, calling dask.compute only once allowed the intermediate results to be shared between the TMAX mean and standard deviation calculations and improved total performance.

mean_tmax.dask

Here, some operations, such as the calls to read the CSV files and the filtering, are exactly the same in both computations, so they can share intermediate results. Remember, Dask will delete intermediate results (like the full pandas DataFrame for each file) as soon as possible.

.persist or caching

Sometimes you might want to keep intermediate results in memory, if they fit.

The .persist() method can be used to "cache" data and tell Dask which results to keep around. You should only use .persist() with data or computations that fit in memory.

For example, if we only want to do analysis on a subset of the data (such as snow days at the Boulder site):

boulder_snow = ddf[(ddf['SNOW']>0)&(ddf['ID']=='USC00050848')]
%%time
tmax = boulder_snow.TMAX.mean().compute()
tmin = boulder_snow.TMIN.mean().compute()

print (tmin, tmax)
nan nan
CPU times: user 759 ms, sys: 116 ms, total: 875 ms
Wall time: 8.06 s
boulder_snow = ddf[(ddf['SNOW']>0)&(ddf['ID']=='USC00050848')].persist()
%%time

tmax = boulder_snow.TMAX.mean().compute()
tmin = boulder_snow.TMIN.mean().compute()
print (tmin, tmax)
nan nan
CPU times: user 289 ms, sys: 45.7 ms, total: 335 ms
Wall time: 2.62 s

As you can see, the analysis on this persisted data is much faster because we are not repeating the loading and filtering steps.

Dask DataFrames Best Practices

Use pandas (when you can)

For data that fits into RAM, pandas can often be easier and more efficient to use than Dask DataFrame. However, Dask DataFrame is a powerful tool for larger-than-memory datasets.

When the data is still larger than memory, Dask DataFrame can be used to reduce it to a manageable size that pandas can handle; then switch to pandas for the rest of the analysis.
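As a minimal sketch of this pattern (reusing the ddf from this tutorial), filter the large dataset with Dask, then call .compute() to get an ordinary in-memory pandas DataFrame and continue from there:

# Reduce with Dask first ...
dia_snow = ddf[(ddf['ID'] == 'USW00003017') & (ddf['SNOW'] > 0)]

# ... then bring the (now small) result into memory as a pandas DataFrame
dia_snow_pdf = dia_snow.compute()
print(type(dia_snow_pdf))            # pandas.core.frame.DataFrame
dia_snow_pdf.groupby('YEAR').size()  # plain pandas from here on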

Avoid Full-Data Shuffling

Some operations are more expensive to compute in a parallel setting than they are in memory on a single machine (for example, set_index or merge). In particular, shuffling operations that rearrange data can become very communication intensive.
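If you do need a shuffle-heavy operation such as set_index, a common pattern (sketched below, and only advisable if the result fits in memory) is to pay the shuffle cost once and reuse the result:

# Shuffle once, keep the result around, and reuse it for later lookups
ddf_by_id = ddf.set_index('ID').persist()   # the expensive shuffle happens here
ddf_by_id.loc['USW00003017'].head()          # later selections reuse the sorted index
ddf_by_id.loc['USC00114823'].head()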

pandas performance tips

pandas performance tips, such as using vectorized operations, also apply to Dask DataFrames. See the Modern Pandas notebook for more tips on getting better performance out of pandas.
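For instance, a small sketch using the ddf from this tutorial: prefer vectorized column arithmetic over row-wise Python functions (the meta argument below just tells Dask the expected output name and dtype):

# Vectorized: operates on whole pandas partitions at once (fast)
tmax_degc = ddf['TMAX'] / 10.0

# Row-wise apply: calls a Python function per element (much slower)
tmax_degc_slow = ddf['TMAX'].apply(lambda t: t / 10.0, meta=('TMAX', 'float64'))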

Check Partition Size

Similar to Dask Array chunks, partitions should be small enough to fit in memory, but large enough that communication and scheduling overhead does not dominate.

blocksize

  • The partition size can be set using the blocksize argument. If none is given, Dask calculates the blocksize (and hence the number of partitions) from the available memory and the number of cores on the machine, up to a maximum of 64 MB. As we increase the blocksize, the number of partitions (calculated by Dask) decreases. This is especially important when reading one large CSV file.

As a good rule of thumb, you should aim for partitions that have around 100MB of data each.
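As a rough sketch (the exact sizes here are illustrative), the partition size can be controlled when reading the files, or adjusted afterwards with repartition:

# Control the partition size while reading the CSV files
ddf = dd.read_csv(co_sites, parse_dates=['DATE'], blocksize='100MB')
print(ddf.npartitions)

# Or adjust an existing Dask DataFrame to roughly 100 MB partitions
ddf = ddf.repartition(partition_size='100MB')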

Smart use of .compute()

Try to avoid calling .compute() for as long as possible. Dask works best when computation is delayed until results are actually needed. The .compute() method tells Dask to trigger the computations on the Dask DataFrame.
As shown in the examples above, intermediate results can also be shared by calling dask.compute on several results at once.

Close your local Dask Cluster

It is always a good practice to close the Dask cluster you created.

client.shutdown()

Summary

In this notebook, we have learned about:

  • Dask DataFrame concepts and components.
  • When to use and when to avoid Dask DataFrames.
  • How to use Dask DataFrames.
  • Some best practices around Dask DataFrames.

Resources and references