Overview¶
This notebook demonstrates how one might use the NCAR Community Earth System Model (CESM) Large Ensemble (LENS) data hosted on AWS S3. The notebook shows how to reproduce figures 2 and 4 from the Kay et al. (2015) paper describing the CESM LENS dataset Kay et al., 2015.
This resource is intended to be helpful for people not familiar with elements of the Pangeo framework including Jupyter Notebooks, Xarray, and Zarr data format, or with the original paper, so it includes additional explanation.
Imports¶
import sys
import warnings
warnings.filterwarnings("ignore")
import intake
import matplotlib.pyplot as plt
from dask.distributed import Client
import numpy as np
import pandas as pd
import xarray as xr
import cmaps # for NCL colormaps
import cartopy.crs as ccrs
import dask
import s3fsdask.config.set({"distributed.scheduler.worker-saturation": 1.0})<dask.config.set at 0x7f903404df10>Create and Connect to Dask Distributed Cluster¶
Here we’ll use a dask cluster to parallelize our analysis.
platform = sys.platform
if (platform == 'win32'):
import multiprocessing.popen_spawn_win32
else:
import multiprocessing.popen_spawn_posixclient = Client()
clientLoad and Prepare Data¶
catalog_url = 'https://ncar-cesm-lens.s3-us-west-2.amazonaws.com/catalogs/aws-cesm1-le.json'
col = intake.open_esm_datastore(catalog_url)
colShow the first few lines of the catalog:
col.df.head(10)Show expanded version of collection structure with details:
col.keys_info().head()Extract data needed to construct Figure 2¶
Search the catalog to find the desired data, in this case the reference height temperature of the atmosphere, at daily time resolution, for the Historical, 20th Century, and RCP8.5 (IPCC Representative Concentration Pathway 8.5) experiments.
col_subset = col.search(frequency=["daily", "monthly"], component="atm", variable="TREFHT",
experiment=["20C", "RCP85", "HIST"])
col_subsetcol_subset.dfLoad catalog entries for subset into a dictionary of Xarray Datasets:
dsets = col_subset.to_dataset_dict(zarr_kwargs={"consolidated": True}, storage_options={"anon": True})
print(f"\nDataset dictionary keys:\n {dsets.keys()}")
--> The keys in the returned dictionary of datasets are constructed as follows:
'component.experiment.frequency'
Dataset dictionary keys:
dict_keys(['atm.RCP85.monthly', 'atm.HIST.monthly', 'atm.HIST.daily', 'atm.RCP85.daily', 'atm.20C.monthly', 'atm.20C.daily'])
Define Xarray Datasets corresponding to the three experiments:
ds_HIST = dsets['atm.HIST.monthly']
ds_20C = dsets['atm.20C.daily']
ds_RCP85 = dsets['atm.RCP85.daily']Use the dask.distributed utility function to display size of each dataset:
from dask.utils import format_bytes
print(f"Historical: {format_bytes(ds_HIST.nbytes)}\n"
f"20th Century: {format_bytes(ds_20C.nbytes)}\n"
f"RCP8.5: {format_bytes(ds_RCP85.nbytes)}")Historical: 177.21 MiB
20th Century: 258.65 GiB
RCP8.5: 285.71 GiB
Now, extract the Reference Height Temperature data variable:
t_hist = ds_HIST["TREFHT"]
t_20c = ds_20C["TREFHT"]
t_rcp = ds_RCP85["TREFHT"]
t_20cThe global surface temperature anomaly was computed relative to the 1961-90 base period in the Kay et al. paper, so extract that time slice:
t_ref = t_20c.sel(time=slice("1961", "1990"))Figure 2¶
Read grid cell areas¶
Cell size varies with latitude, so this must be accounted for when computing the global mean.
cat = col.search(frequency="static", component="atm", experiment=["20C"])
_, grid = cat.to_dataset_dict(aggregate=False, storage_options={'anon':True}, zarr_kwargs={"consolidated": True}).popitem()
grid
--> The keys in the returned dictionary of datasets are constructed as follows:
'variable.long_name.component.experiment.frequency.vertical_levels.spatial_domain.units.start_time.end_time.path'
cell_area = grid.area.load()
total_area = cell_area.sum()
cell_areaDefine weighted means¶
Note: resample(time="AS") does an annual resampling based on start of calendar year. See documentation for Pandas resampling options.
t_ref_ts = (
(t_ref.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
/ total_area
).mean(dim=("time", "member_id"))
t_hist_ts = (
(t_hist.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
) / total_area
t_20c_ts = (
(t_20c.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
) / total_area
t_rcp_ts = (
(t_rcp.resample(time="AS").mean("time") * cell_area).sum(dim=("lat", "lon"))
) / total_areaRead data and compute means¶
Dask’s “lazy execution” philosophy means that until this point we have not actually read the bulk of the data. Steps 1, 3, and 4 take a while to complete, so we include the Notebook “cell magic” directive %%time to display elapsed and CPU times after computation.
Step 1 (takes a while)
%%time
# this cell takes a while, be patient
t_ref_mean = t_ref_ts.load()
t_ref_mean2026-06-07 04:20:05,503 - distributed.worker - ERROR - Compute Failed
Key: ('open_dataset-TREFHT-757677e046c6059c913a7f48c3634800', 0, 44, 0, 0)
State: executing
Task: <Task ('open_dataset-TREFHT-757677e046c6059c913a7f48c3634800', 0, 44, 0, 0) getter(...)>
Exception: 'ClientPayloadError("Response payload is not completed: <ContentLengthError: 400, message=\'Not enough data to satisfy content length header.\'>. ConnectionResetError(104, \'Connection reset by peer\')")'
Traceback: ' File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/dask/array/core.py", line 140, in getter\n c = np.asarray(c)\n ^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 686, in __array__\n return np.asarray(self.get_duck_array(), dtype=dtype, copy=copy)\n ^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 691, in get_duck_array\n return self.array.get_duck_array()\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 924, in get_duck_array\n return self.array.get_duck_array()\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 764, in get_duck_array\n array = self.array[self.key]\n ~~~~~~~~~~^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 316, in __getitem__\n return indexing.explicit_indexing_adapter(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py", line 1156, in explicit_indexing_adapter\n result = raw_indexing_method(raw_key.tuple)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py", line 279, in _getitem\n return self._array[key]\n ~~~~~~~~~~~^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 2832, in __getitem__\n return self.get_orthogonal_selection(pure_selection, fields=fields)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 3303, in get_orthogonal_selection\n return sync(\n ^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py", line 159, in sync\n raise return_result\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py", line 119, in _runner\n return await coro\n ^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 1584, in _get_selection\n return await _get_selection(\n ^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py", line 5616, in _get_selection\n await codec_pipeline.read(\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py", line 475, in read\n await concurrent_map(\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 117, in concurrent_map\n return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 115, in run\n return await func(*item)\n ^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py", line 272, in read_batch\n chunk_bytes_batch = await concurrent_map(\n ^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 117, in concurrent_map\n return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py", line 115, in run\n return await func(*item)\n ^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_common.py", line 174, in get\n return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_fsspec.py", line 289, in get\n value = prototype.buffer.from_bytes(await self.fs._cat_file(path))\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py", line 1322, in _cat_file\n return await self._cat_file_concurrent(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py", line 1363, in _cat_file_concurrent\n results = await asyncio.gather(\n ^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py", line 1354, in _read_chunk\n data = await resp["Body"].read()\n ^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiobotocore/response.py", line 59, in read\n chunk = await self.__wrapped__.content.read(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py", line 446, in read\n block = await self.readany()\n ^^^^^^^^^^^^^^^^^^^^\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py", line 468, in readany\n await self._wait("readany")\n File "/home/runner/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py", line 372, in _wait\n await waiter\n'
CPU times: user 14.2 s, sys: 2.87 s, total: 17.1 s
Wall time: 3min 5s
---------------------------------------------------------------------------
ContentLengthError Traceback (most recent call last)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/client_proto.py:144, in connection_lost()
143 try:
--> 144 uncompleted = self._parser.feed_eof()
145 except Exception as underlying_exc:
File aiohttp/_http_parser.pyx:546
--> 546 def feed_eof(self):
547 cdef bytes desc
ContentLengthError: 400, message:
Not enough data to satisfy content length header.
The above exception was the direct cause of the following exception:
ClientPayloadError Traceback (most recent call last)
Cell In[18], line 1
----> 1 get_ipython().run_cell_magic('time', '', '# this cell takes a while, be patient\nt_ref_mean = t_ref_ts.load()\nt_ref_mean\n')
File <timed exec>:2
1 'Could not get source, probably due dynamically evaluated source code.'
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/dataarray.py:1173, in DataArray.load(self, **kwargs)
1143 def load(self, **kwargs) -> Self:
1144 """Trigger loading data into memory and return this dataarray.
1145
1146 Data will be computed and/or loaded from disk or a remote source.
(...) 1171 Variable.load
1172 """
-> 1173 ds = self._to_temp_dataset().load(**kwargs)
1174 new = self._from_temp_dataset(ds)
1175 self._variable = new._variable
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/dataset.py:569, in Dataset.load(self, **kwargs)
565 if chunked_data:
566 chunkmanager = get_chunked_array_type(*chunked_data.values())
567
568 # evaluate all the chunked arrays simultaneously
--> 569 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
570 *chunked_data.values(), **kwargs
571 )
572
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:85, in DaskManager.compute(self, *data, **kwargs)
80 def compute(
81 self, *data: Any, **kwargs: Any
82 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
83 from dask.array import compute
---> 85 return compute(*data, **kwargs)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/dask/base.py:685, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
682 expr = expr.optimize()
683 keys = list(flatten(expr.__dask_keys__()))
--> 685 results = schedule(expr, keys, **kwargs)
687 return repack(results)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:686, in __array__()
682 def __array__(
683 self, dtype: DTypeLike | None = None, /, *, copy: bool | None = None
684 ) -> np.ndarray:
685 if Version(np.__version__) >= Version("2.0.0"):
--> 686 return np.asarray(self.get_duck_array(), dtype=dtype, copy=copy)
687 else:
688 return np.asarray(self.get_duck_array(), dtype=dtype)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:691, in get_duck_array()
690 def get_duck_array(self):
--> 691 return self.array.get_duck_array()
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:924, in get_duck_array()
923 def get_duck_array(self):
--> 924 return self.array.get_duck_array()
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:764, in get_duck_array()
761 from xarray.backends.common import BackendArray
763 if isinstance(self.array, BackendArray):
--> 764 array = self.array[self.key]
765 else:
766 array = apply_indexer(self.array, self.key)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py:316, in __getitem__()
314 elif isinstance(key, indexing.OuterIndexer):
315 method = self._oindex
--> 316 return indexing.explicit_indexing_adapter(
317 key, array.shape, indexing.IndexingSupport.VECTORIZED, method
318 )
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/core/indexing.py:1156, in explicit_indexing_adapter()
1134 """Support explicit indexing by delegating to a raw indexing method.
1135
1136 Outer and/or vectorized indexers are supported by indexing a second time
(...) 1153 Indexing result, in the form of a duck numpy-array.
1154 """
1155 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
-> 1156 result = raw_indexing_method(raw_key.tuple)
1157 if numpy_indices.tuple:
1158 # index the loaded duck array
1159 indexable = as_indexable(result)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/xarray/backends/zarr.py:279, in _getitem()
278 def _getitem(self, key):
--> 279 return self._array[key]
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:2832, in __getitem__()
2830 return self.vindex[cast("CoordinateSelection | MaskSelection", selection)]
2831 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
-> 2832 return self.get_orthogonal_selection(pure_selection, fields=fields)
2833 else:
2834 return self.get_basic_selection(cast("BasicSelection", pure_selection), fields=fields)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:3303, in get_orthogonal_selection()
3301 prototype = default_buffer_prototype()
3302 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 3303 return sync(
3304 self.async_array._get_selection(
3305 indexer=indexer, out=out, fields=fields, prototype=prototype
3306 )
3307 )
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync()
156 return_result = next(iter(finished)).result()
158 if isinstance(return_result, BaseException):
--> 159 raise return_result
160 else:
161 return return_result
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner()
114 """
115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
116 exception, the exception will be returned.
117 """
118 try:
--> 119 return await coro
120 except Exception as ex:
121 return ex
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:1584, in _get_selection()
1576 async def _get_selection(
1577 self,
1578 indexer: Indexer,
(...) 1582 fields: Fields | None = None,
1583 ) -> NDArrayLikeOrScalar:
-> 1584 return await _get_selection(
1585 self.store_path,
1586 self.metadata,
1587 self.codec_pipeline,
1588 self.config,
1589 indexer,
1590 prototype=prototype,
1591 out=out,
1592 fields=fields,
1593 )
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/array.py:5616, in _get_selection()
5613 _config = replace(_config, order=order)
5615 # reading chunks and decoding them
-> 5616 await codec_pipeline.read(
5617 [
5618 (
5619 store_path / metadata.encode_chunk_key(chunk_coords),
5620 metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype),
5621 chunk_selection,
5622 out_selection,
5623 is_complete_chunk,
5624 )
5625 for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer
5626 ],
5627 out_buffer,
5628 drop_axes=indexer.drop_axes,
5629 )
5630 if isinstance(indexer, BasicIndexer) and indexer.shape == ():
5631 return out_buffer.as_scalar()
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:475, in read()
469 async def read(
470 self,
471 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
472 out: NDBuffer,
473 drop_axes: tuple[int, ...] = (),
474 ) -> None:
--> 475 await concurrent_map(
476 [
477 (single_batch_info, out, drop_axes)
478 for single_batch_info in batched(batch_info, self.batch_size)
479 ],
480 self.read_batch,
481 config.get("async.concurrency"),
482 )
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:117, in concurrent_map()
114 async with sem:
115 return await func(*item)
--> 117 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:115, in run()
113 async def run(item: tuple[Any]) -> V:
114 async with sem:
--> 115 return await func(*item)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:272, in read_batch()
270 out[out_selection] = fill_value_or_default(chunk_spec)
271 else:
--> 272 chunk_bytes_batch = await concurrent_map(
273 [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
274 lambda byte_getter, prototype: byte_getter.get(prototype),
275 config.get("async.concurrency"),
276 )
277 chunk_array_batch = await self.decode_batch(
278 [
279 (chunk_bytes, chunk_spec)
(...) 283 ],
284 )
285 for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip(
286 chunk_array_batch, batch_info, strict=False
287 ):
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:117, in concurrent_map()
114 async with sem:
115 return await func(*item)
--> 117 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/core/common.py:115, in run()
113 async def run(item: tuple[Any]) -> V:
114 async with sem:
--> 115 return await func(*item)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_common.py:174, in get()
172 if prototype is None:
173 prototype = default_buffer_prototype()
--> 174 return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/zarr/storage/_fsspec.py:289, in get()
287 try:
288 if byte_range is None:
--> 289 value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
290 elif isinstance(byte_range, RangeByteRequest):
291 value = prototype.buffer.from_bytes(
292 await self.fs._cat_file(
293 path,
(...) 296 )
297 )
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py:1322, in _cat_file()
1319 resp["Body"].close()
1321 if content_length and content_length > chunksize:
-> 1322 return await self._cat_file_concurrent(
1323 bucket,
1324 key,
1325 content_length,
1326 chunksize,
1327 max_concurrency=max_concurrency,
1328 version_id=version_id or vers,
1329 )
1331 return await _error_wrapper(_call_and_read, retries=self.retries)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py:1363, in _cat_file_concurrent()
1361 buf = bytearray(content_length)
1362 for batch_start, batch_stop in zip(inds[:-1], inds[1:]):
-> 1363 results = await asyncio.gather(
1364 *[
1365 _read_chunk(start, end)
1366 for start, end in ranges[batch_start:batch_stop]
1367 ]
1368 )
1369 for offset, data in results:
1370 buf[offset : offset + len(data)] = data
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/s3fs/core.py:1354, in _read_chunk()
1346 kw["Range"] = f"bytes={start}-{end}"
1347 resp = await self._call_s3(
1348 "get_object",
1349 Bucket=bucket,
(...) 1352 **kw,
1353 )
-> 1354 data = await resp["Body"].read()
1355 resp["Body"].close()
1356 return start, data
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiobotocore/response.py:59, in read()
57 # botocore to aiohttp mapping
58 try:
---> 59 chunk = await self.__wrapped__.content.read(
60 amt if amt is not None else -1
61 )
62 except asyncio.TimeoutError as e:
63 raise AioReadTimeoutError(
64 endpoint_url=self.__wrapped__.url, error=e
65 )
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py:446, in read()
444 blocks = []
445 while True:
--> 446 block = await self.readany()
447 if not block:
448 break
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py:468, in readany()
464 # TODO: should be `if` instead of `while`
465 # because waiter maybe triggered on chunk end,
466 # without feeding any data
467 while not self._buffer and not self._eof:
--> 468 await self._wait("readany")
470 return self._read_nowait(-1)
File ~/micromamba/envs/cla-cookbook-dev/lib/python3.11/site-packages/aiohttp/streams.py:372, in _wait()
370 try:
371 with self._timer:
--> 372 await waiter
373 finally:
374 self._waiter = None
ClientPayloadError: Response payload is not completed: <ContentLengthError: 400, message='Not enough data to satisfy content length header.'>. ConnectionResetError(104, 'Connection reset by peer')Step 2 (executes quickly)
%%time
t_hist_ts_df = t_hist_ts.to_series().T
#t_hist_ts_df.head()Step 3 (takes even longer than Step 1)
%%time
t_20c_ts_df = t_20c_ts.to_series().unstack().T
t_20c_ts_df.head()Step 4 (similar to Step 3 in its execution time)
%%time
# This also takes a while
t_rcp_ts_df = t_rcp_ts.to_series().unstack().T
t_rcp_ts_df.head()Get observations for Figure 2 (HadCRUT4)¶
The HadCRUT4 temperature dataset is described by Morice et al. (2012).
Observational time series data for comparison with ensemble average:
obsDataURL = "https://www.esrl.noaa.gov/psd/thredds/dodsC/Datasets/cru/hadcrut4/air.mon.anom.median.nc"
ds = xr.open_dataset(obsDataURL).load()
dsdef weighted_temporal_mean(ds):
"""
weight by days in each month
"""
time_bound_diff = ds.time_bnds.diff(dim="nbnds")[:, 0]
wgts = time_bound_diff.groupby("time.year") / time_bound_diff.groupby(
"time.year"
).sum(xr.ALL_DIMS)
obs = ds["air"]
cond = obs.isnull()
ones = xr.where(cond, 0.0, 1.0)
obs_sum = (obs * wgts).resample(time="AS").sum(dim="time")
ones_out = (ones * wgts).resample(time="AS").sum(dim="time")
obs_s = (obs_sum / ones_out).mean(("lat", "lon")).to_series()
return obs_sLimit observations to 20th century:
obs_s = weighted_temporal_mean(ds)
obs_s = obs_s['1920':]
obs_s.head()all_ts_anom = pd.concat([t_20c_ts_df, t_rcp_ts_df]) - t_ref_mean.data
years = [val.year for val in all_ts_anom.index]
obs_years = [val.year for val in obs_s.index]Combine ensemble member 1 data from historical and 20th century experiments:
hist_anom = t_hist_ts_df - t_ref_mean.data
member1 = pd.concat([hist_anom.iloc[:-2], all_ts_anom.iloc[:,0]], verify_integrity=True)
member1_years = [val.year for val in member1.index]Plotting Figure 2¶
Global surface temperature anomaly (1961-90 base period) for individual ensemble members, and observations:
ax = plt.axes()
ax.tick_params(right=True, top=True, direction="out", length=6, width=2, grid_alpha=0.5)
ax.plot(years, all_ts_anom.iloc[:,1:], color="grey")
ax.plot(obs_years, obs_s['1920':], color="red")
ax.plot(member1_years, member1, color="black")
ax.text(
0.35,
0.4,
"observations",
verticalalignment="bottom",
horizontalalignment="left",
transform=ax.transAxes,
color="red",
fontsize=10,
)
ax.text(
0.35,
0.33,
"members 2-40",
verticalalignment="bottom",
horizontalalignment="left",
transform=ax.transAxes,
color="grey",
fontsize=10,
)
ax.text(
0.05,
0.2,
"member 1",
verticalalignment="bottom",
horizontalalignment="left",
transform=ax.transAxes,
color="black",
fontsize=10,
)
ax.set_xticks([1850, 1920, 1950, 2000, 2050, 2100])
plt.ylim(-1, 5)
plt.xlim(1850, 2100)
plt.ylabel("Global Surface\nTemperature Anomaly (K)")
plt.show()Figure 4¶
Compute linear trend for winter seasons¶
def linear_trend(da, dim="time"):
da_chunk = da.chunk({dim: -1})
trend = xr.apply_ufunc(
calc_slope,
da_chunk,
vectorize=True,
input_core_dims=[[dim]],
output_core_dims=[[]],
output_dtypes=[np.float64],
dask="parallelized",
)
return trend
def calc_slope(y):
"""ufunc to be used by linear_trend"""
x = np.arange(len(y))
# drop missing values (NaNs) from x and y
finite_indexes = ~np.isnan(y)
slope = np.nan if (np.sum(finite_indexes) < 2) else np.polyfit(x[finite_indexes], y[finite_indexes], 1)[0]
return slopeCompute ensemble trends¶
%%time
# Takes several minutes
t = xr.concat([t_20c, t_rcp], dim="time")
seasons = t.sel(time=slice("1979", "2012")).resample(time="QS-DEC").mean("time")
# Include only full seasons from 1979 and 2012
seasons = seasons.sel(time=slice("1979", "2012")).load()winter_seasons = seasons.sel(
time=seasons.time.where(seasons.time.dt.month == 12, drop=True)
)
winter_trends = linear_trend(
winter_seasons.chunk({"lat": 20, "lon": 20, "time": -1})
).load() * len(winter_seasons.time)
# Compute ensemble mean from the first 30 members
winter_trends_mean = winter_trends.isel(member_id=range(30)).mean(dim='member_id')Make sure that we have 34 seasons:
assert len(winter_seasons.time) == 34Get observations for Figure 4 (NASA GISS GisTemp)¶
This is observational time series data for comparison with ensemble average. Here we are using the GISS Surface Temperature Analysis (GISTEMP v4) from NASA’s Goddard Institute of Space Studies Lenssen et al., 2019.
Define the URL to Project Pythia’s Jetstream2 Object Store and the path to the Zarr file.
URL = 'https://js2.jetstream-cloud.org:8001'
filePath = 's3://pythia/gistemp1200_GHCNv4_ERSSTv5.zarr'Create a container for the S3 file system
fs = s3fs.S3FileSystem(anon=True, client_kwargs=dict(endpoint_url=URL))Link to the Zarr file as it exists on the S3 object store
store = s3fs.S3Map(root=filePath, s3=fs, check=False )ds = xr.open_zarr(store, consolidated=True, chunks="auto")
dsCreate an Xarray Dataset from the Zarr object
Remap longitude range from [-180, 180] to [0, 360] for plotting purposes:
ds = ds.assign_coords(lon=((ds.lon + 360) % 360)).sortby('lon')
dsCompute observed trends¶
Include only full seasons from 1979 through 2012:
obs_seasons = ds.sel(time=slice("1979", "2012")).resample(time="QS-DEC").mean("time")
obs_seasons = obs_seasons.sel(time=slice("1979", "2012")).load()
obs_winter_seasons = obs_seasons.sel(
time=obs_seasons.time.where(obs_seasons.time.dt.month == 12, drop=True)
)
obs_winter_seasonsAnd compute observed winter trends:
obs_winter_trends = linear_trend(
obs_winter_seasons.chunk({"lat": 20, "lon": 20, "time": -1})
).load() * len(obs_winter_seasons.time)
obs_winter_trendsPlotting Figure 4¶
Global maps of historical (1979 - 2012) boreal winter (DJF) surface air trends:
contour_levels = [-6, -5, -4, -3, -2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2, 3, 4, 5, 6]
color_map = cmaps.ncl_defaultdef make_map_plot(nplot_rows, nplot_cols, plot_index, data, plot_label):
""" Create a single map subplot. """
ax = plt.subplot(nplot_rows, nplot_cols, plot_index, projection = ccrs.Robinson(central_longitude = 180))
cplot = plt.contourf(lons, lats, data,
levels = contour_levels,
cmap = color_map,
extend = 'both',
transform = ccrs.PlateCarree())
ax.coastlines(color = 'grey')
ax.text(0.01, 0.01, plot_label, fontsize = 14, transform = ax.transAxes)
return cplot, ax%%time
# Generate plot (may take a while as many individual maps are generated)
numPlotRows = 8
numPlotCols = 4
figWidth = 20
figHeight = 30
fig, axs = plt.subplots(numPlotRows, numPlotCols, figsize=(figWidth,figHeight))
lats = winter_trends.lat
lons = winter_trends.lon
# Create ensemble member plots
for ensemble_index in range(30):
plot_data = winter_trends.isel(member_id = ensemble_index)
plot_index = ensemble_index + 1
plot_label = str(plot_index)
plotRow = ensemble_index // numPlotCols
plotCol = ensemble_index % numPlotCols
# Retain axes objects for figure colorbar
cplot, axs[plotRow, plotCol] = make_map_plot(numPlotRows, numPlotCols, plot_index, plot_data, plot_label)
# Create plots for the ensemble mean, observations, and a figure color bar.
cplot, axs[7,2] = make_map_plot(numPlotRows, numPlotCols, 31, winter_trends_mean, 'EM')
lats = obs_winter_trends.lat
lons = obs_winter_trends.lon
cplot, axs[7,3] = make_map_plot(numPlotRows, numPlotCols, 32, obs_winter_trends.tempanomaly, 'OBS')
cbar = fig.colorbar(cplot, ax=axs, orientation='horizontal', shrink = 0.7, pad = 0.02)
cbar.ax.set_title('1979-2012 DJF surface air temperature trends (K/34 years)', fontsize = 16)
cbar.set_ticks(contour_levels)
cbar.set_ticklabels(contour_levels)Close our client:
client.close()Summary¶
In this notebook, we used CESM LENS data hosted on AWS to recreate two key figures in the paper that describes the project.
What’s next?¶
More example workflows using these datasets may be added in the future.
- Kay, J. E., Deser, C., Phillips, A., Mai, A., Hannay, C., Strand, G., Arblaster, J. M., Bates, S. C., Danabasoglu, G., Edwards, J., Holland, M., Kushner, P., Lamarque, J.-F., Lawrence, D., Lindsday, K., Middleton, A., Munoz, E., Neale, R., Oleson, K., … Vertenstein, M. (2015). The Community Earth System Model (CESM) Large Ensemble Project. Bull. Amer. Meteor. Soc. 10.1175/BAMS-D-13-00255.1
- Morice, C. P., Kennedy, J. J., Rayner, N. A., & Jones, P. D. (2012). Quantifying uncertainties in global and regional temperature change using an ensemble of observational estimates: The HadCRUT4 data set. J. Geophys. Res. Atmos., 117(D8). 10.1029/2011JD017187
- Lenssen, N., Schmidt, G., Hansen, J., Menne, M., Persin, A., Ruedy, R., & Zyss, D. (2019). Improvements in the GISTEMP uncertainty model. Journal of Geophysical Research: Atmospheres, 124(12), 6307–6326. 10.1029/2018JD029522