Store Kerchunk Reference Files as Parquet
Overview
In this notebook we will cover how to store Kerchunk references as Parquet files instead of JSON. For large reference datasets, using Parquet should improve performance, since the overall reference store should be smaller on disk and the memory overhead of combining the reference files should be lower.
This notebook builds upon the Kerchunk Basics, Multi-File Datasets with Kerchunk and the Kerchunk and Dask notebooks.
Prerequisites
Concepts | Importance | Notes
---|---|---
Kerchunk Basics | Required | Core
Multi-File Datasets with Kerchunk | Required | Core
Introduction to Xarray | Recommended | IO/Visualization
Kerchunk and Dask | Required | Parallel Processing
Time to learn: 30 minutes
Imports
In addition to the imports used previously in this tutorial, we add LazyReferenceMapper and ReferenceFileSystem from fsspec.implementations.reference for working with lazy Parquet references.
import logging
import os
import dask
import fsspec
import xarray as xr
from distributed import Client
from fsspec.implementations.reference import LazyReferenceMapper, ReferenceFileSystem
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
Setting up the Dask Client
client = Client(n_workers=8, silence_logs=logging.ERROR)
client
(Client output: a distributed.LocalCluster with 8 workers, 8 total threads, and 15.61 GiB of total memory; the dashboard is available at http://127.0.0.1:8787/status.)
Create Input File List
Here we use fsspec's glob functionality along with the * wildcard operator and some string slicing to grab a list of NetCDF files from an s3 fsspec filesystem.
# Initiate fsspec filesystems for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
files_paths = fs_read.glob("s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/*")
# Here we prepend the prefix 's3://', which points to AWS.
file_pattern = sorted(["s3://" + f for f in files_paths])
# Grab the first two files to speed up the example.
file_pattern = file_pattern[0:2]
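As a quick sanity check, we can display the resulting s3 URLs (the exact object names depend on what is currently in the bucket):
# Display the s3 URLs we will generate references for.
file_pattern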
Generate Lazy References
Below we create an fsspec filesystem for reading the NetCDF files from s3 and define a function that generates Dask delayed tasks.
# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk`
# index from a NetCDF file.
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")
def generate_json_reference(fil):
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        return h5chunks.translate()  # return the in-memory reference dict
# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil) for fil in file_pattern]
Start the Dask Processing
You can watch the processing in real time on the Dask Dashboard, e.g. http://127.0.0.1:8787/status
single_refs = dask.compute(tasks)[0]
len(single_refs)
2
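Each element of single_refs is an in-memory Kerchunk reference set: a dictionary with a version key and a refs mapping from Zarr keys to inlined data or byte ranges (the exact layout can vary slightly between kerchunk versions). A minimal way to peek at one:
# Inspect the structure of the first in-memory reference set.
ref = single_refs[0]
print(list(ref))              # typically ['version', 'refs']
print(list(ref["refs"])[:5])  # first few Zarr keys, e.g. '.zgroup', '.zattrs'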
Combine In-Memory References with MultiZarrToZarr
This section will look notably different from the previous examples, which wrote the combined reference to a .json file.
In the following code block we are:
- Creating an fsspec filesystem.
- Creating an empty parquet store to write to.
- Creating an fsspec LazyReferenceMapper to pass into MultiZarrToZarr.
- Building a MultiZarrToZarr object of the combined references.
- Calling .flush() on our LazyReferenceMapper to write the combined reference to our parquet store.
fs = fsspec.filesystem("file")

# Start from a clean, empty parquet store.
if os.path.exists("combined.parq"):
    import shutil

    shutil.rmtree("combined.parq")
os.makedirs("combined.parq")

# Create a LazyReferenceMapper that writes the combined references into the parquet store.
out = LazyReferenceMapper.create(root="combined.parq", fs=fs, record_size=1000)
mzz = MultiZarrToZarr(
    single_refs,
    remote_protocol="s3",
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_options={"anon": True},
    out=out,
).translate()

out.flush()
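As a rough, illustrative check of the size claim from the overview, the sketch below compares the on-disk size of the parquet store with the number of bytes the same per-file reference sets would occupy if serialized to JSON. This is only an approximation, not a rigorous benchmark; the parquet store holds the combined references, and exact numbers depend on the dataset and kerchunk version.
import json

# Total size of the Parquet reference store on disk.
parquet_bytes = sum(
    os.path.getsize(os.path.join(dirpath, name))
    for dirpath, _, names in os.walk("combined.parq")
    for name in names
)

# Approximate size of the same per-file reference sets serialized as JSON.
json_bytes = sum(len(json.dumps(ref)) for ref in single_refs)

print(f"Parquet store: {parquet_bytes / 1e6:.2f} MB")
print(f"Equivalent JSON references: {json_bytes / 1e6:.2f} MB")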
Shut down the Dask cluster
client.shutdown()
Load kerchunked dataset
Next we initiate an fsspec ReferenceFileSystem.
We need to pass:
- The name of the parquet store.
- The remote protocol (the protocol of the input file URLs).
- The target protocol (file, since we saved our parquet store locally).
fs = ReferenceFileSystem(
    "combined.parq",
    remote_protocol="s3",
    target_protocol="file",
    lazy=True,
    remote_options={"anon": True},
)

ds = xr.open_dataset(
    fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}
)
ds
<xarray.Dataset> Size: 70MB
Dimensions:            (time: 2, y: 1249, x: 999)
Coordinates:
    lat                (time, y, x) float32 10MB ...
    lon                (time, y, x) float32 10MB ...
  * time               (time) datetime64[ns] 16B 2022-12-31T12:00:00 2022-12-...
  * x                  (x) float32 4kB -1.996e+06 -1.992e+06 ... 1.996e+06
  * y                  (y) float32 5kB -2.496e+06 -2.492e+06 ... 2.496e+06
Data variables:
    HR2                (time, y, x) float32 10MB ...
    Lambert_Conformal  (time) float32 8B ...
    PP                 (time, y, x) float32 10MB ...
    T2                 (time, y, x) float32 10MB ...
    dirViento10        (time, y, x) float32 10MB ...
    magViento10        (time, y, x) float32 10MB ...
Attributes: (12/18)
    CEN_LAT:      -34.9999885559082
    CEN_LON:      -65.0
    Conventions:  CF-1.8
    DX:           4000.0
    DY:           4000.0
    MAP_PROJ:     1
    ...           ...
    end_lon:      -47.96868896484375
    institution:  Servicio Meteorologico Nacional
    source:       OUTPUT FROM WRF V4.0 MODEL
    start_lat:    -54.386837005615234
    start_lon:    -94.330810546875
    title:        Python PostProcessing for SMN WRF-ARW Deterministic SFC
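Finally, as a quick check that the parquet-backed references resolve real bytes from s3, we can load a small slice of one of the variables shown above (T2 is just an illustrative choice).
# Pull one time step of the 2 m temperature field through the reference filesystem.
t2_slice = ds["T2"].isel(time=0)
print(float(t2_slice.mean()))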