Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Reproducing Key Figures from Kay et al. (2015)


Overview

This notebook demonstrates how one might use the NCAR Community Earth System Model (CESM) Large Ensemble (LENS) data hosted on AWS S3. The notebook shows how to reproduce figures 2 and 4 from the Kay et al. (2015) paper describing the CESM LENS dataset Kay et al., 2015.

This resource is intended to be helpful for people not familiar with elements of the Pangeo framework including Jupyter Notebooks, Xarray, and Zarr data format, or with the original paper, so it includes additional explanation.

Prerequisites

ConceptsImportanceNotes
Intro to XarrayNecessary
DaskHelpful
  • Time to learn: 30 minutes


NOTE: In this notebook, we access very large cloud-served datasets and use Dask to parallelize our workflow. The end-to-end execution time may be on the order of an hour or more, depending on your computing resources.

Imports

import sys
import warnings
warnings.filterwarnings("ignore")

import intake
import matplotlib.pyplot as plt
from dask.distributed import Client
import numpy as np
import pandas as pd
import xarray as xr
import cmaps  # for NCL colormaps
import cartopy.crs as ccrs
import dask
import s3fs
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})
<dask.config.set at 0x7f903404df10>

Create and Connect to Dask Distributed Cluster

Here we’ll use a dask cluster to parallelize our analysis.

platform = sys.platform

if (platform == 'win32'):
    import multiprocessing.popen_spawn_win32
else:
    import multiprocessing.popen_spawn_posix
client = Client()
client
Loading...

Load and Prepare Data

catalog_url = 'https://ncar-cesm-lens.s3-us-west-2.amazonaws.com/catalogs/aws-cesm1-le.json'
col = intake.open_esm_datastore(catalog_url)
col
Loading...

Show the first few lines of the catalog:

col.df.head(10)
Loading...

Show expanded version of collection structure with details:

col.keys_info().head()
Loading...

Extract data needed to construct Figure 2

Search the catalog to find the desired data, in this case the reference height temperature of the atmosphere, at daily time resolution, for the Historical, 20th Century, and RCP8.5 (IPCC Representative Concentration Pathway 8.5) experiments.

col_subset = col.search(frequency=["daily", "monthly"], component="atm", variable="TREFHT",
                        experiment=["20C", "RCP85", "HIST"])

col_subset
Loading...
col_subset.df
Loading...

Load catalog entries for subset into a dictionary of Xarray Datasets:

dsets = col_subset.to_dataset_dict(zarr_kwargs={"consolidated": True}, storage_options={"anon": True})
print(f"\nDataset dictionary keys:\n {dsets.keys()}")

--> The keys in the returned dictionary of datasets are constructed as follows:
	'component.experiment.frequency'
Loading...
Loading...

Dataset dictionary keys:
 dict_keys(['atm.RCP85.monthly', 'atm.HIST.monthly', 'atm.HIST.daily', 'atm.RCP85.daily', 'atm.20C.monthly', 'atm.20C.daily'])

Define Xarray Datasets corresponding to the three experiments:

ds_HIST = dsets['atm.HIST.monthly']
ds_20C = dsets['atm.20C.daily']
ds_RCP85 = dsets['atm.RCP85.daily']

Use the dask.distributed utility function to display size of each dataset:

from dask.utils import format_bytes
print(f"Historical: {format_bytes(ds_HIST.nbytes)}\n"
      f"20th Century: {format_bytes(ds_20C.nbytes)}\n"
      f"RCP8.5: {format_bytes(ds_RCP85.nbytes)}")
Historical: 177.21 MiB
20th Century: 258.65 GiB
RCP8.5: 285.71 GiB

Now, extract the Reference Height Temperature data variable:

t_hist = ds_HIST["TREFHT"]
t_20c = ds_20C["TREFHT"]
t_rcp = ds_RCP85["TREFHT"]
t_20c
Loading...

The global surface temperature anomaly was computed relative to the 1961-90 base period in the Kay et al. paper, so extract that time slice:

t_ref = t_20c.sel(time=slice("1961", "1990"))

Figure 2

Read grid cell areas

Cell size varies with latitude, so this must be accounted for when computing the global mean.

cat = col.search(frequency="static", component="atm", experiment=["20C"])
_, grid = cat.to_dataset_dict(aggregate=False, storage_options={'anon':True}, zarr_kwargs={"consolidated": True}).popitem()
grid

--> The keys in the returned dictionary of datasets are constructed as follows:
	'variable.long_name.component.experiment.frequency.vertical_levels.spatial_domain.units.start_time.end_time.path'
Loading...
Loading...
cell_area = grid.area.load()
total_area = cell_area.sum()
cell_area
Loading...

Define weighted means

Note: resample(time="AS") does an annual resampling based on start of calendar year. See documentation for Pandas resampling options.

t_ref_ts = (
    (t_ref.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
    / total_area
).mean(dim=("time", "member_id"))

t_hist_ts = (
    (t_hist.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
) / total_area

t_20c_ts = (
    (t_20c.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
) / total_area

t_rcp_ts = (
    (t_rcp.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
) / total_area

Read data and compute means

Dask’s “lazy execution” philosophy means that until this point we have not actually read the bulk of the data. Steps 1, 3, and 4 take a while to complete, so we include the Notebook “cell magic” directive %%time to display elapsed and CPU times after computation.

Step 1 (takes a while)

%%time
# this cell takes a while, be patient
t_ref_mean = t_ref_ts.load()
t_ref_mean
2026-06-07 04:20:05,503 - distributed.worker - ERROR - Compute Failed
Key:       ('open_dataset-TREFHT-757677e046c6059c913a7f48c3634800', 0, 44, 0, 0)
State:     executing
Task:  <Task ('open_dataset-TREFHT-757677e046c6059c913a7f48c3634800', 0, 44, 0, 0) getter(...)>
Exception: 'ClientPayloadError("Response payload is not completed: <ContentLengthError: 400, message=\'Not enough data to satisfy content length header.\'>. ConnectionResetError(104, \'Connection reset by peer\')")'
Traceback: '  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/dask/array/core.py", line 140, in getter\n    c = np.asarray(c)\n        ^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 686, in __array__\n    return np.asarray(self.get_duck_array(), dtype=dtype, copy=copy)\n                      ^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 691, in get_duck_array\n    return self.array.get_duck_array()\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 924, in get_duck_array\n    return self.array.get_duck_array()\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 764, in get_duck_array\n    array = self.array[self.key]\n            ~~~~~~~~~~^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 316, in __getitem__\n    return indexing.explicit_indexing_adapter(\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 1156, in explicit_indexing_adapter\n    result = raw_indexing_method(raw_key.tuple)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 279, in _getitem\n    return self._array[key]\n           ~~~~~~~~~~~^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 2832, in __getitem__\n    return self.get_orthogonal_selection(pure_selection, fields=fields)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 3303, in get_orthogonal_selection\n    return sync(\n           ^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py", line 159, in sync\n    raise return_result\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py", line 119, in _runner\n    return await coro\n           ^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 1584, in _get_selection\n    return await _get_selection(\n           ^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 5616, in _get_selection\n    await codec_pipeline.read(\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py", line 475, in read\n    await concurrent_map(\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 117, in concurrent_map\n    return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 115, in run\n    return await func(*item)\n           ^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py", line 272, in read_batch\n    chunk_bytes_batch = await concurrent_map(\n                        ^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 117, in concurrent_map\n    return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 115, in run\n    return await func(*item)\n           ^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_common.py", line 174, in get\n    return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_fsspec.py", line 289, in get\n    value = prototype.buffer.from_bytes(await self.fs._cat_file(path))\n                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py", line 1322, in _cat_file\n    return await self._cat_file_concurrent(\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py", line 1363, in _cat_file_concurrent\n    results = await asyncio.gather(\n              ^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py", line 1354, in _read_chunk\n    data = await resp["Body"].read()\n           ^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiobotocore/response.py", line 59, in read\n    chunk = await self.__wrapped__.content.read(\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py", line 446, in read\n    block = await self.readany()\n            ^^^^^^^^^^^^^^^^^^^^\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py", line 468, in readany\n    await self._wait("readany")\n  File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py", line 372, in _wait\n    await waiter\n'

CPU times: user 14.2 s, sys: 2.87 s, total: 17.1 s
Wall time: 3min 5s
---------------------------------------------------------------------------
ContentLengthError                        Traceback (most recent call last)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/client_proto.py:144, in connection_lost()
    143 try:
--> 144     uncompleted = self._parser.feed_eof()
    145 except Exception as underlying_exc:

File aiohttp/_http_parser.pyx:546
--> 546     def feed_eof(self):
    547         cdef bytes desc

ContentLengthError: 400, message:
  Not enough data to satisfy content length header.

The above exception was the direct cause of the following exception:

ClientPayloadError                        Traceback (most recent call last)
Cell In[18], line 1
----> 1 get_ipython().run_cell_magic('time', '', '# this cell takes a while, be patient\nt_ref_mean = t_ref_ts.load()\nt_ref_mean\n')

File <timed exec>:2
      1 'Could not get source, probably due dynamically evaluated source code.'

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/dataarray.py:1173, in DataArray.load(self, **kwargs)
   1143 def load(self, **kwargs) -> Self:
   1144     """Trigger loading data into memory and return this dataarray.
   1145 
   1146     Data will be computed and/or loaded from disk or a remote source.
   (...)   1171     Variable.load
   1172     """
-> 1173     ds = self._to_temp_dataset().load(**kwargs)
   1174     new = self._from_temp_dataset(ds)
   1175     self._variable = new._variable

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/dataset.py:569, in Dataset.load(self, **kwargs)
    565         if chunked_data:
    566             chunkmanager = get_chunked_array_type(*chunked_data.values())
    567 
    568             # evaluate all the chunked arrays simultaneously
--> 569             evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
    570                 *chunked_data.values(), **kwargs
    571             )
    572 

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:85, in DaskManager.compute(self, *data, **kwargs)
     80 def compute(
     81     self, *data: Any, **kwargs: Any
     82 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
     83     from dask.array import compute
---> 85     return compute(*data, **kwargs)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/dask/base.py:685, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    682     expr = expr.optimize()
    683     keys = list(flatten(expr.__dask_keys__()))
--> 685     results = schedule(expr, keys, **kwargs)
    687 return repack(results)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:686, in __array__()
    682 def __array__(
    683     self, dtype: DTypeLike | None = None, /, *, copy: bool | None = None
    684 ) -> np.ndarray:
    685     if Version(np.__version__) >= Version("2.0.0"):
--> 686         return np.asarray(self.get_duck_array(), dtype=dtype, copy=copy)
    687     else:
    688         return np.asarray(self.get_duck_array(), dtype=dtype)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:691, in get_duck_array()
    690 def get_duck_array(self):
--> 691     return self.array.get_duck_array()

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:924, in get_duck_array()
    923 def get_duck_array(self):
--> 924     return self.array.get_duck_array()

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:764, in get_duck_array()
    761 from xarray.backends.common import BackendArray
    763 if isinstance(self.array, BackendArray):
--> 764     array = self.array[self.key]
    765 else:
    766     array = apply_indexer(self.array, self.key)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py:316, in __getitem__()
    314 elif isinstance(key, indexing.OuterIndexer):
    315     method = self._oindex
--> 316 return indexing.explicit_indexing_adapter(
    317     key, array.shape, indexing.IndexingSupport.VECTORIZED, method
    318 )

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:1156, in explicit_indexing_adapter()
   1134 """Support explicit indexing by delegating to a raw indexing method.
   1135 
   1136 Outer and/or vectorized indexers are supported by indexing a second time
   (...)   1153 Indexing result, in the form of a duck numpy-array.
   1154 """
   1155 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
-> 1156 result = raw_indexing_method(raw_key.tuple)
   1157 if numpy_indices.tuple:
   1158     # index the loaded duck array
   1159     indexable = as_indexable(result)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py:279, in _getitem()
    278 def _getitem(self, key):
--> 279     return self._array[key]

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:2832, in __getitem__()
   2830     return self.vindex[cast("CoordinateSelection | MaskSelection", selection)]
   2831 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
-> 2832     return self.get_orthogonal_selection(pure_selection, fields=fields)
   2833 else:
   2834     return self.get_basic_selection(cast("BasicSelection", pure_selection), fields=fields)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:3303, in get_orthogonal_selection()
   3301     prototype = default_buffer_prototype()
   3302 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 3303 return sync(
   3304     self.async_array._get_selection(
   3305         indexer=indexer, out=out, fields=fields, prototype=prototype
   3306     )
   3307 )

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync()
    156 return_result = next(iter(finished)).result()
    158 if isinstance(return_result, BaseException):
--> 159     raise return_result
    160 else:
    161     return return_result

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner()
    114 """
    115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
    116 exception, the exception will be returned.
    117 """
    118 try:
--> 119     return await coro
    120 except Exception as ex:
    121     return ex

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:1584, in _get_selection()
   1576 async def _get_selection(
   1577     self,
   1578     indexer: Indexer,
   (...)   1582     fields: Fields | None = None,
   1583 ) -> NDArrayLikeOrScalar:
-> 1584     return await _get_selection(
   1585         self.store_path,
   1586         self.metadata,
   1587         self.codec_pipeline,
   1588         self.config,
   1589         indexer,
   1590         prototype=prototype,
   1591         out=out,
   1592         fields=fields,
   1593     )

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:5616, in _get_selection()
   5613         _config = replace(_config, order=order)
   5615     # reading chunks and decoding them
-> 5616     await codec_pipeline.read(
   5617         [
   5618             (
   5619                 store_path / metadata.encode_chunk_key(chunk_coords),
   5620                 metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype),
   5621                 chunk_selection,
   5622                 out_selection,
   5623                 is_complete_chunk,
   5624             )
   5625             for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer
   5626         ],
   5627         out_buffer,
   5628         drop_axes=indexer.drop_axes,
   5629     )
   5630 if isinstance(indexer, BasicIndexer) and indexer.shape == ():
   5631     return out_buffer.as_scalar()

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:475, in read()
    469 async def read(
    470     self,
    471     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
    472     out: NDBuffer,
    473     drop_axes: tuple[int, ...] = (),
    474 ) -> None:
--> 475     await concurrent_map(
    476         [
    477             (single_batch_info, out, drop_axes)
    478             for single_batch_info in batched(batch_info, self.batch_size)
    479         ],
    480         self.read_batch,
    481         config.get("async.concurrency"),
    482     )

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:117, in concurrent_map()
    114     async with sem:
    115         return await func(*item)
--> 117 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:115, in run()
    113 async def run(item: tuple[Any]) -> V:
    114     async with sem:
--> 115         return await func(*item)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:272, in read_batch()
    270             out[out_selection] = fill_value_or_default(chunk_spec)
    271 else:
--> 272     chunk_bytes_batch = await concurrent_map(
    273         [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
    274         lambda byte_getter, prototype: byte_getter.get(prototype),
    275         config.get("async.concurrency"),
    276     )
    277     chunk_array_batch = await self.decode_batch(
    278         [
    279             (chunk_bytes, chunk_spec)
   (...)    283         ],
    284     )
    285     for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip(
    286         chunk_array_batch, batch_info, strict=False
    287     ):

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:117, in concurrent_map()
    114     async with sem:
    115         return await func(*item)
--> 117 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:115, in run()
    113 async def run(item: tuple[Any]) -> V:
    114     async with sem:
--> 115         return await func(*item)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_common.py:174, in get()
    172 if prototype is None:
    173     prototype = default_buffer_prototype()
--> 174 return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_fsspec.py:289, in get()
    287 try:
    288     if byte_range is None:
--> 289         value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
    290     elif isinstance(byte_range, RangeByteRequest):
    291         value = prototype.buffer.from_bytes(
    292             await self.fs._cat_file(
    293                 path,
   (...)    296             )
    297         )

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py:1322, in _cat_file()
   1319     resp["Body"].close()
   1321     if content_length and content_length > chunksize:
-> 1322         return await self._cat_file_concurrent(
   1323             bucket,
   1324             key,
   1325             content_length,
   1326             chunksize,
   1327             max_concurrency=max_concurrency,
   1328             version_id=version_id or vers,
   1329         )
   1331 return await _error_wrapper(_call_and_read, retries=self.retries)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py:1363, in _cat_file_concurrent()
   1361 buf = bytearray(content_length)
   1362 for batch_start, batch_stop in zip(inds[:-1], inds[1:]):
-> 1363     results = await asyncio.gather(
   1364         *[
   1365             _read_chunk(start, end)
   1366             for start, end in ranges[batch_start:batch_stop]
   1367         ]
   1368     )
   1369     for offset, data in results:
   1370         buf[offset : offset + len(data)] = data

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py:1354, in _read_chunk()
   1346 kw["Range"] = f"bytes={start}-{end}"
   1347 resp = await self._call_s3(
   1348     "get_object",
   1349     Bucket=bucket,
   (...)   1352     **kw,
   1353 )
-> 1354 data = await resp["Body"].read()
   1355 resp["Body"].close()
   1356 return start, data

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiobotocore/response.py:59, in read()
     57 # botocore to aiohttp mapping
     58 try:
---> 59     chunk = await self.__wrapped__.content.read(
     60         amt if amt is not None else -1
     61     )
     62 except asyncio.TimeoutError as e:
     63     raise AioReadTimeoutError(
     64         endpoint_url=self.__wrapped__.url, error=e
     65     )

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py:446, in read()
    444 blocks = []
    445 while True:
--> 446     block = await self.readany()
    447     if not block:
    448         break

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py:468, in readany()
    464 # TODO: should be `if` instead of `while`
    465 # because waiter maybe triggered on chunk end,
    466 # without feeding any data
    467 while not self._buffer and not self._eof:
--> 468     await self._wait("readany")
    470 return self._read_nowait(-1)

File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py:372, in _wait()
    370 try:
    371     with self._timer:
--> 372         await waiter
    373 finally:
    374     self._waiter = None

ClientPayloadError: Response payload is not completed: <ContentLengthError: 400, message='Not enough data to satisfy content length header.'>. ConnectionResetError(104, 'Connection reset by peer')

Step 2 (executes quickly)

%%time 
t_hist_ts_df = t_hist_ts.to_series().T
#t_hist_ts_df.head()

Step 3 (takes even longer than Step 1)

%%time
t_20c_ts_df = t_20c_ts.to_series().unstack().T
t_20c_ts_df.head()

Step 4 (similar to Step 3 in its execution time)

%%time
# This also takes a while
t_rcp_ts_df = t_rcp_ts.to_series().unstack().T
t_rcp_ts_df.head()

Get observations for Figure 2 (HadCRUT4)

The HadCRUT4 temperature dataset is described by Morice et al. (2012).

Observational time series data for comparison with ensemble average:

obsDataURL = "https://www.esrl.noaa.gov/psd/thredds/dodsC/Datasets/cru/hadcrut4/air.mon.anom.median.nc"
ds = xr.open_dataset(obsDataURL).load()
ds
def weighted_temporal_mean(ds):
    """
    weight by days in each month
    """
    time_bound_diff = ds.time_bnds.diff(dim="nbnds")[:, 0]
    wgts = time_bound_diff.groupby("time.year") / time_bound_diff.groupby(
        "time.year"
    ).sum(xr.ALL_DIMS)
    obs = ds["air"]
    cond = obs.isnull()
    ones = xr.where(cond, 0.0, 1.0)
    obs_sum = (obs * wgts).resample(time="AS").sum(dim="time")
    ones_out = (ones * wgts).resample(time="AS").sum(dim="time")
    obs_s = (obs_sum / ones_out).mean(("lat", "lon")).to_series()
    return obs_s

Limit observations to 20th century:

obs_s = weighted_temporal_mean(ds)
obs_s = obs_s['1920':]
obs_s.head()
all_ts_anom = pd.concat([t_20c_ts_df, t_rcp_ts_df]) - t_ref_mean.data
years = [val.year for val in all_ts_anom.index]
obs_years = [val.year for val in obs_s.index]

Combine ensemble member 1 data from historical and 20th century experiments:

hist_anom = t_hist_ts_df - t_ref_mean.data
member1 = pd.concat([hist_anom.iloc[:-2], all_ts_anom.iloc[:,0]], verify_integrity=True)
member1_years = [val.year for val in member1.index]

Plotting Figure 2

Global surface temperature anomaly (1961-90 base period) for individual ensemble members, and observations:

ax = plt.axes()

ax.tick_params(right=True, top=True, direction="out", length=6, width=2, grid_alpha=0.5)
ax.plot(years, all_ts_anom.iloc[:,1:], color="grey")
ax.plot(obs_years, obs_s['1920':], color="red")
ax.plot(member1_years, member1, color="black")


ax.text(
    0.35,
    0.4,
    "observations",
    verticalalignment="bottom",
    horizontalalignment="left",
    transform=ax.transAxes,
    color="red",
    fontsize=10,
)
ax.text(
    0.35,
    0.33,
    "members 2-40",
    verticalalignment="bottom",
    horizontalalignment="left",
    transform=ax.transAxes,
    color="grey",
    fontsize=10,
)
ax.text(
    0.05,
    0.2,
    "member 1",
    verticalalignment="bottom",
    horizontalalignment="left",
    transform=ax.transAxes,
    color="black",
    fontsize=10,
)

ax.set_xticks([1850, 1920, 1950, 2000, 2050, 2100])
plt.ylim(-1, 5)
plt.xlim(1850, 2100)
plt.ylabel("Global Surface\nTemperature Anomaly (K)")
plt.show()

Figure 4

Compute linear trend for winter seasons

def linear_trend(da, dim="time"):
    da_chunk = da.chunk({dim: -1})
    trend = xr.apply_ufunc(
        calc_slope,
        da_chunk,
        vectorize=True,
        input_core_dims=[[dim]],
        output_core_dims=[[]],
        output_dtypes=[np.float64],
        dask="parallelized",
    )
    return trend


def calc_slope(y):
    """ufunc to be used by linear_trend"""
    x = np.arange(len(y))

    # drop missing values (NaNs) from x and y
    finite_indexes = ~np.isnan(y)
    slope = np.nan if (np.sum(finite_indexes) < 2) else np.polyfit(x[finite_indexes], y[finite_indexes], 1)[0]
    return slope
%%time 
# Takes several minutes
t = xr.concat([t_20c, t_rcp], dim="time")
seasons = t.sel(time=slice("1979", "2012")).resample(time="QS-DEC").mean("time")
# Include only full seasons from 1979 and 2012
seasons = seasons.sel(time=slice("1979", "2012")).load()
winter_seasons = seasons.sel(
    time=seasons.time.where(seasons.time.dt.month == 12, drop=True)
)
winter_trends = linear_trend(
    winter_seasons.chunk({"lat": 20, "lon": 20, "time": -1})
).load() * len(winter_seasons.time)

# Compute ensemble mean from the first 30 members
winter_trends_mean = winter_trends.isel(member_id=range(30)).mean(dim='member_id')

Make sure that we have 34 seasons:

assert len(winter_seasons.time) == 34

Get observations for Figure 4 (NASA GISS GisTemp)

This is observational time series data for comparison with ensemble average. Here we are using the GISS Surface Temperature Analysis (GISTEMP v4) from NASA’s Goddard Institute of Space Studies Lenssen et al., 2019.

Define the URL to Project Pythia’s Jetstream2 Object Store and the path to the Zarr file.

URL = 'https://js2.jetstream-cloud.org:8001'
filePath = 's3://pythia/gistemp1200_GHCNv4_ERSSTv5.zarr'

Create a container for the S3 file system

fs = s3fs.S3FileSystem(anon=True, client_kwargs=dict(endpoint_url=URL))

Link to the Zarr file as it exists on the S3 object store

store = s3fs.S3Map(root=filePath, s3=fs, check=False )
ds = xr.open_zarr(store, consolidated=True, chunks="auto")
ds

Create an Xarray Dataset from the Zarr object

Remap longitude range from [-180, 180] to [0, 360] for plotting purposes:

ds = ds.assign_coords(lon=((ds.lon + 360) % 360)).sortby('lon')
ds

Include only full seasons from 1979 through 2012:

obs_seasons = ds.sel(time=slice("1979", "2012")).resample(time="QS-DEC").mean("time")
obs_seasons = obs_seasons.sel(time=slice("1979", "2012")).load()
obs_winter_seasons = obs_seasons.sel(
    time=obs_seasons.time.where(obs_seasons.time.dt.month == 12, drop=True)
)
obs_winter_seasons

And compute observed winter trends:

obs_winter_trends = linear_trend(
    obs_winter_seasons.chunk({"lat": 20, "lon": 20, "time": -1})
).load() * len(obs_winter_seasons.time)
obs_winter_trends

Plotting Figure 4

Global maps of historical (1979 - 2012) boreal winter (DJF) surface air trends:

contour_levels = [-6, -5, -4, -3, -2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2, 3, 4, 5, 6]
color_map = cmaps.ncl_default
def make_map_plot(nplot_rows, nplot_cols, plot_index, data, plot_label):
    """ Create a single map subplot. """
    ax = plt.subplot(nplot_rows, nplot_cols, plot_index, projection = ccrs.Robinson(central_longitude = 180))
    cplot = plt.contourf(lons, lats, data,
                         levels = contour_levels,
                         cmap = color_map,
                         extend = 'both',
                         transform = ccrs.PlateCarree())
    ax.coastlines(color = 'grey')
    ax.text(0.01, 0.01, plot_label, fontsize = 14, transform = ax.transAxes)
    return cplot, ax
%%time
# Generate plot (may take a while as many individual maps are generated)
numPlotRows = 8
numPlotCols = 4
figWidth = 20
figHeight = 30

fig, axs = plt.subplots(numPlotRows, numPlotCols, figsize=(figWidth,figHeight))

lats = winter_trends.lat
lons = winter_trends.lon

# Create ensemble member plots
for ensemble_index in range(30):
    plot_data = winter_trends.isel(member_id = ensemble_index)
    plot_index = ensemble_index + 1
    plot_label = str(plot_index)
    plotRow = ensemble_index // numPlotCols
    plotCol = ensemble_index % numPlotCols
    # Retain axes objects for figure colorbar
    cplot, axs[plotRow, plotCol] = make_map_plot(numPlotRows, numPlotCols, plot_index, plot_data, plot_label)

# Create plots for the ensemble mean, observations, and a figure color bar.
cplot, axs[7,2] = make_map_plot(numPlotRows, numPlotCols, 31, winter_trends_mean, 'EM')

lats = obs_winter_trends.lat
lons = obs_winter_trends.lon
cplot, axs[7,3] = make_map_plot(numPlotRows, numPlotCols, 32, obs_winter_trends.tempanomaly, 'OBS')

cbar = fig.colorbar(cplot, ax=axs, orientation='horizontal', shrink = 0.7, pad = 0.02)
cbar.ax.set_title('1979-2012 DJF surface air temperature trends (K/34 years)', fontsize = 16)
cbar.set_ticks(contour_levels)
cbar.set_ticklabels(contour_levels)

Close our client:

client.close()

Summary

In this notebook, we used CESM LENS data hosted on AWS to recreate two key figures in the paper that describes the project.

What’s next?

More example workflows using these datasets may be added in the future.

References
  1. Kay, J. E., Deser, C., Phillips, A., Mai, A., Hannay, C., Strand, G., Arblaster, J. M., Bates, S. C., Danabasoglu, G., Edwards, J., Holland, M., Kushner, P., Lamarque, J.-F., Lawrence, D., Lindsday, K., Middleton, A., Munoz, E., Neale, R., Oleson, K., … Vertenstein, M. (2015). The Community Earth System Model (CESM) Large Ensemble Project. Bull. Amer. Meteor. Soc. 10.1175/BAMS-D-13-00255.1
  2. Morice, C. P., Kennedy, J. J., Rayner, N. A., & Jones, P. D. (2012). Quantifying uncertainties in global and regional temperature change using an ensemble of observational estimates: The HadCRUT4 data set. J. Geophys. Res. Atmos., 117(D8). 10.1029/2011JD017187
  3. Lenssen, N., Schmidt, G., Hansen, J., Menne, M., Persin, A., Ruedy, R., & Zyss, D. (2019). Improvements in the GISTEMP uncertainty model. Journal of Geophysical Research: Atmospheres, 124(12), 6307–6326. 10.1029/2018JD029522