
Kerchunk and Dask

Overview

In this notebook we will cover:

  1. How to parallelize the creation of Kerchunk reference files using the Dask library.

  2. How to scale up the creation of large Kerchunk datasets using Dask.

This notebook builds upon the Kerchunk Basics and the Multi-File Datasets with Kerchunk notebooks. A basic understanding of Dask will be helpful, but is not required. This notebook is not intended as a tutorial for using Dask, but it will show how to use Dask to greatly speed up the generation of Kerchunk reference files.

Prerequisites

| Concepts | Importance | Notes |
| --- | --- | --- |
| Kerchunk Basics | Required | Core |
| Multiple Files and Kerchunk | Required | Core |
| Introduction to Xarray | Recommended | IO/Visualization |
| Intro to Dask | Recommended | Parallel Processing |

  • Time to learn: 45 minutes


Dask and Parallel Processing

Dask is a Python library for parallel computing. It plays well with Xarray, but can be used in many ways across the Python ecosystem. This notebook is not intended to be a guide for how to use Dask, but just an example of how to use Dask to parallelize some Kerchunk functionality.

In the previous notebook, Multiple Files and Kerchunk, we looked at daily downscaled climate data over South-Eastern Alaska. In our generate_json_reference function, we looped over the input NetCDF4 files one at a time and used Kerchunk's SingleHdf5ToZarr method to create a Kerchunk index for each file.

With Dask, we can call SingleHdf5ToZarr in parallel, which allows us to create multiple Kerchunk reference files at the same time.

Further on in this notebook, we will show how using Dask can greatly speed up the process of creating a virtual dataset with Kerchunk.

Setting up the Dask Client

In the code below, we import Dask Distributed and create a client. This is the start of our parallel Kerchunk data processing. We pass the argument n_workers=8, which tells the Dask client how many worker processes to start and therefore bounds the resources it will use.

Note: Depending on whether you are running on a small machine such as a laptop or on a larger compute hub, these resources can be tuned to improve performance.
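For example, on a laptop you might start fewer workers with a per-worker memory cap, while on a larger machine you can afford more workers and threads. The commented alternatives below are a hedged sketch; the specific numbers are assumptions, not recommendations, and threads_per_worker and memory_limit are standard distributed.Client arguments.

from distributed import Client

# Illustrative alternatives -- pick values to match your machine (the numbers here are assumptions).
# Laptop: fewer workers, modest per-worker memory cap.
# client = Client(n_workers=4, threads_per_worker=1, memory_limit="2GB")

# Larger compute hub: more workers and more threads per worker.
# client = Client(n_workers=16, threads_per_worker=2, memory_limit="8GB")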

import logging

from distributed import Client

client = Client(n_workers=8, silence_logs=logging.ERROR)
client

Client

Client-3bf644bd-21bd-11ee-8f2e-0022481ea464
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Binder Specific Setup

If you are running this tutorial on Binder, the configuration may look slightly different.

Once you start the client, some information should be returned to you as well as a button that says:

Launch Dashboard in JupyterLab

[Image: Dask client output on Binder, showing the "Launch Dashboard in JupyterLab" button]

Once you click that button, multiple windows of the Dask dashboard should open up.

[Image: Dask dashboard panes opened in JupyterLab on Binder]

Building off of our Previous Work

In the next section, we will re-use some of the code from the Multiple Files and Kerchunk notebook. However, we will modify it slightly to make it compatible with Dask.

The following two cells should look the same as before. As a reminder, we import the required libraries, use fsspec to create a list of our input files, and set up some kwargs for fsspec to use.

import os
from tempfile import TemporaryDirectory

import dask
import fsspec
import ujson
import xarray as xr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
# Initiate fsspec filesystems for reading and writing
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
fs_write = fsspec.filesystem("")

# Retrieve list of available days in archive for the year 2060.
files_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")

# Here we prepend the prefix 's3://', which points to AWS.
file_pattern = sorted(["s3://" + f for f in files_paths])

# Define kwargs for `fsspec`
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")


# We are creating a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
temp_dir
'/tmp/tmp7thco7e1'
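As the comment in the cell above mentions, the reference files could alternatively be written to cloud storage. A hedged sketch of what that might look like is shown below; "s3://my-bucket/references/" is a hypothetical location that would require write credentials, and reference_dict stands in for the output of SingleHdf5ToZarr(...).translate().

# Hedged sketch: write a reference dict to object storage instead of a local temp directory.
# "s3://my-bucket/references/" is a hypothetical bucket path requiring write credentials.
# fs_write = fsspec.filesystem("s3", anon=False)
# with fs_write.open("s3://my-bucket/references/example.json", "wb") as f:
#     f.write(ujson.dumps(reference_dict).encode())  # reference_dict = SingleHdf5ToZarr(...).translate()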

Subset the Data

To speed up our example, let's take a subset of the full year of data.

file_pattern = file_pattern[0:40]

Dask Specific Changes

Here is the section of code that changes. Instead of iterating through each input file and calling generate_json_reference directly, we iterate through our input file list and create Dask Delayed objects. The key detail is that a Delayed object is lazy: nothing is computed when it is created. Once we have iterated through all of our input files, we end up with a list of Delayed objects.

When we are ready, we can call dask.compute on this list of delayed objects to create Kerchunk reference files in parallel.
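Before applying this to Kerchunk, here is a tiny, self-contained illustration (unrelated to Kerchunk) of how dask.delayed defers work until dask.compute is called:

import dask


@dask.delayed
def square(x):
    # Nothing runs when square() is called; Dask just records the task.
    return x * x


lazy_results = [square(i) for i in range(4)]  # a list of Delayed objects
print(lazy_results[0])              # Delayed('square-...') -- not yet computed
print(dask.compute(lazy_results))   # ([0, 1, 4, 9],) -- the work happens here, in parallel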

# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from a NetCDF file.
def generate_json_reference(fil, temp_dir: str):
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        # Derive the output name from the input file name, dropping the ".nc" extension.
        # (`os.path.splitext` is safer than `str.strip(".nc")`, which strips characters, not a suffix.)
        fname = os.path.splitext(os.path.basename(fil))[0]
        outf = f"{temp_dir}/{fname}.json"
        with open(outf, "wb") as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
        return outf


# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in file_pattern]

Dask Task Graph

Once you call dask.compute, it can be hard to tell what is happening and how far along the process is at any given time. Fortunately, Dask has a built-in dashboard to help visualize your progress.

Running this notebook locally

When you first initialized the Dask client earlier on, it should have returned some information including an address to the dashboard. For example: http://127.0.0.1:8787/status
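If you missed it, the client object can report the dashboard address at any time; dashboard_link is a standard attribute of distributed.Client:

# Retrieve the dashboard address from the running client.
print(client.dashboard_link)  # e.g. 'http://127.0.0.1:8787/status'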

By navigating to this address, you should see a Dask dashboard that looks something like this.



[Image: an empty Dask dashboard, before any tasks have been submitted]



When you call dask.compute(tasks), the dashboard should populate with a bunch of tasks. In the dashboard you can monitor your progress and see how resources are being used, along with much more.



[Image: the Dask dashboard populated with running tasks]
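If you would rather track progress inline in the notebook instead of (or in addition to) the dashboard, one option is to submit the delayed tasks with client.compute and pass the resulting futures to Dask's distributed progress bar. This is a sketch of an alternative to the dask.compute(tasks) call used later in this notebook, not a required step.

from dask.distributed import progress

# Submit the delayed tasks to the cluster and get back futures.
futures = client.compute(tasks)
# Display a progress bar in the notebook while the futures run.
progress(futures)
# Block until everything finishes and collect the output file paths.
reference_paths = client.gather(futures)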



Running on Binder

If you are running this example notebook on Binder, the Dask dashboard will look slightly different. Since Binder runs the notebook on another computer, navigating to localhost will give you an error. The Binder-specific Dask dashboard should look something more like this:



[Image: the Dask task stream as shown in the Binder dashboard panes]



Start the Dask Processing

# Start parallel computation of `SingleHDF5ToZarr`
dask.compute(tasks)
(['/tmp/tmp7thco7e1/WRFDS_2060-01-01.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-02.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-03.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-04.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-05.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-06.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-07.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-08.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-09.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-10.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-11.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-12.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-13.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-14.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-15.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-16.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-17.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-18.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-19.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-20.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-21.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-22.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-23.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-24.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-25.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-26.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-27.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-28.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-29.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-30.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-01-31.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-01.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-02.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-03.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-04.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-05.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-06.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-07.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-08.json',
  '/tmp/tmp7thco7e1/WRFDS_2060-02-09.json'],)
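With the individual reference files written, they can be combined into a single virtual dataset with MultiZarrToZarr, just as in the previous notebook. The sketch below shows one way to do that; the concat_dims and identical_dims values are assumptions about this WRF dataset's structure, so check them against your own files.

import glob

# Gather the reference files generated above.
output_files = sorted(glob.glob(f"{temp_dir}/*.json"))

# Combine the single-file references into one virtual dataset.
# The dimension names below are assumptions about this WRF dataset.
mzz = MultiZarrToZarr(
    output_files,
    remote_protocol="s3",
    remote_options={"anon": True},
    concat_dims=["Time"],
    identical_dims=["XLONG", "XLAT", "interp_levels"],
)
multi_kerchunk = mzz.translate()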

Timing

To demonstrate how Dask can speed up your Kerchunk reference generation, this section compares the timing of generating reference files with and without Dask. For reference, the timing was run on a large AWS JupyterHub (managed by the fine folks at 2i2c) with ~32 CPUs and ~256 GB of RAM. It is also worth noting that the source data is hosted on AWS.

| Serial Kerchunk | Parallel Kerchunk (Dask) |
| --- | --- |
| 3min 41s | 39.2 s |

Running our Dask version on a subset of 40 files took only ~39 seconds. In comparison, computing the Kerchunk indices one by one took about 3 minutes and 41 seconds.

Just by changing a few lines of code and using Dask, our code ran almost 6x faster. Note that there is usually a small delay while Dask builds its task graph before any tasks start, so you may see even better relative performance when using Dask and Kerchunk on larger datasets.
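If you want to reproduce a comparison like this yourself, one simple approach (a sketch, not the exact benchmark used above) is to time the serial loop against the parallel compute with time.perf_counter:

import time

# Serial: call the function directly, one file at a time.
t0 = time.perf_counter()
serial_refs = [generate_json_reference(fil, temp_dir) for fil in file_pattern]
print(f"Serial: {time.perf_counter() - t0:.1f} s")

# Parallel: build delayed tasks and compute them all at once with Dask.
t0 = time.perf_counter()
parallel_refs = dask.compute(
    [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in file_pattern]
)
print(f"Parallel: {time.perf_counter() - t0:.1f} s")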

Note: These timings may vary for you. There are many factors that may affect performance, such as:

  • Geographical location of your compute and the source data

  • Internet speed

  • Compute resources, IO limits and # of workers given to Dask

  • Location of where to write reference files to (cloud vs local)

  • Type of reference files (Parquet vs. JSON)

This is meant as an example of how Dask can be used to speed up Kerchunk, not as a detailed benchmark of Kerchunk/Dask performance.

Next Steps

In this notebook we demonstrated how Dask can be used to parallelize the creation of Kerchunk reference files. In the following Case Studies section, we will walk through examples of using Kerchunk with Dask to create virtual cloud-optimized datasets.

Additionally, if you wish to explore more of Kerchunk's Dask integration, you can try checking out Kerchunk's auto_dask method, which combines many of the Dask setup steps into a single convenient function.
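As a rough, hedged sketch based on our reading of the Kerchunk documentation, a call to auto_dask might look like the following; check the auto_dask docstring for the exact arguments, and note that the concat_dims value and n_batches choice here are assumptions for illustration only.

from kerchunk.combine import auto_dask

# Hedged sketch: auto_dask runs the single-file conversions and the combine step
# as one Dask workflow. Argument names/values below should be verified against the docs.
combined = auto_dask(
    file_pattern,                          # list of input file URLs
    single_driver=SingleHdf5ToZarr,        # how to index each individual file
    single_kwargs={"inline_threshold": 300},
    mzz_kwargs={"concat_dims": ["Time"]},  # assumption: concatenate along "Time"
    n_batches=8,                           # assumption: number of batches of single-file tasks
    remote_protocol="s3",
    remote_options={"anon": True},
)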