Dask logo

Dask Array

In this tutorial, you learn:

  • What is a Dask Array?

  • Basic concepts and features of Dask Arrays

  • Working with Dask arrays

Related Dask Array Documentation

Dask Arrays

Dask Arrays are basically parallelized version of NumPy arrays for processing larger-than-memory data sets.

https://docs.dask.org/en/stable/_images/dask-array.svg

Image credit: Dask Contributors

Dask Array can be used as a drop-in replacement for NumPy arrays, with a similar API and support for a subset of NumPy functions.

Dask effectively reduces the memory footprint of large array computations by dividing the arrays into smaller pieces (called chunks) that can fit into memory and stream the data from disk.

Dask Arrays are lazy: Unlike Numpy, operations on Dask arrays are not computed until you explicitly request them.

Lazy Evaluation: objects are evaluated just in time when the results are needed!

Lazy evaluation help us avoid having large pieces of memory resident on the workers and optimize the resource requirements.

Dask Arrays don’t directly hold any data. Instead, they provide a symbolic representation of the necessary computations to generate the data. We will explain this more below.

Let’s start exploring Dask Arrays:

Setup: Start a Dask Client

We will talk in-depth about Dask Cluster and Dask Clients later in this tutorial. Here we just created a local cluster and attached a client to it.

from dask.distributed import Client

client = Client()
client

Client

Client-aa82aae8-b27f-11ef-89cf-000d3a3c95f2

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

Blocked Algorithms

Dask Arrays use blocked algorithms to split large computations into smaller computations which operate on subsets of the data (called chunks).

Let’s see what this means in an example:

import numpy as np
import dask.array as da

# A 4x4 numpy array that goes from 1 to 16 

narr = np.array([
        [ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11, 12],
        [13, 14, 15, 16]
        ])

# -- convert numpy array to dask array with 4 chunks
darr = da.from_array( narr,chunks=(2, 2))

Now we can calculate the sum of this array using darr.sum() similar to numpy. But how is it different from numpy?

When you take the sum of the Dask array, Dask first takes the sum of each chunk and only after each of those is completed, takes the sum of the results from each chunk.

https://d33wubrfki0l68.cloudfront.net/f7bf6ca40c8f217386f83795b36e0c964c6a9d2b/ad6da/images/blog/what-is-dask-blockwise-sum.jpg

Image adapted from saturncloud.io

Task Graph

The Dask Task Graph serves as a blueprint for executing the computations.

The Task Graph defines the (1) relationships between tasks, and (2) the order in which they should be executed.

In a task graph each node in the graph represents a task and lines represent the dependencies/relationships between tasks.

We can visualize the low-level task graph using .visualize() method.

darr.sum().visualize(rankdir="LR")
../_images/83660aa4bd67dd0f3c771b18bca5d13a5a04dcf92dab958f2e1c569662d35e10.png

It is generally good practice to look at the task graph before executing the computation. By looking at the task graph, you can learn about potential bottlenecks where parallelism is not possible.

TIP: For big computations, low-level task graphs gets very confusing. An alternative that provides a more concise graph is using .dask.visualize().

#darr.sum().dask.visualize()

Now, let’s start with another example. Here we create a 2D array of ones using NumPy.

shape = (10000,12000)

ones_np = np.ones(shape)
ones_np
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

Now, let’s create the same array using Dask:

ones_da = da.ones(shape)
ones_da
Array Chunk
Bytes 915.53 MiB 128.00 MiB
Shape (10000, 12000) (4096, 4096)
Dask graph 9 chunks in 1 graph layer
Data type float64 numpy.ndarray
12000 10000

We see a Dask Array representation of the data. This is a symbolic representation; no data has actually been generated yet.

As we discussed previously, this mode of operation is called “lazy”.

This allows the user to build up a series of computations or tasks before being passed to the scheduler for execution.

Chunks

When checking the Dask array, the symbolic representation illustrates the concept of chunks. Dask arrays split the data into sub-arrays (or chunks) to optimize computation with large arrays.

Chunking an array

The way that arrays are chunked can significantly affect total performance.

For specifying the chunking of an array, we use the chunks argument when creating our dask.array.

⚠️ WARNING: Please note that chunks argument stands for chunk shape rather than “number of chunks”.
For example, chunks=1 means that you will have several chunks with one element.

There are several ways to define chunks. For example:

  1. A uniform dimension size like 1000, meaning chunks of size 1000 in each dimension.

  2. A uniform chunk shape like (1000, 2000, 3000), meaning chunks of size 1000 in the first axis, 2000 in the second axis, and 3000 in the third.

  3. Fully explicit sizes of all blocks for all dimensions, like ((1000, 1000, 500), (400, 400), (5, 5, 5, 5, 5))

  4. A dictionary specifying chunk size per dimension like {0: 1000, 1: 2000, 2: 3000}.

Let’s recreate the above Dask array, but this time we will specify chunk sizes (a.k.a. shapes) using the argument chunks.

# -- remember what the shape of our data array was
shape
(10000, 12000)
# create a dask array with 6 chunks
chunk_shape = (5000,4000)
ones_da = da.ones(shape,chunks=chunk_shape)
ones_da
Array Chunk
Bytes 915.53 MiB 152.59 MiB
Shape (10000, 12000) (5000, 4000)
Dask graph 6 chunks in 1 graph layer
Data type float64 numpy.ndarray
12000 10000

You can see in the above dask array representation that we now have 6 chunks, each of shape (5000,4000) and size of ~ 160.0 MiB.

Performance Comparison

To compare the performance between a NumPy array and an equivalent Dask array, let’s calculate the mean.

%%time
# The %%time cell magic measures the execution time of the whole cell
ones_np.mean()
CPU times: user 50 ms, sys: 193 μs, total: 50.2 ms
Wall time: 47.8 ms
np.float64(1.0)
%%time
# Remember, we are not doing any computation here, just constructing our task graph
mean_of_ones_da = ones_da.mean()
CPU times: user 1.46 ms, sys: 0 ns, total: 1.46 ms
Wall time: 1.45 ms

Remember :

Dask doesn’t do anything until you tell it… It is lazy!

So far we have just constructed our task graph but no computations yet!

NOTE: In order to generate the data, we need to call the .compute() method on the Dask Array to trigger our computation.

Let’s run the .compute() method to see how this works:

%%time
mean_of_ones_da.compute()
CPU times: user 31.9 ms, sys: 923 μs, total: 32.8 ms
Wall time: 274 ms
np.float64(1.0)

.compute() method convertes Dask Arrays to Numpy Arrays. Let’s check to see if this is true:

type (ones_da.compute())
numpy.ndarray

WARNING: Typically, when working with Dask arrays, we do not want to generate the data right away by calling .compute() on a large array.

We usually want to perform some computations that reduce the data size. For example, we might compute statistics like the mean or standard deviation.

Let’s look at an example of taking the mean and visualize the task graph. Remember, that no actual computation is taking place until we call .compute().

mean_of_ones_da = ones_da.mean()
mean_of_ones_da.visualize(rankdir="LR")
../_images/6dff64d47f06d6421a7c515c458f4f122bd5d7209ce6955d73c9f86e296d29a3.png

What are the sizes of these arrays in memory?

First, let’s define a function that returns array size in MiB.

import sys

# Define function to display variable size in MiB
def var_size(in_var):
    result = sys.getsizeof(in_var) / 1024/1024
    return (result)
print("Shape of the numpy array : ", ones_np.shape) 
print("Shape of the dask array  : ", ones_da.shape) 

# memory size of numpy array in MiB
print(f"Memory size of numpy array in MB : {var_size(ones_np):.2f} MiB")
# memory size of dask array in MiB
print(f"Memory size of dask array in MB  : {var_size(ones_da):.2f} MiB")
Shape of the numpy array :  (10000, 12000)
Shape of the dask array  :  (10000, 12000)
Memory size of numpy array in MB : 915.53 MiB
Memory size of dask array in MB  : 0.00 MiB

Why memory size for the above Dask array is zero?

Remember, this variable is only a graph representation of the full array which will be split across workers.

However, Dask does give us ways to see the full size of the data (often much larger than your client machine can handle)!

print("Size of Dask dataset:  {:.2f} MiB".format(ones_da.nbytes / 1024/1024))
Size of Dask dataset:  915.53 MiB

Larger Data

The previous example illustrated how Dask works, but using Dask is not really necessary (nor advisable) for an array of size 915.53 MiB.
Let’s try an example using bigger data and bigger calculations:

big_shape = (2000, 200, 2000)

# -- this will make a big numpy array that might not fit on your machine
#big_np = np.ones(big_shape)

Make a similar Dask Array with similar shape but specifying the chunks size:

big_da = da.ones(big_shape)
big_da
Array Chunk
Bytes 5.96 GiB 127.44 MiB
Shape (2000, 200, 2000) (289, 200, 289)
Dask graph 49 chunks in 1 graph layer
Data type float64 numpy.ndarray
2000 200 2000
# size of data
#print("Memory size of NumPy dataset :  {:.2f} GiB".format(big_np.nbytes / 1024/1024/1024))
print("Memory size of Dask dataset  :  {:.2f} GiB".format(big_da.nbytes / 1024/1024/1024))
Memory size of Dask dataset  :  5.96 GiB

This may be close to the available memory/RAM that you have in your computer.

Let’s try bigger calculations on this array:

#%%time 
#z_np = (big_np + big_np.T)[::2,:].mean()
%%time

z_da = (big_da + big_da.T)[::2,:].mean(axis=2)

result = z_da.compute()
CPU times: user 257 ms, sys: 49.6 ms, total: 307 ms
Wall time: 928 ms
#-- warning : do not try low level visualization with big arrays

#z_da.visualize()

All the usual NumPy functions work on dask arrays, though the computations will remain lazy until you either call .compute(), .load() or your want to plot the data.

As we discussed above, the way that Dask arrays are chunked can significantly affect the performance. In the remainder of this notebook, let’s do a similar calculation using a different chunks size.

We will learn more about best practices regarding chunk size later during the tutorial.

Supplementary Material: Rechunking Arrays

We can change the chunking of a Dask array, using the rechunk method. Please note that rechunking Dask arrays can be very expensive, so choosing an appropriate chunk size initially is ideal.

new_chunk_shape = (50,50,50)
rechunked_big_da = big_da.rechunk(new_chunk_shape)
rechunked_big_da
Array Chunk
Bytes 5.96 GiB 0.95 MiB
Shape (2000, 200, 2000) (50, 50, 50)
Dask graph 6400 chunks in 2 graph layers
Data type float64 numpy.ndarray
2000 200 2000
%%time
# perform big computation on chunked array

z_da_rechunked = (rechunked_big_da + rechunked_big_da.T)[::2,:].mean(axis=2)

result = z_da_rechunked.compute()
/home/runner/miniconda3/envs/dask-cookbook/lib/python3.10/site-packages/distributed/client.py:3371: UserWarning: Sending large graph of size 12.57 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
  warnings.warn(
CPU times: user 20.5 s, sys: 746 ms, total: 21.2 s
Wall time: 30.9 s

We can see how the choice of smaller chunks (more total chunks) significantly reduce the total performance of our computation.

TIP: As a rule of thumb, a chunk should be big enough so that the computation on that chunk take significantly longer than the overhead from Dask scheduler. The Dask scheduler takes roughly 1ms per task for scheduling.

Let’s try a bigger chunk size:

new_chunk_shape = (500, 100, 500)
#big_chunk = 

rechunked_big_da = big_da.rechunk(new_chunk_shape)
rechunked_big_da
Array Chunk
Bytes 5.96 GiB 190.73 MiB
Shape (2000, 200, 2000) (500, 100, 500)
Dask graph 32 chunks in 2 graph layers
Data type float64 numpy.ndarray
2000 200 2000
%%time
# perform big computation on chunked array

z_da_rechunked = (rechunked_big_da + rechunked_big_da.T)[::2,:].mean(axis=2)

result = z_da_rechunked.compute()
CPU times: user 904 ms, sys: 88.1 ms, total: 992 ms
Wall time: 4.56 s

TIP: As a rule of thumb, a chunk should be small enough to fit comfortably in the memory. Chunk sizes between 10MB-1GB are common, depending on your machine,

client.close()

Summary:

  • Dask Array provides parallel computing capabilities by dividing arrays into smaller pieces called chunks.

  • Blocked algorithms split large computations into smaller computations which operate on subsets of the array.

  • Dask Array supports efficient computation on large arrays through a combination of lazy evaluation and task parallelism.

  • Dask Array can be used as a drop-in replacement for NumPy ndarray, with a similar API and support for a subset of NumPy functions.

  • The way that arrays are chunked can significantly affect total performance. Poor chunking can singifincantly worsen performance of Dask compared to NumPy.

Resources and references