Appending to Kerchunk references

Overview

In this tutorial we’ll show how to append to a pre-existing Kerchunk reference. We’ll use the same datasets as in the NetCDF reference generation example.

Prerequisites

Concepts                            Importance   Notes
Kerchunk Basics                     Required     Core
Multiple Files and Kerchunk         Required     Core
Multi-File Datasets with Kerchunk   Required     IO/Visualization

  • Time to learn: 45 minutes


Imports

import logging
from tempfile import TemporaryDirectory

import dask
import fsspec
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

Create Input File List

Here we use fsspec's glob functionality with the * wildcard to list the NetCDF files on an s3 fsspec filesystem. The paths returned by glob do not include the protocol, so we prepend s3:// to each one and sort the resulting list.

# Initialize an fsspec filesystem for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)

files_paths = fs_read.glob(
    "s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*"
)

# Here we prepend the prefix 's3://', which points to AWS.
file_pattern = sorted(["s3://" + f for f in files_paths])
# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the
# `foundations/kerchunk_basics` notebook.
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")

# We are creating a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
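
The path handling above can be tried without network access. The following is a minimal sketch, assuming glob results that lack the protocol; the helper name `to_s3_urls` and the shortened, shuffled file list are illustrative and not part of the original notebook. It also shows why a plain sort is enough here: the trailing counter in each file name is zero-padded, so lexicographic order matches numeric order.

```python
def to_s3_urls(paths):
    """Prepend the s3:// protocol to bucket/key paths and sort them."""
    return sorted("s3://" + p for p in paths)


# Hypothetical glob results, deliberately out of order.
globbed = [
    "smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_010.nc",
    "smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_002.nc",
    "smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_000.nc",
]

urls = to_s3_urls(globbed)
for url in urls:
    print(url)
```

Without zero padding, a name ending in _10 would sort before one ending in _2, and slicing the list into 24-hour blocks would no longer be safe.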

Start a Dask Client

To parallelize the creation of our reference files, we will use Dask. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: Kerchunk and Dask.

client = Client(n_workers=8, silence_logs=logging.ERROR)
client

Create a Kerchunk reference file for the first 24 hours

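first_24_hrs = file_pattern[:24]


# Use Kerchunk's `SingleHdf5ToZarr` class to create a Kerchunk index from
# a single NetCDF file and write it to `output_dir` as a .json file.
def generate_json_reference(fil, output_dir: str):
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        # Drop the ".nc" extension. `removesuffix` (Python 3.9+) removes the exact
        # suffix, whereas `strip(".nc")` removes any leading or trailing
        # ".", "n", or "c" characters.
        fname = fil.split("/")[-1].removesuffix(".nc")
        outf = f"{output_dir}/{fname}.json"
        with open(outf, "wb") as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
        return outf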


# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in first_24_hrs]
# Start parallel processing
import warnings

warnings.filterwarnings("ignore")
dask.compute(tasks)
(['/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_000.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_001.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_002.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_003.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_004.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_005.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_006.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_007.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_008.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_009.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_010.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_011.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_012.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_013.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_014.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_015.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_016.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_017.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_018.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_019.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_020.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_021.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_022.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_023.json'],)
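
When deriving a reference file name from a NetCDF file name, keep in mind that `str.strip` removes a set of characters from both ends rather than a literal suffix. The sketch below compares the two approaches; the helper names and the second file name are hypothetical, chosen only to expose the difference, and `str.removesuffix` requires Python 3.9 or later.

```python
def stem_with_strip(filename):
    # str.strip treats ".nc" as the character set {".", "n", "c"} and
    # removes those characters from BOTH ends of the string.
    return filename.strip(".nc")


def stem_with_removesuffix(filename):
    # str.removesuffix removes the literal suffix once, and only at the end.
    return filename.removesuffix(".nc")


safe = "WRFDETAR_01H_20221231_12_000.nc"  # file name pattern used in this tutorial
risky = "ncep_forecast_nc.nc"  # hypothetical name that starts and ends with n/c

for name in (safe, risky):
    print(name, "->", stem_with_strip(name), "|", stem_with_removesuffix(name))
```

For the WRF file names used here both approaches give the same stem, because the names start with "W" and have a digit before the extension. For other naming schemes only the suffix-based version is reliable.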

Combine .json Kerchunk reference files and write a combined Kerchunk index

In the following cell, we are combining all the .json reference files that were generated above into a single reference file and writing that file to disk.

# Create a list of reference json files
output_files = [
    f"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json" for f in first_24_hrs
]

# Combine the individual references into a single consolidated reference
mzz = MultiZarrToZarr(
    output_files,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_protocol="s3",
    remote_options={"anon": True},
    coo_map={"time": "cf:time"},
)
# Keep the translated references in memory so they can be written to disk
multi_kerchunk = mzz.translate()

# Write kerchunk .json record.
output_fname = "ARG_combined.json"
with open(output_fname, "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())
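
To get a feel for what the combined file contains, it helps to look at the shape of a reference set. The sketch below builds a heavily trimmed, hypothetical reference dictionary in Kerchunk's version 1 JSON layout: each key is a Zarr store key, and each value is either inline data stored as a string or a [url, offset, length] triple pointing into an original file. The bucket name, offsets, and lengths are invented for illustration, and the `summarize` helper is not part of Kerchunk.

```python
import json

# Hypothetical, heavily trimmed reference set (all values are made up).
refs = {
    "version": 1,
    "refs": {
        ".zgroup": json.dumps({"zarr_format": 2}),
        "time/0": "base64:AAAAAAAAAAA=",  # small chunk stored inline
        "T2/0.0.0": ["s3://example-bucket/file_000.nc", 4096, 120000],
        "T2/1.0.0": ["s3://example-bucket/file_001.nc", 4096, 118500],
    },
}


def summarize(reference):
    """Split store keys into inline entries and pointers to remote bytes."""
    inline, remote = [], []
    for key, value in reference["refs"].items():
        (remote if isinstance(value, list) else inline).append(key)
    return sorted(inline), sorted(remote)


inline_keys, remote_keys = summarize(refs)
print("inline:", inline_keys)
print("remote:", remote_keys)
```

Running the same helper on the `multi_kerchunk` dictionary from the cell above should show how many entries are metadata or inlined chunks and how many point back to the NetCDF files on S3.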

Append references for the next 24 hours

We’ll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then, we load the original references and append the new references.

# First generate the individual reference files to be appended

second_24_hrs = file_pattern[24:48]

# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in second_24_hrs]

# Generate reference files for the individual NetCDF files
dask.compute(tasks)
(['/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_024.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_025.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_026.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_027.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_028.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_029.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_030.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_031.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_032.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_033.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_034.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_035.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_036.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_037.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_038.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_039.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_040.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_041.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_042.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_043.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_044.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_045.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_046.json',
  '/tmp/tmppd87jz2i/WRFDETAR_01H_20221231_12_047.json'],)
# Load the original references, closing the file once it has been read
fs_local = fsspec.filesystem("file")
with fs_local.open(output_fname) as f:
    archive = ujson.load(f)
# Create a list of individual reference files to append to the combined reference
output_files = [
    f"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json" for f in second_24_hrs
]

# Append to the existing reference file
mzz = MultiZarrToZarr.append(
    output_files,
    original_refs=archive,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_protocol="s3",
    remote_options={"anon": True},
    coo_map={"time": "cf:time"},
)

multi_kerchunk = mzz.translate()

# Overwrite the combined kerchunk .json record with the appended references.
output_fname = "ARG_combined.json"
with open(output_fname, "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())
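
Conceptually, appending along time means the chunks from the new files are numbered after the chunks that already exist along that dimension. The following toy model illustrates only that bookkeeping. It is not Kerchunk's implementation, and it assumes one chunk per time step and a single chunk in y and x, which may not match the real files. The function name and the variable key layout are illustrative.

```python
def append_along_first_axis(existing_keys, new_chunk_count, variable):
    """Toy model: number new chunks after the existing ones on axis 0."""
    prefix = variable + "/"
    used = [
        int(key[len(prefix):].split(".")[0])
        for key in existing_keys
        if key.startswith(prefix)
    ]
    start = max(used) + 1 if used else 0
    return [f"{variable}/{start + i}.0.0" for i in range(new_chunk_count)]


# 24 existing time steps, then 24 appended ones (assumed one chunk per step).
existing = [f"T2/{i}.0.0" for i in range(24)]
appended = append_along_first_axis(existing, 24, "T2")
combined = existing + appended

print(appended[0], "...", appended[-1])
print(len(combined), "chunk keys in total")
```

The point of the sketch is that the existing keys are left untouched, which is why appending can be cheaper than recombining all 48 reference files from scratch.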

Opening Reference Dataset with Fsspec and Xarray

storage_options = {
    "remote_protocol": "s3",
    "remote_options": {"anon": True},  # anonymous access, as used above
    "skip_instance_cache": True,
}  # options passed to fsspec
open_dataset_options = {"chunks": {}}  # options passed to xarray

ds = xr.open_dataset(
    "ARG_combined.json",
    engine="kerchunk",
    storage_options=storage_options,
    open_dataset_options=open_dataset_options,
)

ds
<xarray.Dataset> Size: 2GB
Dimensions:            (time: 48, y: 1249, x: 999)
Coordinates:
    lat                (time, y, x) float32 240MB ...
    lon                (time, y, x) float32 240MB ...
  * time               (time) datetime64[ns] 384B 2022-12-31T12:00:00 ... 202...
  * x                  (x) float32 4kB 1.088e+09 4.585e-41 ... 4.362e-07
  * y                  (y) float32 5kB 1.088e+09 4.585e-41 ... 0.0 -2.857e-19
Data variables:
    HR2                (time, y, x) float32 240MB ...
    Lambert_Conformal  (time) float32 192B ...
    PP                 (time, y, x) float32 240MB ...
    T2                 (time, y, x) float32 240MB ...
    dirViento10        (time, y, x) float32 240MB ...
    magViento10        (time, y, x) float32 240MB ...
Attributes: (12/18)
    CEN_LAT:        -34.9999885559082
    CEN_LON:        -65.0
    Conventions:    CF-1.8
    DX:             4000.0
    DY:             4000.0
    MAP_PROJ:       1
    ...             ...
    end_lon:        -47.96868896484375
    institution:    Servicio Meteorologico Nacional
    source:          OUTPUT FROM WRF V4.0 MODEL
    start_lat:      -54.386837005615234
    start_lon:      -94.330810546875
    title:          Python PostProcessing for SMN WRF-ARW Deterministic SFC
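
As a quick sanity check on the summary above, the reported sizes can be reproduced from the dimensions alone. This sketch uses only the dimension lengths and variable names shown in the output, and assumes the sizes are reported in decimal units (1 MB = 10^6 bytes). The small coordinate and projection variables are ignored because they contribute well under a megabyte.

```python
# Dimension lengths taken from the dataset summary.
n_time, n_y, n_x = 48, 1249, 999
bytes_per_value = 4  # float32

# Size of one (time, y, x) array in bytes.
per_variable = n_time * n_y * n_x * bytes_per_value

# Arrays with the full (time, y, x) shape in the summary.
full_size_arrays = ["lat", "lon", "HR2", "PP", "T2", "dirViento10", "magViento10"]
total = per_variable * len(full_size_arrays)

print(f"{per_variable / 1e6:.0f} MB per (time, y, x) array")
print(f"{total / 1e9:.2f} GB across {len(full_size_arrays)} arrays")
```

The per-array figure matches the 240MB shown for each variable, and the total is consistent with the rounded 2GB in the header, which confirms that the appended reference exposes all 48 time steps.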