Skip to article frontmatterSkip to article content

Appending to Kerchunk references

Overview

In this tutorial we’ll show how to append to a pre-existing Kerchunk reference. We’ll use the same datasets as in the NetCDF reference generation example.

Prerequisites

ConceptsImportanceNotes
Kerchunk BasicsRequiredCore
Multiple Files and KerchunkRequiredCore
Multi-File Datasets with KerchunkRequiredIO/Visualization
  • Time to learn: 45 minutes

Imports

import logging
from tempfile import TemporaryDirectory

import dask
import fsspec
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

Create Input File List

Here we are using fsspec's glob functionality along with the * wildcard operator and some string slicing to grab a list of NetCDF files from a s3 fsspec filesystem.

# Initiate fsspec filesystems for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)

files_paths = fs_read.glob(
    "s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*"
)

# Here we prepend the prefix 's3://', which points to AWS.
file_pattern = sorted(["s3://" + f for f in files_paths])
# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the
# `foundations/kerchunk_basics` notebook.
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")

# We are creating a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name

Start a Dask Client

To parallelize the creation of our reference files, we will use Dask. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: Kerchunk and Dask.

client = Client(n_workers=8, silence_logs=logging.ERROR)
client
Loading...

Create a Kerchunk reference file for the first 24 hours

first_24_hrs = file_pattern[:24]
# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from
# a NetCDF file.


def generate_json_reference(fil, output_dir: str):
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        fname = fil.split("/")[-1].strip(".nc")
        outf = f"{output_dir}/{fname}.json"
        with open(outf, "wb") as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
        return outf


# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in first_24_hrs]
# Start parallel processing
import warnings

warnings.filterwarnings("ignore")
dask.compute(tasks)
(['/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_000.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_001.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_002.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_003.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_004.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_005.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_006.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_007.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_008.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_009.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_010.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_011.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_012.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_013.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_014.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_015.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_016.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_017.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_018.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_019.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_020.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_021.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_022.json', '/tmp/tmpnihwqild/WRFDETAR_01H_20221231_12_023.json'],)

Combine .json Kerchunk reference files and write a combined Kerchunk index

In the following cell, we are combining all the .json reference files that were generated above into a single reference file and writing that file to disk.

# Create a list of reference json files
output_files = [
    f"{temp_dir}/{f.strip('.nc').split('/')[-1]}.json" for f in first_24_hrs
]

# combine individual references into single consolidated reference
mzz = MultiZarrToZarr(
    output_files,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_protocol="s3",
    remote_options={"anon": True},
    coo_map={"time": "cf:time"},
)
# save translate reference in memory for later visualization
multi_kerchunk = mzz.translate()

# Write kerchunk .json record.
output_fname = "ARG_combined.json"
with open(f"{output_fname}", "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[8], line 16
      7 mzz = MultiZarrToZarr(
      8     output_files,
      9     concat_dims=["time"],
   (...)     13     coo_map={"time": "cf:time"},
     14 )
     15 # save translate reference in memory for later visualization
---> 16 multi_kerchunk = mzz.translate()
     18 # Write kerchunk .json record.
     19 output_fname = "ARG_combined.json"

File ~/micromamba/envs/kerchunk-cookbook/lib/python3.13/site-packages/kerchunk/combine.py:647, in MultiZarrToZarr.translate(self, filename, storage_options)
    645     self.first_pass()
    646 if 2 not in self.done:
--> 647     self.store_coords()
    648 if 3 not in self.done:
    649     self.second_pass()

File ~/micromamba/envs/kerchunk-cookbook/lib/python3.13/site-packages/kerchunk/combine.py:473, in MultiZarrToZarr.store_coords(self)
    470 self.out.update(kv)
    471 logger.debug("Written coordinates")
--> 473 metadata = asyncio.run(self._read_meta_files(m, [".zgroup", ".zattrs"]))
    474 self.out.update(metadata)
    475 logger.debug("Written global metadata")

File ~/micromamba/envs/kerchunk-cookbook/lib/python3.13/asyncio/runners.py:191, in run(main, debug, loop_factory)
    161 """Execute the coroutine and return the result.
    162 
    163 This function runs the passed coroutine, taking care of
   (...)    187     asyncio.run(main())
    188 """
    189 if events._get_running_loop() is not None:
    190     # fail fast with short traceback
--> 191     raise RuntimeError(
    192         "asyncio.run() cannot be called from a running event loop")
    194 with Runner(debug=debug, loop_factory=loop_factory) as runner:
    195     return runner.run(main)

RuntimeError: asyncio.run() cannot be called from a running event loop

Append references for the next 24 hours

We’ll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then, we load the original references and append the new references.

# First generate the individual reference files to be appended

second_24_hrs = file_pattern[24:48]

# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in second_24_hrs]

# Generate reference files for the individual NetCDF files
dask.compute(tasks)
# Load the original references
fs_local = fsspec.filesystem("file")
archive = ujson.load(fs_local.open(output_fname))
# Create a list of individual reference files to append to the combined reference
output_files = [
    f"{temp_dir}/{f.strip('.nc').split('/')[-1]}.json" for f in second_24_hrs
]

# Append to the existing reference file
mzz = MultiZarrToZarr.append(
    output_files,
    original_refs=archive,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_protocol="s3",
    remote_options={"anon": True},
    coo_map={"time": "cf:time"},
)

multi_kerchunk = mzz.translate()

# Write kerchunk .json record.
output_fname = "ARG_combined.json"
with open(f"{output_fname}", "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())

Opening Reference Dataset with Fsspec and Xarray

storage_options = {
    "remote_protocol": "s3",
    "skip_instance_cache": True,
    "remote_options": {"anon": True}
}  # options passed to fsspec
open_dataset_options = {"chunks": {}}  # opens passed to xarray

ds = xr.open_dataset(
    "ARG_combined.json",
    engine="kerchunk",
    storage_options=storage_options,
    open_dataset_options=open_dataset_options,
)
ds