NetCDF
Generating Kerchunk References from NetCDF files
Overview
Within this notebook, we will cover:
How to access remote NetCDF data using Kerchunk
Combining multiple Kerchunk reference files using MultiZarrToZarr
This notebook shares many similarities with the Multi-File Datasets with Kerchunk notebook. If you are unsure what a block of code does, refer to that notebook for a more detailed breakdown of what each line is doing.
Prerequisites
Concepts |
Importance |
Notes |
---|---|---|
Required |
Core |
|
Required |
Core |
|
Required |
Core |
Time to learn: 45 minutes
Motivation
NetCDF4/HDF5 is one of the most widely adopted file formats in the earth sciences, supported by much of the community as well as by scientific agencies, data centers, and university labs. A huge amount of legacy data has been generated in this format. Fortunately, using Kerchunk, we can read these datasets as if they were in an Analysis-Ready Cloud-Optimized (ARCO) format such as Zarr.
About the Dataset
For this example, we will look at a weather dataset composed of multiple NetCDF files. The SMN-Arg is a WRF deterministic weather forecasting dataset created by the Servicio Meteorológico Nacional de Argentina that covers Argentina as well as many neighboring countries at a 4 km spatial resolution.
The model is initialized twice daily at 00 and 12 UTC with hourly forecasts for variables such as temperature, relative humidity, precipitation, and wind direction and magnitude, for multiple atmospheric levels.
The data is output at hourly intervals with a maximum prediction lead time of 72 hours in NetCDF files.
More details on this dataset can be found here.
Flags
In the section below, set subset_flag to True (default) or False, depending on whether you want this notebook to process a subset of the file list or the full list. If set to True, only a subset of the file list will be processed (recommended).
subset_flag = True
Imports
import glob
import logging
from tempfile import TemporaryDirectory
import dask
import fsspec
import s3fs
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
Examining a Single NetCDF File
Before we use Kerchunk to create indices for multiple files, we can load a single NetCDF file to examine it.
# URL pointing to a single NetCDF file
url = "s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/00/WRFDETAR_01H_20221231_00_072.nc"
# Initialize an anonymous (public-access) S3 filesystem
fs = s3fs.S3FileSystem(anon=True)
# Use Xarray to open a remote NetCDF file
ds = xr.open_dataset(fs.open(url), engine="h5netcdf")
ds
<xarray.Dataset> Size: 35MB
Dimensions:            (time: 1, y: 1249, x: 999)
Coordinates:
  * time               (time) datetime64[ns] 8B 2023-01-03
  * x                  (x) float32 4kB -1.996e+06 -1.992e+06 ... 1.996e+06
  * y                  (y) float32 5kB -2.496e+06 -2.492e+06 ... 2.496e+06
    lat                (y, x) float32 5MB ...
    lon                (y, x) float32 5MB ...
Data variables:
    PP                 (time, y, x) float32 5MB ...
    T2                 (time, y, x) float32 5MB ...
    HR2                (time, y, x) float32 5MB ...
    dirViento10        (time, y, x) float32 5MB ...
    magViento10        (time, y, x) float32 5MB ...
    Lambert_Conformal  float32 4B ...
Attributes: (12/18)
    title:        Python PostProcessing for SMN WRF-ARW Deterministic SFC
    institution:  Servicio Meteorologico Nacional
    source:       OUTPUT FROM WRF V4.0 MODEL
    start_lat:    -54.386837
    start_lon:    -94.33081
    end_lat:      -11.645958
    ...           ...
    TRUELAT1:     -35.0
    TRUELAT2:     -35.0
    DX:           4000.0
    DY:           4000.0
    START_DATE:   2022-12-31_00:00:00
    Conventions:  CF-1.8
Here we see the repr of the Xarray Dataset for a single NetCDF file. From examining the output, we can tell that the Dataset dimensions are ['time','y','x'], with time being only a single step.
Later, when we use Kerchunk's MultiZarrToZarr functionality, we will need to know which dimensions to concatenate across.
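To make the role of the concatenation dimension concrete, here is a purely conceptual sketch. It is not Kerchunk's implementation. It assumes each file holds exactly one time step stored as a single chunk, and it uses a hypothetical bucket with made-up byte offsets and lengths. It shows how per-file chunk keys could be renumbered along time when the files are stacked in order.

```python
# Conceptual sketch only: hypothetical per-file references with made-up
# byte offsets and lengths. Each file holds one time step of variable T2.
per_file_refs = [
    {"T2/0.0.0": ["s3://bucket/file_000.nc", 1000, 500]},
    {"T2/0.0.0": ["s3://bucket/file_001.nc", 1000, 510]},
    {"T2/0.0.0": ["s3://bucket/file_002.nc", 1000, 495]},
]


def stack_along_first_dim(refs_list):
    """Renumber the leading (time) chunk index so files stack in order."""
    combined = {}
    for file_index, refs in enumerate(refs_list):
        for key, location in refs.items():
            var, chunk_id = key.split("/")
            indices = chunk_id.split(".")
            # The first index corresponds to the concatenation dimension.
            indices[0] = str(file_index)
            combined[f"{var}/{'.'.join(indices)}"] = location
    return combined


combined = stack_along_first_dim(per_file_refs)
for key in sorted(combined):
    print(key, "->", combined[key][0])
```

The y and x indices are left unchanged because every file covers the same grid; only the time index grows with the number of files.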
Create Input File List
Here we use fsspec's glob functionality along with the * wildcard operator and some string manipulation to build a list of NetCDF files from an S3 fsspec filesystem.
# Initiate fsspec filesystems for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
files_paths = fs_read.glob("s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/*")
# glob returns paths without the protocol, so we prepend the 's3://' prefix.
file_pattern = sorted(["s3://" + f for f in files_paths])
# If subset_flag is True (default), only the first 8 input files are kept
# to speed up the processing
if subset_flag:
    file_pattern = file_pattern[0:8]
# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the
# `foundations/kerchunk_basics` notebook.
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")
# We are creating a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
temp_dir
'/tmp/tmpq59p3bua'
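Sorting the file list lexicographically also sorts it by forecast lead time, because the lead hour in each file name is zero-padded. As a sketch of that reasoning, the helper below parses a file name into its initialization time and lead hour. The field layout (init date, init hour, lead hour as the last three underscore-separated fields) is an assumption inferred from the file names used in this notebook, and parse_wrf_name is a hypothetical helper, not part of any library. It requires Python 3.9+ for str.removesuffix.

```python
from datetime import datetime, timedelta


def parse_wrf_name(path):
    """Return (init_time, lead_hours, valid_time) for one file path.

    Assumes names look like WRFDETAR_01H_20221231_00_072.nc, where the
    last three fields are init date, init hour (UTC), and lead hour.
    """
    stem = path.rsplit("/", 1)[-1].removesuffix(".nc")
    date_str, init_hour, lead = stem.split("_")[-3:]
    init_time = datetime.strptime(date_str + init_hour, "%Y%m%d%H")
    lead_hours = int(lead)
    return init_time, lead_hours, init_time + timedelta(hours=lead_hours)


url = "s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/00/WRFDETAR_01H_20221231_00_072.nc"
init_time, lead_hours, valid_time = parse_wrf_name(url)
print(init_time, lead_hours, valid_time)
```

Under this assumed layout, the 72-hour file from the 00 UTC run on 2022-12-31 is valid at 2023-01-03 00:00, which agrees with the time coordinate shown when that file was opened earlier.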
Start a Dask Client
To parallelize the creation of our reference files, we will use Dask
. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: Kerchunk and Dask.
client = Client(n_workers=8, silence_logs=logging.ERROR)
client
# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from
# a NetCDF file.
def generate_json_reference(fil, output_dir: str):
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        # Drop only the literal ".nc" suffix (Python 3.9+); str.strip(".nc")
        # would instead strip any trailing '.', 'n', or 'c' characters.
        fname = fil.split("/")[-1].removesuffix(".nc")
        outf = f"{output_dir}/{fname}.json"
        with open(outf, "wb") as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
        return outf
# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in file_pattern]
# Start parallel processing
import warnings
warnings.filterwarnings("ignore")
dask.compute(tasks)
(['/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_000.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_001.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_002.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_003.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_004.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_005.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_006.json',
'/tmp/tmpq59p3bua/WRFDETAR_01H_20221231_12_007.json'],)
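The output names above come from dropping the .nc extension from each input name. One caveat is worth illustrating: str.strip(".nc") removes any leading or trailing '.', 'n', or 'c' characters rather than the literal suffix, so it can truncate some names. The sketch below contrasts it with str.removesuffix (Python 3.9+). The second file name is hypothetical and chosen only to expose the difference.

```python
names = [
    "WRFDETAR_01H_20221231_12_000.nc",  # file name from this notebook
    "ocean.nc",                         # hypothetical name, for illustration
]

results = {}
for name in names:
    stripped = name.strip(".nc")         # strips the characters '.', 'n', 'c'
    suffixed = name.removesuffix(".nc")  # removes only the literal suffix
    results[name] = (stripped, suffixed)
    print(f"{name!r}: strip -> {stripped!r}, removesuffix -> {suffixed!r}")
```

For the WRF file names both approaches agree, because the character before the extension is a digit. For the hypothetical name, strip also eats the final "n" of the stem.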
Combine .json Kerchunk reference files and write a combined Kerchunk index
In the following cell, we are combining all the .json reference files that were generated above into a single reference file and writing that file to disk.
# Create a list of reference json files
output_files = sorted(glob.glob(f"{temp_dir}/*.json"))
# Combine the individual references into a single consolidated reference
mzz = MultiZarrToZarr(
    output_files,
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_options={"anon": True},
)
# Keep the translated reference in memory for later visualization
multi_kerchunk = mzz.translate()
# Write the combined Kerchunk .json record to disk
output_fname = "ARG_combined.json"
with open(output_fname, "wb") as f:
    f.write(ujson.dumps(multi_kerchunk).encode())
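A combined reference is plain JSON, so it can be inspected with the standard library. The sketch below assumes the version 1 Kerchunk layout, in which a top-level "refs" mapping stores Zarr metadata as strings and chunk entries as [url, offset, length] lists. The small example dictionary is a hand-written stand-in with a hypothetical bucket and invented offsets and lengths, not real output, and summarize_refs is a hypothetical helper.

```python
from collections import Counter


def summarize_refs(reference):
    """Count chunk references per variable and collect the source URLs."""
    chunks_per_var = Counter()
    sources = set()
    for key, value in reference["refs"].items():
        # Chunk entries are [url, offset, length]; metadata and inlined
        # data are stored as strings and are skipped here.
        if isinstance(value, list) and len(value) == 3:
            chunks_per_var[key.split("/")[0]] += 1
            sources.add(value[0])
    return dict(chunks_per_var), sorted(sources)


# Hand-written stand-in for a combined reference (offsets/lengths invented,
# array metadata elided).
example = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "T2/.zarray": "{...}",
        "T2/0.0.0": ["s3://bucket/file_000.nc", 1000, 500],
        "T2/1.0.0": ["s3://bucket/file_001.nc", 1000, 510],
        "PP/0.0.0": ["s3://bucket/file_000.nc", 2000, 300],
    },
}

counts, sources = summarize_refs(example)
print(counts)
print(sources)
```

Assuming the in-memory multi_kerchunk dictionary follows the same layout, calling summarize_refs(multi_kerchunk) should give a quick check that every input file contributed chunks to the combined reference.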
Shut down the Dask cluster
client.shutdown()