GRIB2
Generating Kerchunk References from GRIB2 files
Overview
Within this notebook, we will cover:
- Generating a list of GRIB2 files on a remote filesystem using fsspec
- How to create reference files of GRIB2 files using Kerchunk
- Combining multiple Kerchunk reference files using MultiZarrToZarr
This notebook shares many similarities with the Multi-File Datasets with Kerchunk and NetCDF/HDF5 Argentinian Weather Dataset Case Study notebooks; however, this case study examines another data format and uses kerchunk.scan_grib to create reference files.
This notebook borrows heavily from this GIST created by Peter Marsh.
Prerequisites
| Concepts | Importance | Notes |
| --- | --- | --- |
|  | Required | Core |
|  | Required | Core |
|  | Required | Core |
|  | Required | IO/Visualization |
Time to learn: 45 minutes
Motivation
Kerchunk supports multiple input file formats. One of these is GRIB2 (GRIdded Information in Binary form), a binary file format primarily used for meteorology and weather datasets. Similar to NetCDF/HDF5, GRIB2 does not support efficient, parallel access. Using Kerchunk, we can read this legacy format as if it were an ARCO (Analysis-Ready, Cloud-Optimized) data format such as Zarr.
About the Dataset
The HRRR (High-Resolution Rapid Refresh) is a NOAA real-time, 3-km resolution, hourly updated, cloud-resolving, convection-allowing atmospheric model, initialized by 3-km grids with 3-km radar assimilation. Radar data are assimilated into the HRRR every 15 minutes over a 1-hour period, adding further detail to that provided by the hourly data assimilation from the 13-km radar-enhanced Rapid Refresh.
NOAA releases a copy of this dataset via the AWS Registry of Open Data.
Flags
In the cell below, set the subset_flag variable to True (default) or False, depending on whether you want this notebook to process the full file list. If set to True, only a subset of the file list will be processed (recommended).
subset_flag = True
Imports
import glob
import logging
from tempfile import TemporaryDirectory
import dask
import fsspec
import ujson
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.grib2 import scan_grib
Create Input File List
Here we create an fsspec filesystem for reading the remote files. Next, we use its glob method to retrieve a list of GRIB2 file paths and append the s3:// prefix to them.
# Initiate an fsspec filesystem for anonymous reading of remote files
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
# retrieve list of available days in archive
days_available = fs_read.glob("s3://noaa-hrrr-bdp-pds/hrrr.*")
# Read HRRR GRIB2 files from April 19, 2023
files = fs_read.glob("s3://noaa-hrrr-bdp-pds/hrrr.20230419/conus/*wrfsfcf01.grib2")
# Append s3 prefix for filelist
files = sorted(["s3://" + f for f in files])
# If the subset_flag == True (default), the list of input files will be subset to
# speed up the processing
if subset_flag:
files = files[0:2]
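If you want to double-check the subsetting, a quick sanity check like the one below (not strictly needed for the workflow) prints how many files will be processed and what the resulting S3 paths look like.
# Optional sanity check: number of files to process and an example S3 path
print(f"{len(files)} GRIB2 files selected")
print(files[0])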
Start a Dask Client
To parallelize the creation of our reference files, we will use Dask. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: Kerchunk and Dask.
client = Client(n_workers=8, silence_logs=logging.ERROR)
client
Client: Client-55b6e1bd-a04f-11ef-8be3-000d3ad367d5
Cluster: LocalCluster (8 workers, 8 total threads, 15.61 GiB total memory)
Dashboard: http://127.0.0.1:8787/status
Iterate through the list of files and create Kerchunk indices as .json reference files
Each input GRIB2 file contains multiple "messages", each a measure of some variable on a grid, but with grid dimensions not necessarily compatible with one another. The filter we create in the first line selects only certain types of messages and indicates that heightAboveGround will be a coordinate of interest.
We also write a separate JSON for each of the selected messages, since these are the basic component datasets (see the loop over out).
Note: scan_grib does not require a filter and will happily create a reference for each available GRIB message. However, when combining the GRIB messages using MultiZarrToZarr, it is necessary for the messages to share a coordinate system. Thus, to make our lives easier and ensure all reference outputs from scan_grib share a coordinate system, we pass a filter argument.
afilter = {"typeOfLevel": "heightAboveGround", "level": [2, 10]}
so = {"anon": True}
# We are creating a temporary directory to store the .json reference files
# Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name
temp_dir
'/tmp/tmp5bn7xe11'
def make_json_name(
file_url, message_number
): # create a unique name for each reference file
date = file_url.split("/")[3].split(".")[1]
name = file_url.split("/")[5].split(".")[1:3]
return f"{temp_dir}/{date}_{name[0]}_{name[1]}_message{message_number}.json"
def gen_json(file_url):
out = scan_grib(
file_url, storage_options=so, inline_threshold=100, filter=afilter
) # create the reference using scan_grib
for i, message in enumerate(
out
): # scan_grib outputs a list containing one reference per grib message
out_file_name = make_json_name(file_url, i) # get name
with open(out_file_name, "w") as f:
f.write(ujson.dumps(message)) # write to file
# Generate Dask Delayed objects
tasks = [dask.delayed(gen_json)(fil) for fil in files]
# Start parallel processing
import warnings
warnings.filterwarnings("ignore")
dask.compute(tasks)
([None, None],)
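Before combining, it can help to peek inside one of the reference files scan_grib produced. A Kerchunk reference is simply a JSON document whose refs mapping points Zarr keys either to inlined data or to byte ranges within the original GRIB2 file. The sketch below is a minimal inspection that reuses the glob and ujson imports from above.
# Peek at the structure of one per-message reference file
sample_ref = sorted(glob.glob(f"{temp_dir}/*.json"))[0]
with open(sample_ref) as f:
    ref = ujson.load(f)
print(list(ref))               # top-level keys, typically ['version', 'refs']
print(list(ref["refs"])[:10])  # a few of the Zarr keys stored in the reference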
Combine Kerchunk reference .json files
We know that four coordinates (latitude, longitude, heightAboveGround, and step) are identical for every one of our component datasets: they are not functions of valid_time.
# Create a list of reference json files
output_files = glob.glob(f"{temp_dir}/*.json")
# Combine individual references into single consolidated reference
mzz = MultiZarrToZarr(
output_files,
concat_dims=["valid_time"],
identical_dims=["latitude", "longitude", "heightAboveGround", "step"],
)
multi_kerchunk = mzz.translate()
Write combined Kerchunk reference file to .json
# Write Kerchunk .json record
output_fname = "HRRR_combined.json"
with open(f"{output_fname}", "wb") as f:
f.write(ujson.dumps(multi_kerchunk).encode())
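To verify the result, the combined reference can be opened with xarray through fsspec's reference filesystem. The sketch below assumes xarray and zarr are installed (they are not among this notebook's imports); remote_protocol and remote_options tell fsspec how to reach the original GRIB2 bytes on S3 that the reference points to.
import xarray as xr
# Open the combined reference as a virtual Zarr store
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": "HRRR_combined.json",
            "remote_protocol": "s3",
            "remote_options": {"anon": True},
        },
    },
)
print(ds)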
Shut down the Dask cluster
client.shutdown()