Dask logo

Dask Schedulers

In this tutorial, you learn:

  • Components of Dask Schedulers

  • Types of Dask Schedulers

  • Single Machine Schedulers

Related Documentation

Introduction

As we mentioned in our Dask overview, Dask is composed of two main parts:

  1. Dask Collections (APIs)

  2. Dynamic Task Scheduling

So far, we have talked about different Dask collections, but in this tutorial we are going to talk more about the second part.

The Dask scheduler - our task orchestrator

The Dask.distributed task scheduler is a centralized, dynamic system that coordinates the efforts of various dask worker processes spread accross different machines.

When a computational task is submitted, the Dask distributed scheduler sends it off to a Dask cluster - simply a collection of Dask workers. A worker is typically a separate Python process on either the local host or a remote machine.

To perform work, a scheduler must be assigned resources in the form of a Dask cluster. The cluster consists of the following components:

  • scheduler : A scheduler creates and manages task graphs and distributes tasks to workers.

  • workers : A worker is typically a separate Python process on either the local host or a remote machine. A Dask cluster usually consists of many workers. Basically, a worker is a Python interpretor which will perform work on a subset of our dataset.

  • client - A high-level interface that points to the scheduler (often local but not always). A client serves as the entry point for interacting with a Dask scheduler.

Dask Distributed Cluster

Image credit: Dask Contributors

Schedulers

Dask essentially offers two types of schedulers:

High level collections are used to generate task graphs which can be executed by schedulers on a single machine or a cluster.

Image credit: Dask Contributors

1. Single machine scheduler

  • The Single-machine Scheduler schedules tasks and manages the execution of those tasks on the same machine where the scheduler is running.

  • It is designed to be used in situations where the amount of data or the computational requirements are too large for a single process to handle, but not large enough to warrant the use of a cluster of machines.

  • It is relatively simple and cheap to use but it does not scale as it only runs on a single machine.

Single machine scheduler is the default choice used by Dask.

In Dask, there are several types of single machine schedulers that can be used to schedule computations on a single machine:

1.1. Single-threaded scheduler

This scheduler runs all tasks serially on a single thread.
This is only useful for debugging and profiling, but does not have any parallelization.

1.2. Threaded scheduler

The threaded scheduler uses a pool of local threads to execute tasks concurrently.
This is the default scheduler for Dask, and is suitable for most use cases on a single machine. Multithreading works well for Dask Array and Dask DataFrame.

To select one of the above scheduler for your computation, you can specify it when doing .compute():

For example:

this.compute(scheduler="single-threaded")  # for debugging and profiling only

As mentioned above the threaded scheduler is the default scheduler in Dask. But you can set the default scheduler to Single-threaded or multi-processing by:

import dask
dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler

Multi-processing works well for pure Python code - delayed functions and operations on Dask Bags.

Let’s compare the performance of each of these single-machine schedulers:

import numpy as np
import dask.array as da
%%time
## - numpy performance
xn = np.random.normal(10, 0.1, size=(20_000, 20_000))
yn = xn.mean(axis=0)
yn
CPU times: user 8.42 s, sys: 276 ms, total: 8.7 s
Wall time: 8.7 s
array([ 9.99939887,  9.9998958 , 10.0015604 , ...,  9.99993883,
       10.00069099,  9.99991128])
%%time
# -- dask array using the default
xd = da.random.normal(10, 0.1, size=(20_000, 20_000), chunks=(2000, 2000))
yd = xd.mean(axis=0)
yd.compute()
CPU times: user 13.3 s, sys: 39.9 ms, total: 13.3 s
Wall time: 3.45 s
array([ 9.99908424, 10.00011291,  9.99973998, ..., 10.00020295,
        9.99906628,  9.9997104 ])
import time
# -- dask testing different schedulers:
for sch in ['threading', 'processes', 'sync']:
    t0 = time.time()
    r = yd.compute(scheduler=sch)
    t1 = time.time()
    print(f"{sch:>10} :  {t1 - t0:0.4f} s")
 threading :  3.3203 s
 processes :  4.1544 s
      sync :  8.4827 s
yd.dask

HighLevelGraph

HighLevelGraph with 4 layers and 240 keys from all layers.

Layer1: normal

normal-30ae67de957cedc7697b032a47fdf2f2

layer_type MaterializedLayer
is_materialized True
number of outputs 100
shape (20000, 20000)
dtype float64
chunksize (2000, 2000)
type dask.array.core.Array
chunk_type numpy.ndarray
20000 20000

Layer2: mean_chunk

mean_chunk-71b2442153fda619dba415706574d10d

layer_type Blockwise
is_materialized True
number of outputs 100
shape (20000, 20000)
dtype float64
chunksize (2000, 2000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on normal-30ae67de957cedc7697b032a47fdf2f2
20000 20000

Layer3: mean_combine-partial

mean_combine-partial-e454e91bd991e9a54e09d066e8201c6f

layer_type MaterializedLayer
is_materialized True
number of outputs 30
shape (3, 20000)
dtype float64
chunksize (1, 2000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on mean_chunk-71b2442153fda619dba415706574d10d
20000 3

Layer4: mean_agg-aggregate

mean_agg-aggregate-411c1ebd5c62718e0fa2dd8449b72380

layer_type MaterializedLayer
is_materialized True
number of outputs 10
shape (20000,)
dtype float64
chunksize (2000,)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on mean_combine-partial-e454e91bd991e9a54e09d066e8201c6f
20000 1
yd
Array Chunk
Bytes 156.25 kiB 15.62 kiB
Shape (20000,) (2000,)
Dask graph 10 chunks in 4 graph layers
Data type float64 numpy.ndarray
20000 1
  • Notice how sync scheduler takes almost the same time as pure NumPy code.

  • Why is the multiprocessing scheduler so much slower?

If you use the multiprocessing backend, all communication between processes still needs to pass through the main process because processes are isolated from other processes. This introduces a large overhead.

The Dask developers recommend using the Dask Distributed Scheduler which we will cover now.

2. Distributed scheduler

  • The Distributed scheduler or dask.distributed schedules tasks and manages the execution of those tasks on workers from a single or multiple machines.

  • This scheduler is more sophisticated and offers more features including a live diagnostic dashboard which provides live insight on performance and progress of the calculations.

In most cases, dask.distributed is preferred since it is very scalable, and provides and informative interactive dashboard and access to more complex Dask collections such as futures.

2.1. Local Cluster

A Dask Local Cluster refers to a group of worker processes that run on a single machine and are managed by a single Dask scheduler.

This is useful for situations where the computational requirements are not large enough to warrant the use of a full cluster of separate machines. It provides an easy way to run parallel computations on a single machine, without the need for complex cluster management or other infrastructure.

Let’s start by creating a Local Cluster

For this we need to set up a LocalCluster using dask.distributed and connect a client to it.

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)
client

Client

Client-9f6564a8-1238-11ef-8af8-00224830bfb1

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

☝️ Click the Dashboard link above.

👈 Or click the “Search” 🔍 button in the dask-labextension dashboard.

If no arguments are specified in LocalCluster it will automatically detect the number of CPU cores your system has and the amount of memory and create workers to appropriately fill that.

A LocalCluster will use the full resources of the current JupyterLab session. For example, if you used BinderHub, it will use the number of CPUs selected.

Note that LocalCluster() takes a lot of optional arguments, allowing you to configure the number of processes/threads, memory limits and other settings.

You can also find your cluster dashboard link using :

cluster.dashboard_link
'http://127.0.0.1:8787/status'
%%time
# -- dask array using the default
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()
CPU times: user 592 ms, sys: 40.3 ms, total: 632 ms
Wall time: 8.82 s
array([ 9.99937013, 10.00067014,  9.99943929, ...,  9.99896329,
       10.00056107, 10.00001498])

Always remember to close your local Dask cluster:

client.shutdown()

Dask Distributed (Cluster Managers)

So far we have talked about running a job on a local machine.

Dask can be deployed on distributed infrastructure, such as a an HPC system or a cloud computing system.

High level collections are used to generate task graphs which can be executed by schedulers on a single machine or a cluster.

Image credit: Dask Contributors

Dask Cluster Managers have different names corresponding to different computing environments. Some examples are dask-jobqueue for your HPC systems (including PBSCluster) or Kubernetes Cluster for machines on the Cloud.

The NCAR tutorial series includes an in-depth exploration and practical use cases of Dask on HPC systems and best practices for Dask on HPC. For the complete set of NCAR tutorial materials, please refer to the main NCAR tutorial content available here.

For more information visit the Dask Docs.