Overview¶
In this tutorial we’ll show how to append to a pre-existing Kerchunk reference. We’ll use the same datasets as in the NetCDF reference generation example.
Prerequisites¶
Concepts | Importance | Notes |
---|---|---|
Kerchunk Basics | Required | Core |
Multiple Files and Kerchunk | Required | Core |
Multi-File Datasets with Kerchunk | Required | IO/Visualization |
- Time to learn: 45 minutes
Imports¶
import logging
from tempfile import TemporaryDirectory
import dask
import fsspec
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
Create Input File List¶
Here we are using `fsspec`'s `glob` functionality along with the `*` wildcard operator and some string slicing to grab a list of NetCDF files from an `s3` `fsspec` filesystem.
# Anonymous S3 filesystem used for reading the source files.
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)

# Glob every file of the 2022-12-31 12Z WRF run.
wrf_glob = "s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*"
files_paths = fs_read.glob(wrf_glob)

# Prepend the 's3://' prefix (which points to AWS) to each globbed path and
# sort the resulting URLs.
file_pattern = sorted(f"s3://{path}" for path in files_paths)

# Keyword arguments handed to `fsspec` when opening each file. For more
# details, check out the `foundations/kerchunk_basics` notebook.
so = {
    "mode": "rb",
    "anon": True,
    "default_fill_cache": False,
    "default_cache_type": "first",
}

# The individual .json reference files go into a temporary directory.
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
Start a Dask Client¶
To parallelize the creation of our reference files, we will use `Dask`. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: Kerchunk and Dask.
# Start a Dask client with eight workers; `silence_logs=logging.ERROR`
# keeps the log output down.
client = Client(
    n_workers=8,
    silence_logs=logging.ERROR,
)
# Bare expression so the notebook displays the client.
client
Create a Kerchunk reference file for each of the first 24 hours¶
# The first 24 files are used to build the initial combined reference.
first_24_hrs = file_pattern[:24]


# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from
# a NetCDF file.
def generate_json_reference(fil, output_dir: str):
    """Write a Kerchunk JSON reference for one NetCDF file and return its path.

    Parameters
    ----------
    fil : str
        URL of the NetCDF file to index; it is opened through ``fs_read``
        with the ``so`` options.
    output_dir : str
        Directory in which the ``<name>.json`` reference file is written.

    Returns
    -------
    str
        Path of the JSON reference file that was written.
    """
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        # Drop only a trailing ".nc" extension. `str.strip(".nc")` treats its
        # argument as a set of characters and removes them from BOTH ends,
        # which corrupts names that start or end with '.', 'n' or 'c'.
        fname = fil.split("/")[-1].removesuffix(".nc")
        outf = f"{output_dir}/{fname}.json"
        # `translate()` runs while `infile` is still open.
        with open(outf, "wb") as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
        return outf
# Wrap the reference generator once, then build one Dask Delayed object per
# input file.
delayed_reference = dask.delayed(generate_json_reference)
tasks = [delayed_reference(source_url, temp_dir) for source_url in first_24_hrs]

# Suppress warnings before kicking off the computation.
import warnings

warnings.filterwarnings("ignore")

# Start parallel processing; the result holds the written reference paths.
dask.compute(tasks)
(['/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_000.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_001.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_002.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_003.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_004.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_005.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_006.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_007.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_008.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_009.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_010.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_011.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_012.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_013.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_014.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_015.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_016.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_017.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_018.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_019.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_020.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_021.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_022.json',
'/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_023.json'],)
Combine `.json` Kerchunk reference files and write a combined Kerchunk index¶
In the following cell, we are combining all the `.json` reference files that were generated above into a single reference file and writing that file to disk.
# Create a list of the reference json files written for the first 24 hours.
# `removesuffix` drops only a trailing ".nc"; `strip(".nc")` would remove any
# leading/trailing '.', 'n' or 'c' characters instead of the extension.
output_files = [
    f"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json" for f in first_24_hrs
]

# Combine the individual references into a single consolidated reference,
# concatenating along "time" and treating "y" and "x" as identical.
mzz = MultiZarrToZarr(
    output_files,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_protocol="s3",
    remote_options={"anon": True},
    coo_map={"time": "cf:time"},
)

# Save the translated reference in memory for later visualization.
# NOTE(review): the recorded output of this cell shows `translate()` raising
# "RuntimeError: asyncio.run() cannot be called from a running event loop"
# - confirm the kerchunk version works inside a running Jupyter kernel.
multi_kerchunk = mzz.translate()

# Write kerchunk .json record.
output_fname = "ARG_combined.json"
with open(output_fname, "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[8], line 16
7 mzz = MultiZarrToZarr(
8 output_files,
9 concat_dims=["time"],
(...) 13 coo_map={"time": "cf:time"},
14 )
15 # save translate reference in memory for later visualization
---> 16 multi_kerchunk = mzz.translate()
18 # Write kerchunk .json record.
19 output_fname = "ARG_combined.json"
File ~/micromamba/envs/kerchunk-cookbook/lib/python3.13/site-packages/kerchunk/combine.py:647, in MultiZarrToZarr.translate(self, filename, storage_options)
645 self.first_pass()
646 if 2 not in self.done:
--> 647 self.store_coords()
648 if 3 not in self.done:
649 self.second_pass()
File ~/micromamba/envs/kerchunk-cookbook/lib/python3.13/site-packages/kerchunk/combine.py:473, in MultiZarrToZarr.store_coords(self)
470 self.out.update(kv)
471 logger.debug("Written coordinates")
--> 473 metadata = asyncio.run(self._read_meta_files(m, [".zgroup", ".zattrs"]))
474 self.out.update(metadata)
475 logger.debug("Written global metadata")
File ~/micromamba/envs/kerchunk-cookbook/lib/python3.13/asyncio/runners.py:191, in run(main, debug, loop_factory)
161 """Execute the coroutine and return the result.
162
163 This function runs the passed coroutine, taking care of
(...) 187 asyncio.run(main())
188 """
189 if events._get_running_loop() is not None:
190 # fail fast with short traceback
--> 191 raise RuntimeError(
192 "asyncio.run() cannot be called from a running event loop")
194 with Runner(debug=debug, loop_factory=loop_factory) as runner:
195 return runner.run(main)
RuntimeError: asyncio.run() cannot be called from a running event loop
Append references for the next 24 hours¶
We’ll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then, we load the original references and append the new references.
# The next 24 files are the ones whose references will be appended.
second_24_hrs = file_pattern[24:48]

# Wrap the reference generator once, then build one Dask Delayed object per
# input file.
delayed_reference = dask.delayed(generate_json_reference)
tasks = [delayed_reference(source_url, temp_dir) for source_url in second_24_hrs]

# Generate the reference files for the individual NetCDF files.
dask.compute(tasks)
# Load the original references from the local combined reference file.
fs_local = fsspec.filesystem("file")
# Open through a context manager so the file handle is closed once the JSON
# has been parsed (the bare `fs_local.open(...)` call never closed it).
with fs_local.open(output_fname) as ref_file:
    archive = ujson.load(ref_file)
# Create a list of individual reference files to append to the combined
# reference. `removesuffix` drops only a trailing ".nc"; `strip(".nc")` would
# remove any leading/trailing '.', 'n' or 'c' characters instead.
output_files = [
    f"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json" for f in second_24_hrs
]

# Append to the existing references (`archive`) using the same combine
# options as the original reference.
mzz = MultiZarrToZarr.append(
    output_files,
    original_refs=archive,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_protocol="s3",
    remote_options={"anon": True},
    coo_map={"time": "cf:time"},
)

multi_kerchunk = mzz.translate()

# Write kerchunk .json record, overwriting the previous combined reference.
output_fname = "ARG_combined.json"
with open(output_fname, "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())
Opening Reference Dataset with Fsspec and Xarray¶
# Options passed to fsspec.
storage_options = dict(
    remote_protocol="s3",
    skip_instance_cache=True,
    remote_options=dict(anon=True),
)

# Options passed to xarray.
open_dataset_options = dict(chunks={})

# Open the combined reference file through the "kerchunk" engine.
ds = xr.open_dataset(
    "ARG_combined.json",
    engine="kerchunk",
    storage_options=storage_options,
    open_dataset_options=open_dataset_options,
)
# Bare expression so the notebook displays the dataset.
ds