Kerchunk and Xarray-Datatree
Overview
In this tutorial we are going to use a large collection of pre-generated Kerchunk reference files and open them with Xarray’s new DataTree functionality. This chapter is heavily inspired by this blog post.
About the Dataset
This collection of reference files was generated from the NASA NEX-GDDP-CMIP6 (Global Daily Downscaled Projections) dataset. A version of this dataset is hosted on s3 as a collection of NetCDF files.
Prerequisites
| Concepts | Importance | Notes |
| --- | --- | --- |
|  | Required | Core |
|  | Required | Core |
|  | Required | Core |
|  | Required | IO/Visualization |
|  | Required | IO |
Time to learn: 30 minutes
Motivation
In total the dataset is roughly 12 TB in compressed blob storage, with a single NetCDF file per yearly timestep, per variable. Downloading this entire dataset for analysis on a local machine would be difficult, to say the least. The collection of Kerchunk reference files for this entire dataset is only 272 MB, which is about 42,000 times smaller!
Imports
import dask
import hvplot.xarray # noqa
import pandas as pd
import xarray as xr
from xarray import DataTree
from distributed import Client
from fsspec.implementations.reference import ReferenceFileSystem
Read the reference catalog
The NASA NEX-GDDP-CMIP6 dataset is organized by GCM, Scenario and Ensemble Member. Each Scenario/GCM combination is represented as a single combined reference file, which was created by merging across variables and concatenating along time steps. All of these references are organized into a simple .csv catalog with the schema:

| GCM/Scenario | url |
| --- | --- |
Organizing with Xarray-Datatree
Not all of the GCM/Scenario reference datasets have shared spatial coordinates, and many of them have slight differences in their calendar and thus their time dimension. Because of this, they cannot be combined into a single Xarray-Dataset. Fortunately, Xarray-Datatree provides a higher-level abstraction in which related Xarray-Datasets are organized into a tree structure where each dataset corresponds to a leaf.
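To make that tree structure concrete, here is a minimal sketch, assuming nothing about the real catalog: two tiny synthetic datasets with invented GCM/scenario names become leaves of a DataTree. The real tree we build below follows the same path layout.

import numpy as np
import xarray as xr
from xarray import DataTree

# Two toy datasets become leaves under hypothetical "GCM/scenario" paths.
toy = DataTree.from_dict(
    {
        "GCM-A/ssp245": xr.Dataset({"tas": ("time", np.arange(3.0))}),
        "GCM-A/ssp585": xr.Dataset({"tas": ("time", np.arange(3.0) + 1.0)}),
    }
)
print(toy)  # shows the nested groups: GCM-A -> ssp245, ssp585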
# Read the reference catalog into a Pandas DataFrame
cat_df = pd.read_csv(
    "s3://carbonplan-share/nasa-nex-reference/reference_catalog_nested.csv"
)
# Convert the DataFrame into a dictionary
catalog = cat_df.set_index("ID").T.to_dict("records")[0]
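The set_index/transpose/to_dict chain simply flattens the two-column catalog into a {ID: url} mapping. A toy illustration with made-up values shows the idea:

# Hypothetical two-row example of the DataFrame-to-dict conversion above.
demo = pd.DataFrame(
    {"ID": ["GCM-A/ssp245", "GCM-A/ssp585"], "url": ["s3://a.json", "s3://b.json"]}
)
demo.set_index("ID").T.to_dict("records")[0]
# -> {'GCM-A/ssp245': 's3://a.json', 'GCM-A/ssp585': 's3://b.json'}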
Load Reference Datasets into Xarray-DataTree
In the following cell we create a function load_ref_ds, which can be parallelized via Dask to load Kerchunk references into a dictionary of Xarray-Datasets.
def load_ref_ds(url: str) -> xr.Dataset:
    # Layer a filesystem over the Kerchunk reference file; both the reference
    # JSON and the chunks it points to are read anonymously from s3.
    fs = ReferenceFileSystem(
        url,
        remote_protocol="s3",
        target_protocol="s3",
        remote_options={"anon": True},
        target_options={"anon": True},
        lazy=True,
    )
    # Open the references as a Zarr store; the data stays lazy as Dask arrays.
    return xr.open_dataset(
        fs.get_mapper(),
        engine="zarr",
        backend_kwargs={"consolidated": False},
        chunks={"time": 300},
    )
# Build one lazy task per reference dataset
tasks = {id: dask.delayed(load_ref_ds)(url) for id, url in catalog.items()}
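Each value in tasks is a lazy Delayed object, so building this dictionary reads no data at all. Inspecting one entry (using an ID we access again later in this tutorial) confirms that:

# A Delayed wrapper around load_ref_ds; no bytes have been read yet.
tasks["ACCESS-CM2/ssp585"]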
Use Dask Distributed to load the Xarray-Datasets from Kerchunk reference files
Using Dask, we are loading 164 reference datasets into memory. Since they are Xarray datasets, the coordinates are loaded eagerly, but the underlying data is still lazy.
client = Client(n_workers=8)
client
(Output: Dask client connected to a LocalCluster with 8 workers, 1 thread each, 15.61 GiB total memory; dashboard at http://127.0.0.1:8787/status)
# dask.compute returns a tuple of results; element 0 is our dictionary of datasets
catalog_computed = dask.compute(tasks)
Build an Xarray-Datatree from the dictionary of datasets
dt = DataTree.from_dict(catalog_computed[0])
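As a quick sanity check on the tree's shape, we can look at its children and groups (both standard DataTree properties): the top-level children are GCMs, and each leaf path is a GCM/scenario pair.

# Top-level child nodes are the GCMs; groups lists every path in the tree.
print(list(dt.children)[:5])
print(len(dt.groups))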
Accessing the Datatree
A Datatree is a collection of related Xarray datasets. We can access individual datasets using UNIX-style path syntax. In the cell below, we will access a single dataset from the datatree.
dt["ACCESS-CM2/ssp585"]
# or
dt["ACCESS-CM2"]["ssp585"]
<xarray.DatasetView> Size: 977GB
Dimensions:  (time: 31411, lat: 600, lon: 1440)
Coordinates:
  * lat      (lat) float64 5kB -59.88 -59.62 -59.38 -59.12 ... 89.38 89.62 89.88
  * lon      (lon) float64 12kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
  * time     (time) datetime64[ns] 251kB 2015-01-01T12:00:00 ... 2100-12-31T1...
Data variables:
    hurs     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    huss     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    pr       (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    rlds     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    rsds     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    sfcWind  (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    tas      (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    tasmax   (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    tasmin   (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
Attributes: (12/22)
    Conventions:           CF-1.7
    activity:              NEX-GDDP-CMIP6
    cmip6_institution_id:  CSIRO-ARCCSS
    cmip6_license:         CC-BY-SA 4.0
    cmip6_source_id:       ACCESS-CM2
    contact:               Dr. Rama Nemani: rama.nemani@nasa.gov, Dr. Bridget...
    ...                    ...
    scenario:              ssp585
    source:                BCSD
    title:                 ACCESS-CM2, r1i1p1f1, ssp585, global downscaled CM...
    tracking_id:           dcae752c-5de5-4170-8a98-e7043538d702
    variant_label:         r1i1p1f1
    version:               1.0
Convert a Datatree node to a Dataset
dt["ACCESS-CM2"]["ssp585"].to_dataset()
<xarray.Dataset> Size: 977GB
Dimensions:  (time: 31411, lat: 600, lon: 1440)
Coordinates:
  * lat      (lat) float64 5kB -59.88 -59.62 -59.38 -59.12 ... 89.38 89.62 89.88
  * lon      (lon) float64 12kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
  * time     (time) datetime64[ns] 251kB 2015-01-01T12:00:00 ... 2100-12-31T1...
Data variables:
    hurs     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    huss     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    pr       (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    rlds     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    rsds     (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    sfcWind  (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    tas      (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    tasmax   (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
    tasmin   (time, lat, lon) float32 109GB dask.array<chunksize=(300, 600, 1440), meta=np.ndarray>
Attributes: (12/22)
    Conventions:           CF-1.7
    activity:              NEX-GDDP-CMIP6
    cmip6_institution_id:  CSIRO-ARCCSS
    cmip6_license:         CC-BY-SA 4.0
    cmip6_source_id:       ACCESS-CM2
    contact:               Dr. Rama Nemani: rama.nemani@nasa.gov, Dr. Bridget...
    ...                    ...
    scenario:              ssp585
    source:                BCSD
    title:                 ACCESS-CM2, r1i1p1f1, ssp585, global downscaled CM...
    tracking_id:           dcae752c-5de5-4170-8a98-e7043538d702
    variant_label:         r1i1p1f1
    version:               1.0
Operations across a Datatree
A Datatree contains a collection of datasets with related coordinates and variables. Using built-in methods, we can analyze it as if it were a single dataset; instead of looping through hundreds of Xarray datasets, we can apply operations across the Datatree. In the example below, we will lazily create a time series.
ts = dt.mean(dim=["lat", "lon"])
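The reduced tree ts has the same node layout as dt. As a minimal sketch (the variable and node here are chosen purely for illustration), you could pull a single leaf out, trigger computation, and plot the resulting series:

# Compute the global-mean near-surface air temperature for one node and plot it.
tas_series = ts["ACCESS-CM2/ssp585"].to_dataset()["tas"].compute()
tas_series.hvplot(x="time")  # noqa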
Visualize a single dataset with HvPlot
display(  # noqa
    dt["ACCESS-CM2/ssp585"].to_dataset().pr.hvplot("lon", "lat", rasterize=True)
)
Shut down the Dask cluster
client.shutdown()