
Store Kerchunk Reference Files as Parquet

Overview

In this notebook we will cover how to store Kerchunk references as Parquet files instead of JSON. For large reference datasets, using Parquet should improve performance: the overall reference store should be smaller on disk, and the memory overhead of combining the reference files should be lower.

This notebook builds upon the Kerchunk Basics, Multi-File Datasets with Kerchunk, and Kerchunk and Dask notebooks.

Prerequisites

| Concepts | Importance | Notes |
| --- | --- | --- |
| Kerchunk Basics | Required | Core |
| Multiple Files and Kerchunk | Required | Core |
| Introduction to Xarray | Recommended | IO/Visualization |
| Intro to Dask | Required | Parallel Processing |

  • Time to learn: 30 minutes


Imports

In addition to the imports used in the previous notebooks, we add:

  • LazyReferenceMapper and ReferenceFileSystem from fsspec.implementations.reference, for writing and reading Kerchunk references lazily as Parquet.

import logging
import os

import dask
import fsspec
import xarray as xr
from distributed import Client
from fsspec.implementations.reference import LazyReferenceMapper, ReferenceFileSystem
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr

Setting up the Dask Client

client = Client(n_workers=8, silence_logs=logging.ERROR)
client

Client: distributed.LocalCluster (Dashboard: http://127.0.0.1:8787/status)

Create Input File List

Here we use fsspec's glob functionality along with the * wildcard operator and some list slicing to grab a list of NetCDF files from an s3 fsspec filesystem.

# Initiate fsspec filesystems for reading
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)

files_paths = fs_read.glob("s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/*")

# Here we prepend the prefix 's3://', which points to AWS.
file_pattern = sorted(["s3://" + f for f in files_paths])

# Grab the first two files to speed up the example.
file_pattern = file_pattern[0:2]
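As a quick sanity check, we can print the selected paths (the exact file names depend on the bucket contents):

# Inspect the subset of input files
print(file_pattern)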

Generate Lazy References

Below we create an fsspec filesystem to read the NetCDF files from s3 and define a function that generates Kerchunk references, which we then wrap into Dask Delayed tasks.

# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk`
# index from a NetCDF file.
fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")


def generate_json_reference(fil):
    with fs_read.open(fil, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)
        return h5chunks.translate()


# Generate Dask Delayed objects
tasks = [dask.delayed(generate_json_reference)(fil) for fil in file_pattern]

Start the Dask Processing

You can watch the processing in real time on the Dask Dashboard, e.g. http://127.0.0.1:8787/status.

single_refs = dask.compute(tasks)[0]
len(single_refs)
2
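Each element of single_refs is an in-memory Kerchunk reference set: a dict with a "version" key and a "refs" mapping of chunk keys to byte ranges (or inlined data). As a rough sketch of what one looks like:

# Peek at the structure of one in-memory reference set
print(list(single_refs[0].keys()))  # expected: ['version', 'refs']
print(list(single_refs[0]["refs"])[:5])  # first few chunk keys (names depend on the file), e.g. '.zgroup', 'T2/.zarray'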

Combine In-Memory References with MultiZarrToZarr

This section will look notably different from the previous examples, which wrote the combined references to .json.

In the following code block we are:

  • Creating an fsspec filesystem.

  • Creating an empty Parquet store (directory) to write to.

  • Creating an fsspec LazyReferenceMapper to pass into MultiZarrToZarr.

  • Building a MultiZarrToZarr object of the combined references.

  • Calling .flush() on our LazyReferenceMapper to write the combined references to our Parquet store.

fs = fsspec.filesystem("file")

if os.path.exists("combined.parq"):
    import shutil

    shutil.rmtree("combined.parq")
os.makedirs("combined.parq")

out = LazyReferenceMapper.create(root="combined.parq", fs=fs, record_size=1000)

mzz = MultiZarrToZarr(
    single_refs,
    remote_protocol="s3",
    concat_dims=["time"],
    identical_dims=["y", "x"],
    remote_options={"anon": True},
    out=out,
).translate()

out.flush()
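The combined references now live on disk as a directory of Parquet files rather than a single JSON document. A quick listing shows what was written; the exact layout is an implementation detail of LazyReferenceMapper and may vary between fsspec versions:

# Walk the Parquet reference store and report file sizes (layout may vary by fsspec version)
for root, dirs, names in os.walk("combined.parq"):
    for name in names:
        path = os.path.join(root, name)
        print(path, os.path.getsize(path), "bytes")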

Shutdown the Dask cluster

client.shutdown()

Load kerchunked dataset

Next we initiate an fsspec ReferenceFileSystem. We need to pass:

  • The name of the Parquet store.

  • The remote protocol (the protocol of the input file URLs).

  • The target protocol ("file", since we saved our Parquet store locally).

fs = ReferenceFileSystem(
    "combined.parq",
    remote_protocol="s3",
    target_protocol="file",
    lazy=True,
    remote_options={"anon": True},
)
ds = xr.open_dataset(
    fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}
)
ds
<xarray.Dataset> Size: 70MB
Dimensions:            (time: 2, y: 1249, x: 999)
Coordinates:
    lat                (time, y, x) float32 10MB ...
    lon                (time, y, x) float32 10MB ...
  * time               (time) datetime64[ns] 16B 2022-12-31T12:00:00 2022-12-...
  * x                  (x) float32 4kB -1.996e+06 -1.992e+06 ... 1.996e+06
  * y                  (y) float32 5kB -2.496e+06 -2.492e+06 ... 2.496e+06
Data variables:
    HR2                (time, y, x) float32 10MB ...
    Lambert_Conformal  (time) float32 8B ...
    PP                 (time, y, x) float32 10MB ...
    T2                 (time, y, x) float32 10MB ...
    dirViento10        (time, y, x) float32 10MB ...
    magViento10        (time, y, x) float32 10MB ...
Attributes: (12/18)
    CEN_LAT:        -34.9999885559082
    CEN_LON:        -65.0
    Conventions:    CF-1.8
    DX:             4000.0
    DY:             4000.0
    MAP_PROJ:       1
    ...             ...
    end_lon:        -47.96868896484375
    institution:    Servicio Meteorologico Nacional
    source:          OUTPUT FROM WRF V4.0 MODEL
    start_lat:      -54.386837005615234
    start_lon:      -94.330810546875
    title:          Python PostProcessing for SMN WRF-ARW Deterministic SFC
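As a quick usage check, we can pull a small slice through the reference filesystem; only the chunks needed for the selection are fetched from s3. This is a sketch that assumes the T2 variable shown above and anonymous s3 access:

# Load a small subset of the T2 variable; only the required chunks are read
subset = ds["T2"].isel(time=0, y=slice(0, 10), x=slice(0, 10)).load()
print(subset.shape)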