Store virtual datasets as Kerchunk Parquet references
Overview
In this notebook we will cover how to store virtual datasets as Kerchunk Parquet references instead of Kerchunk JSON references. For large virtual datasets, Parquet references should perform better than JSON: the reference files are smaller on disk, and the memory overhead of combining them is lower.
This notebook builds upon the Kerchunk Basics, Multi-File Datasets with Kerchunk, and Kerchunk and Dask notebooks.
Prerequisites
Concepts | Importance | Notes |
---|---|---|
Kerchunk Basics | Required | Core |
Multi-File Datasets with Kerchunk | Required | Core |
Parallel virtual dataset creation with VirtualiZarr, Kerchunk, and Dask | Required | Core |
Introduction to Xarray | Required | IO/Visualization |
Time to learn: 30 minutes
Imports
import logging
import dask
import fsspec
import xarray as xr
from distributed import Client
from virtualizarr import open_virtual_dataset
Setting up the Dask Client
client = Client(n_workers=8, silence_logs=logging.ERROR)
client
LocalCluster with 8 workers (1 thread and 1.95 GiB each, 15.61 GiB total memory); dashboard at http://127.0.0.1:8787/status
Create Input File List
Here we use fsspec's glob functionality along with the * wildcard operator and some string slicing to grab a list of NetCDF files from an s3 fsspec filesystem.
# Initiate fsspec filesystems for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
files_paths = fs_read.glob("s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/*")
# Here we prepend the prefix 's3://', which points to AWS.
files_paths = sorted(["s3://" + f for f in files_paths])
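As a quick sanity check (this cell is an optional addition, not essential to the workflow), we can confirm how many files the glob matched and what the resulting paths look like:
# Peek at the matched files: the count and the first two S3 paths
print(len(files_paths))
files_paths[:2]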
Subset the Data
To speed up our example, let's take a subset of the data.
# If the subset_flag == True (default), the list of input files will
# be subset to speed up the processing
subset_flag = True
if subset_flag:
    files_paths = files_paths[0:4]
Generate Lazy References
Here we create a function that generates a virtual dataset for a single file, then wrap it with dask.delayed to build a list of lazy tasks, one per input file.
def generate_virtual_dataset(file, storage_options):
    return open_virtual_dataset(
        file, indexes={}, reader_options={"storage_options": storage_options}
    )
storage_options = dict(anon=True, default_fill_cache=False, default_cache_type="first")
# Generate Dask Delayed objects
tasks = [
    dask.delayed(generate_virtual_dataset)(file, storage_options)
    for file in files_paths
]
Start the Dask Processing
You can watch the processing in real time on the Dask Dashboard, e.g. http://127.0.0.1:8787/status
virtual_datasets = list(dask.compute(*tasks))
Combine virtual datasets using VirtualiZarr
combined_vds = xr.combine_nested(
    virtual_datasets, concat_dim=["time"], coords="minimal", compat="override"
)
combined_vds
<xarray.Dataset> Size: 140MB
Dimensions:            (time: 4, y: 1249, x: 999)
Coordinates:
    y                  (y) float32 5kB ManifestArray<shape=(1249,), dtype=flo...
    time               (time) int32 16B ManifestArray<shape=(4,), dtype=int32...
    x                  (x) float32 4kB ManifestArray<shape=(999,), dtype=floa...
Data variables:
    magViento10        (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    lat                (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    T2                 (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    HR2                (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    PP                 (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    dirViento10        (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    lon                (time, y, x) float32 20MB ManifestArray<shape=(4, 1249...
    Lambert_Conformal  (time) float32 16B ManifestArray<shape=(4,), dtype=flo...
Write the virtual dataset to a Kerchunk Parquet reference
combined_vds.virtualize.to_kerchunk("combined.parq", format="parquet")
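As a rough, optional check of the Overview's claim about reference size, we can also write the same references as a single JSON file and compare the on-disk footprint of the two formats. The JSON write and the combined.json filename are illustrative additions, not part of the original workflow; with only four small input files the difference will be modest, but it grows with the number of references.
import os

# Write the same virtual dataset as a JSON reference file, purely for comparison
combined_vds.virtualize.to_kerchunk("combined.json", format="json")

def disk_size(path):
    # The Parquet store is a directory, so sum the sizes of every file inside it
    if os.path.isdir(path):
        return sum(
            os.path.getsize(os.path.join(root, name))
            for root, _, names in os.walk(path)
            for name in names
        )
    return os.path.getsize(path)

print(f"JSON reference size:    {disk_size('combined.json') / 1e6:.3f} MB")
print(f"Parquet reference size: {disk_size('combined.parq') / 1e6:.3f} MB")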
Shutdown the Dask cluster
client.shutdown()
Load kerchunked dataset
Next we initiate an fsspec ReferenceFileSystem (the kerchunk engine below builds this for us from the options we provide).
We need to pass:
- The name of the Parquet store
- The remote protocol (the protocol of the input file URLs)
- The target protocol (file, since we saved our Parquet store locally)
storage_options = {
    "remote_protocol": "s3",
    "skip_instance_cache": True,
    "remote_options": {"anon": True},
    "target_protocol": "file",
    "lazy": True,
}  # options passed to fsspec
open_dataset_options = {"chunks": {}}  # options passed to xarray
ds = xr.open_dataset(
    "combined.parq",
    engine="kerchunk",
    storage_options=storage_options,
    open_dataset_options=open_dataset_options,
)
ds
<xarray.Dataset> Size: 140MB
Dimensions:            (time: 4, y: 1249, x: 999)
Coordinates:
  * time               (time) float64 32B nan 1.0 2.0 3.0
  * x                  (x) float32 4kB -1.996e+06 -1.992e+06 ... 1.996e+06
  * y                  (y) float32 5kB -2.496e+06 -2.492e+06 ... 2.496e+06
Data variables:
    HR2                (time, y, x) float32 20MB ...
    Lambert_Conformal  (time) float32 16B ...
    PP                 (time, y, x) float32 20MB ...
    T2                 (time, y, x) float32 20MB ...
    dirViento10        (time, y, x) float32 20MB ...
    lat                (time, y, x) float32 20MB ...
    lon                (time, y, x) float32 20MB ...
    magViento10        (time, y, x) float32 20MB ...
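For reference, the kerchunk engine used above is roughly equivalent to building the fsspec ReferenceFileSystem ourselves and opening the resulting mapper with the zarr engine. The sketch below shows that under-the-hood path; the names fs and ds_ref are illustrative, and exact keyword support (notably lazy) can vary between fsspec and kerchunk versions.
from fsspec.implementations.reference import ReferenceFileSystem

# Build the ReferenceFileSystem by hand, pointing at the local Parquet store
# and telling it that the referenced chunks live on anonymous S3
fs = ReferenceFileSystem(
    "combined.parq",
    remote_protocol="s3",
    remote_options={"anon": True},
    target_protocol="file",
    lazy=True,
)

# Open the references as a Zarr store; this should yield the same dataset as above
ds_ref = xr.open_dataset(
    fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}, chunks={}
)
ds_ref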