Appending to Kerchunk references
Overview
In this tutorial we’ll show how to append to a pre-existing Kerchunk reference. We’ll use the same datasets as in the NetCDF reference generation example.
Prerequisites
Concepts | Importance | Notes
---|---|---
Kerchunk Basics | Required | Core
Kerchunk and Dask | Required | Core
Introduction to Xarray | Required | IO/Visualization
Time to learn: 45 minutes
Imports
import logging
from tempfile import TemporaryDirectory
import dask
import fsspec
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
Create Input File List
Here we use fsspec's glob functionality along with the * wildcard operator and some string slicing to grab a list of NetCDF files from an s3 fsspec filesystem.
# Initiate fsspec filesystems for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
files_paths = fs_read.glob(
"s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*"
)
# Here we prepend the prefix 's3://', which points to AWS.
file_pattern = sorted(["s3://" + f for f in files_paths])
# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the
# `foundations/kerchunk_basics` notebook.
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")
# We are creating a temporary directory to store the .json reference files
# Alternatively, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
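As a minimal sketch of the cloud-storage alternative mentioned in the comment above: a reference JSON can be written through an fsspec filesystem instead of a local directory. The bucket path below is a hypothetical placeholder, and writing requires credentials (hence anon=False).
# Hypothetical sketch: write a reference JSON to object storage instead of a
# temporary directory. "s3://my-bucket/references" is a placeholder path.
fs_write = fsspec.filesystem("s3", anon=False)
reference = {"version": 1, "refs": {}}  # stand-in for a real reference dict
with fs_write.open("s3://my-bucket/references/example.json", "wb") as f:
    f.write(ujson.dumps(reference).encode())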
Start a Dask Client
To parallelize the creation of our reference files, we will use Dask. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: Kerchunk and Dask.
client = Client(n_workers=8, silence_logs=logging.ERROR)
client
Client
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
LocalCluster: 8 workers | 8 total threads | 15.61 GiB total memory | Status: running
Create a Kerchunk reference file for the first 24 hours
first_24_hrs = file_pattern[:24]
# Use Kerchunk's `SingleHdf5ToZarr` class to create a `Kerchunk` index from
# a NetCDF file.
def generate_json_reference(fil, output_dir: str):
with fs_read.open(fil, **so) as infile:
h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
fname = fil.split("/")[-1].removesuffix(".nc")  # .strip(".nc") would strip characters, not the suffix
outf = f"{output_dir}/{fname}.json"
with open(outf, "wb") as f:
f.write(ujson.dumps(h5chunks.translate()).encode())
return outf
# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in first_24_hrs]
# Start parallel processing
import warnings
warnings.filterwarnings("ignore")
dask.compute(tasks)
(['/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_000.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_001.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_002.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_003.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_004.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_005.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_006.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_007.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_008.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_009.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_010.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_011.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_012.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_013.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_014.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_015.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_016.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_017.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_018.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_019.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_020.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_021.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_022.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_023.json'],)
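As an optional sanity check (not part of the original workflow), one of the single-file references can be opened with the kerchunk engine before combining, using the same storage options this notebook uses later. The path below is the first entry in the output above.
# Optional: open the first single-file reference to confirm it is readable.
ds_single = xr.open_dataset(
    f"{temp_dir}/WRFDETAR_01H_20221231_12_000.json",
    engine="kerchunk",
    storage_options={"remote_protocol": "s3", "remote_options": {"anon": True}},
)
print(ds_single.sizes)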
Combine .json Kerchunk reference files and write a combined Kerchunk index
In the following cell, we combine all of the .json reference files generated above into a single reference file and write that file to disk.
# Create a list of reference json files
output_files = [
f"{temp_dir}/{f.strip('.nc').split('/')[-1]}.json" for f in first_24_hrs
]
# combine individual references into single consolidated reference
mzz = MultiZarrToZarr(
output_files,
concat_dims=["time"],
identical_dims=["y", "x"],
remote_protocol="s3",
remote_options={"anon": True},
coo_map={"time": "cf:time"},
)
# Keep the translated reference in memory for writing to disk below
multi_kerchunk = mzz.translate()
# Write kerchunk .json record.
output_fname = "ARG_combined.json"
with open(f"{output_fname}", "wb") as f:
f.write(ujson.dumps(multi_kerchunk).encode())
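The reference dict returned by translate() can also be inspected directly before writing it out. In the version 1 Kerchunk format it holds a "refs" mapping from Zarr keys to either inlined data or [url, offset, length] pointers into the NetCDF files; the snippet below is just an optional peek, not part of the workflow.
# Optional: peek inside the combined reference.
print(f"Total reference entries: {len(multi_kerchunk['refs'])}")
print(list(multi_kerchunk["refs"])[:5])  # a few example keys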
Append references for the next 24 hours
We’ll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then, we load the original references and append the new references.
# First generate the individual reference files to be appended
second_24_hrs = file_pattern[24:48]
# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in second_24_hrs]
# Generate reference files for the individual NetCDF files
dask.compute(tasks)
(['/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_024.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_025.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_026.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_027.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_028.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_029.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_030.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_031.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_032.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_033.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_034.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_035.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_036.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_037.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_038.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_039.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_040.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_041.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_042.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_043.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_044.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_045.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_046.json',
'/tmp/tmpzmg510zx/WRFDETAR_01H_20221231_12_047.json'],)
# Load the original references
fs_local = fsspec.filesystem("file")
archive = ujson.load(fs_local.open(output_fname))
# Create a list of individual reference files to append to the combined reference
output_files = [
f"{temp_dir}/{f.strip('.nc').split('/')[-1]}.json" for f in second_24_hrs
]
# Append to the existing reference file
mzz = MultiZarrToZarr.append(
output_files,
original_refs=archive,
concat_dims=["time"],
identical_dims=["y", "x"],
remote_protocol="s3",
remote_options={"anon": True},
coo_map={"time": "cf:time"},
)
multi_kerchunk = mzz.translate()
# Write kerchunk .json record.
output_fname = "ARG_combined.json"
with open(f"{output_fname}", "wb") as f:
f.write(ujson.dumps(multi_kerchunk).encode())
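As a quick, hedged check that the append worked: assuming the standard version 1 reference layout, where .zarray metadata is stored as a JSON string, the "time" array metadata should now describe 48 steps.
# Optional: confirm the appended reference covers 48 time steps.
time_meta = ujson.loads(multi_kerchunk["refs"]["time/.zarray"])
print(time_meta["shape"])  # expected: [48]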
Opening Reference Dataset with Fsspec and Xarray
storage_options = {
"remote_protocol": "s3",
"skip_instance_cache": True,
"remote_options": {"anon": True}
} # options passed to fsspec
open_dataset_options = {"chunks": {}} # opens passed to xarray
ds = xr.open_dataset(
"ARG_combined.json",
engine="kerchunk",
storage_options=storage_options,
open_dataset_options=open_dataset_options,
)
ds
<xarray.Dataset> Size: 2GB
Dimensions:            (time: 48, y: 1249, x: 999)
Coordinates:
    lat                (time, y, x) float32 240MB ...
    lon                (time, y, x) float32 240MB ...
  * time               (time) datetime64[ns] 384B 2022-12-31T12:00:00 ... 202...
  * x                  (x) float32 4kB -1.996e+06 -1.992e+06 ... 1.996e+06
  * y                  (y) float32 5kB -2.496e+06 -2.492e+06 ... 2.496e+06
Data variables:
    HR2                (time, y, x) float32 240MB ...
    Lambert_Conformal  (time) float32 192B ...
    PP                 (time, y, x) float32 240MB ...
    T2                 (time, y, x) float32 240MB ...
    dirViento10        (time, y, x) float32 240MB ...
    magViento10        (time, y, x) float32 240MB ...
Attributes: (12/18)
    CEN_LAT:      -34.9999885559082
    CEN_LON:      -65.0
    Conventions:  CF-1.8
    DX:           4000.0
    DY:           4000.0
    MAP_PROJ:     1
    ...           ...
    end_lon:      -47.96868896484375
    institution:  Servicio Meteorologico Nacional
    source:       OUTPUT FROM WRF V4.0 MODEL
    start_lat:    -54.386837005615234
    start_lon:    -94.330810546875
    title:        Python PostProcessing for SMN WRF-ARW Deterministic SFC
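As a final usage sketch (not in the original notebook), a variable from the dataset above can be selected and plotted lazily; this assumes matplotlib is installed. Only the chunks needed for the selected slice are fetched from S3.
# Hypothetical usage example: plot the T2 field at the first time step.
ds["T2"].isel(time=0).plot()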