{ "cells": [ { "cell_type": "markdown", "id": "990cfa4c-2117-4435-9806-ff9048890398", "metadata": { "tags": [] }, "source": [ "# Dask Arrays\n", "\n", "**ESDS Dask Tutorial | 06 February, 2023** \n", "\n", "Negin Sobhani, Brian Vanderwende, Deepak Cherian, Ben Kirk \n", "Computational & Information Systems Lab (CISL) \n", "[negins@ucar.edu](mailto:negins@ucar.edu), [vanderwb@ucar.edu](mailto:vanderwb@ucar.edu)\n", "\n", "\n", "---------\n", "\n", "### In this tutorial, you will learn:\n", "\n", "* What is a Dask Array?\n", "* Basic concepts and features of Dask Arrays\n", "* Working with Dask arrays\n", "\n", "**Related Dask Array Documentation**\n", "\n", "* [Dask Array documentation](https://docs.dask.org/en/latest/array.html)\n", "* [Dask Array API](https://docs.dask.org/en/latest/array-api.html)\n", "* [Dask Array examples](https://examples.dask.org/array.html)\n", "\n", "\n", "## Dask Arrays\n", "Dask Arrays are essentially a parallelized version of NumPy arrays, designed for processing *larger-than-memory datasets*. \n", "\n", "\n", "\n", "*Image credit: Anaconda, Inc. and contributors*\n", "\n", "In many cases a Dask Array can be used as a drop-in replacement for a NumPy array: it has a similar API and supports a subset of NumPy functions. \n", "\n", "Dask reduces the memory footprint of large array computations by dividing arrays into smaller pieces (called **chunks**) that fit into memory, and by streaming the data from disk as it is needed." ] }, { "cell_type": "markdown", "id": "106238d9-c519-47bc-b5c6-5c703ad47fe6", "metadata": {}, "source": [ "**Dask Arrays are lazy:** Unlike NumPy, operations on Dask arrays are not computed until you explicitly request them. \n", "\n", "
\n", "\n", "Lazy Evaluation: objects are evaluated just in time, when their results are needed! \n", "\n", "Lazy evaluation helps us avoid keeping large pieces of data resident in worker memory and lets Dask optimize the resource requirements.\n", "\n", "
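
As a minimal sketch of this laziness (assuming Dask and NumPy are installed; the shape and chunk size are arbitrary illustration values), building the expression below only records the work in a task graph, and nothing is calculated until `.compute()` is called:

```python
import dask.array as da

# Build a lazy expression: this only records the operations in a task graph
x = da.ones((1000, 1000), chunks=(500, 500))
y = (x + 1).sum()

# y is still a Dask array (a recipe for the result), not a number
print(type(y))

# Calling .compute() executes the graph and returns the actual value
result = y.compute()
print(result)  # 2000000.0
```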
\n", "\n", "Dask Arrays don't directly hold any data. Instead, they provide **a symbolic representation of the necessary computations** to generate the data. We will explain this in more detail below. \n", "\n", "\n", "\n", "Let's start exploring Dask Arrays:" ] }, { "cell_type": "markdown", "id": "72b6cdf8-db59-4a86-8f8d-c30ab67a46b2", "metadata": {}, "source": [ "## Setup: Start a Dask Client\n", "We will talk in depth about Dask clusters and Dask clients later in this tutorial. Here we just create a local cluster and attach a client to it. " ] }, { "cell_type": "code", "execution_count": 1, "id": "7ebce44c-e6f6-4cd5-a23d-aa6963d1d8d0", "metadata": { "tags": [] }, "outputs": [ { "data": { "text/html": [ "
\n", "
" ], "text/plain": [ "" ], "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "\n", "client = Client()\n", "client" ] }, { "cell_type": "markdown", "id": "ed8c0a5c-a3fa-4f7e-8c84-7bc96fd0a817", "metadata": {}, "source": [ "## Blocked Algorithms\n", "\n", "Dask Arrays use blocked algorithms to split large computations into smaller computations that operate on subsets of the data (called **chunks**).\n", "\n", "Let's see what this means in an example:" ] }, { "cell_type": "code", "execution_count": null, "id": "a3063d2a-b923-4042-b37c-3edb1cf29c2f", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import dask.array as da\n", "\n", "# A 4x4 NumPy array containing the values 1 to 16\n", "\n", "narr = np.array([\n", "    [ 1,  2,  3,  4],\n", "    [ 5,  6,  7,  8],\n", "    [ 9, 10, 11, 12],\n", "    [13, 14, 15, 16]\n", "])\n", "\n", "# -- convert the NumPy array to a Dask array with 4 chunks, each of shape (2, 2)\n", "darr = da.from_array(narr, chunks=(2, 2))" ] }, { "cell_type": "markdown", "id": "0f502592-b50f-458d-bf23-3550f06b93ad", "metadata": {}, "source": [ "Now we can calculate the sum of this array using `darr.sum()`, just as in NumPy. **But how is it different from NumPy?**\n", "\n", "\n", "When you take the sum of the Dask array, Dask first takes the sum of each chunk, and only after all of those partial sums are complete does it take the sum of the per-chunk results.\n", "\n", "\n", "\n", "\n", "*Image adapted from saturncloud.io*\n" ] }, { "cell_type": "markdown", "id": "af956be7-75fe-4d95-8212-76b8fabf1a1a", "metadata": {}, "source": [ "## Task Graph\n", "\n", "The Dask Task Graph serves as a **blueprint** for executing the computations. \n", "\n", "The Task Graph defines (1) the relationships between tasks and (2) the order in which they should be executed.\n", "\n", "In a task graph, each node represents a task, and the lines (edges) represent the dependencies between tasks." 
] }, { "cell_type": "markdown", "id": "0fd27324-4077-46a4-97d9-6a01a61ab61d", "metadata": {}, "source": [ "We can visualize the low-level task graph using the `.visualize()` method (this requires the optional `graphviz` dependency)." ] }, { "cell_type": "code", "execution_count": null, "id": "32303572-b72a-49e2-9a59-165d11e2da1c", "metadata": {}, "outputs": [], "source": [ "darr.sum().visualize(rankdir=\"LR\")" ] }, { "cell_type": "markdown", "id": "9cf9a3cb-9187-405d-a176-9530ab64cdd7", "metadata": {}, "source": [ "It is generally good practice to look at the task graph before executing the computation. By looking at the task graph, you can spot potential bottlenecks where parallelism is not possible. \n", "\n", "
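
For the `darr.sum()` graph above, you should roughly see one partial-sum task per chunk feeding a final aggregation step, which is the blocked algorithm described earlier. A minimal sketch that reproduces those two steps by hand, assuming the same 4x4 array with (2, 2) chunks and using the `.blocks` accessor to pull out individual chunks:

```python
import numpy as np
import dask.array as da

# Same 4x4 array (values 1 to 16) split into four (2, 2) chunks
narr = np.arange(1, 17).reshape(4, 4)
darr = da.from_array(narr, chunks=(2, 2))

# Step 1: sum every chunk independently (in a real graph these tasks can run in parallel)
partial_sums = [
    int(darr.blocks[i, j].sum().compute())
    for i in range(darr.numblocks[0])
    for j in range(darr.numblocks[1])
]
print(partial_sums)  # [14, 22, 46, 54]

# Step 2: aggregate the per-chunk results
blocked_total = sum(partial_sums)
print(blocked_total)  # 136

# Dask's own blocked reduction gives the same answer
print(int(darr.sum().compute()))  # 136
```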
\n", "\n", "TIP: For big computations, low-level task graphs get very confusing. A more concise alternative is to visualize the high-level graph with `.dask.visualize()`.\n", "\n", "
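
A rough way to see why the high-level graph is more concise (a sketch, assuming a Dask version in which `.dask` returns a `HighLevelGraph` with a `layers` mapping): it stores roughly one layer per array operation, while the low-level graph it expands to contains tasks for every chunk, so its size grows with the number of chunks:

```python
import numpy as np
import dask.array as da

darr = da.from_array(np.arange(1, 17).reshape(4, 4), chunks=(2, 2))
total = darr.sum()

hlg = total.dask            # high-level graph: roughly one layer per array operation
n_layers = len(hlg.layers)
n_tasks = len(hlg)          # number of low-level tasks it expands to

print(n_layers, 'layers vs', n_tasks, 'low-level tasks')
```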
\n" ] }, { "cell_type": "code", "execution_count": null, "id": "c75b77b1-65e1-48d9-ba92-0ffb9fd681cc", "metadata": {}, "outputs": [], "source": [ "#darr.sum().dask.visualize()" ] }, { "cell_type": "markdown", "id": "ea4672b8-fa4b-4709-98a0-7d3a36b592f1", "metadata": {}, "source": [ "\n", "----\n", "\n", "Now, let's look at another example. Here we create a 2D array of ones using NumPy." ] }, { "cell_type": "code", "execution_count": null, "id": "20392c9f-8902-43f6-bbf1-38d88532c361", "metadata": {}, "outputs": [], "source": [ "shape = (10000, 12000)\n", "\n", "ones_np = np.ones(shape)\n", "ones_np" ] }, { "cell_type": "markdown", "id": "8940853a-9022-481e-934e-57b1742bf9cf", "metadata": {}, "source": [ "Now, let's create the same array using Dask:" ] }, { "cell_type": "code", "execution_count": null, "id": "5466813e-e71b-446a-bc14-f4d3b98a27df", "metadata": {}, "outputs": [], "source": [ "ones_da = da.ones(shape)\n", "ones_da" ] }, { "cell_type": "markdown", "id": "67c8f051-3889-41f6-99ca-87074f336bd0", "metadata": {}, "source": [ "We see a Dask Array representation of the data.\n", "This is a symbolic representation; no data has actually been generated yet. \n", "\n", "As we discussed previously, this mode of operation is called \"lazy\". \n", "\n", "This allows the user to build up a series of computations or tasks before they are passed to the scheduler for execution." ] }, { "cell_type": "markdown", "id": "9c581c28-1e99-4571-a5fe-88e45727b1a7", "metadata": {}, "source": [ "## Chunks\n", "\n", "The symbolic representation of the Dask array above also illustrates the concept of chunks. Dask arrays split the data into **sub-arrays** (or **chunks**) to optimize computation with large arrays. 
" ] }, { "cell_type": "markdown", "id": "3911489b-792c-4dd6-9e1e-4ca87de888a1", "metadata": {}, "source": [ "### Chunking an array\n", "\n", "**The way that arrays are chunked can significantly affect total performance.**\n", "\n", "To specify how an array is chunked, we pass the `chunks` argument when creating our Dask array.\n", "\n", "
\n", "\n", "⚠️ WARNING: Please note that the `chunks` argument specifies the **chunk shape**, not the “number of chunks”. \n", "For example, `chunks=1` means that every chunk holds a single element, so you get as many chunks as there are elements. \n", "
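
To make the difference between chunk shape and number of chunks concrete, here is a small sketch (the array shapes are chosen only for illustration) that inspects the resulting layout through the `chunks`, `chunksize`, and `numblocks` attributes:

```python
import dask.array as da

# chunks=1 is a chunk *shape*: every chunk holds a single element
a = da.ones((4, 6), chunks=1)
print(a.numblocks)  # (4, 6), i.e. 24 chunks of shape (1, 1)

# A tuple gives the size of each chunk along each axis
b = da.ones((4, 6), chunks=(2, 3))
print(b.numblocks)  # (2, 2)
print(b.chunksize)  # (2, 3)
print(b.chunks)     # ((2, 2), (3, 3))
```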
\n", "\n", "There are several ways to define `chunks`. For example:\n", "\n", "1. A uniform dimension size like 1000, meaning chunks of size 1000 in each dimension. \n", "\n", "2. A uniform chunk shape like `(1000, 2000, 3000)`, meaning chunks of size 1000 in the first axis, 2000 in the second axis, and 3000 in the third. \n", "\n", "3. Fully explicit sizes of all blocks for all dimensions, like `((1000, 1000, 500), (400, 400), (5, 5, 5, 5, 5))`.\n", "\n", "4. A dictionary specifying the chunk size per dimension, like `{0: 1000, 1: 2000, 2: 3000}`.\n", "\n", "\n", "Let's recreate the above Dask array, but this time we will specify chunk sizes (a.k.a. shapes) using the `chunks` argument. " ] }, { "cell_type": "code", "execution_count": null, "id": "9691bac8-8368-4311-a28e-5e2ab2a253f5", "metadata": {}, "outputs": [], "source": [ "# -- remember what the shape of our data array was\n", "shape" ] }, { "cell_type": "code", "execution_count": null, "id": "8c5bf84b-761e-445e-8c0e-f897ad5ab19a", "metadata": {}, "outputs": [], "source": [ "# create a Dask array with 6 chunks (2 along the first axis, 3 along the second)\n", "chunk_shape = (5000, 4000)\n", "ones_da = da.ones(shape, chunks=chunk_shape)\n", "ones_da" ] }, { "cell_type": "markdown", "id": "a322cb9f-e4c1-45ac-9727-0831b3d55054", "metadata": {}, "source": [ "You can see in the Dask array representation above that we now have 6 chunks, each of shape (5000, 4000) and a size of about 152.59 MiB (5000 x 4000 float64 values at 8 bytes each = 160,000,000 bytes)." ] }, { "cell_type": "markdown", "id": "2a12d097-9c1d-4a78-bbcf-4d7f52e47388", "metadata": {}, "source": [ "## Performance Comparison" ] }, { "cell_type": "markdown", "id": "bf6b8586-e191-4e55-b476-8099c612971b", "metadata": {}, "source": [ "To compare the performance of a NumPy array and an equivalent Dask array, let's calculate the mean. 
" ] }, { "cell_type": "code", "execution_count": null, "id": "9856fcec-e006-446b-b426-bff50a92cbd9", "metadata": {}, "outputs": [], "source": [ "%%time\n", "# The %%time cell magic measures the execution time of the whole cell\n", "ones_np.mean()" ] }, { "cell_type": "code", "execution_count": null, "id": "86cc3673-c8a4-421b-8975-3cd2c30c7b1b", "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Remember, we are not doing any computation here, just constructing our task graph\n", "mean_of_ones_da = ones_da.mean()" ] }, { "cell_type": "markdown", "id": "fcfee3c0-291b-4e53-a64d-2871b621ce2f", "metadata": {}, "source": [ "Remember:\n", "> *Dask doesn't do anything until you tell it... It is lazy!*\n", "\n", "So far we have only constructed our task graph; nothing has been computed yet!\n", "\n", "
\n", "\n", "NOTE: In order to generate the data, we need to call the `.compute()` method on the Dask Array to trigger our computation. \n", "\n", "
\n", "\n", "Let's run the `.compute()` method to see how this works:" ] }, { "cell_type": "code", "execution_count": null, "id": "1179dc52-9da9-419f-961b-ba806d9de498", "metadata": {}, "outputs": [], "source": [ "%%time\n", "mean_of_ones_da.compute()" ] }, { "cell_type": "markdown", "id": "86931ead-dc2e-475c-80ab-c2e020b9deea", "metadata": {}, "source": [ "The `.compute()` method converts Dask Arrays to NumPy arrays. Let's check whether this is true:" ] }, { "cell_type": "code", "execution_count": null, "id": "44c17fc5-d425-4a1c-9232-5fd6184fc3ec", "metadata": {}, "outputs": [], "source": [ "type(ones_da.compute())" ] }, { "cell_type": "markdown", "id": "2ba26970-bb89-4ea1-a40b-d885368651bb", "metadata": {}, "source": [ "\n", "\n", "
\n", "\n", "WARNING: Typically, when working with Dask arrays, we do not want to generate the data right away by calling `.compute()` on a large array. \n", "\n", "
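
One way to follow this advice is to check how big a result would be before computing it: `.nbytes` is derived from the shape and dtype alone, without generating any data. A small sketch, assuming the same shape and chunks as `ones_da` above:

```python
import dask.array as da

ones_da = da.ones((10000, 12000), chunks=(5000, 4000))

# Size if we computed the whole array
full_mib = ones_da.nbytes / 1024**2
# Size of a reduction along axis 0 (one value per column)
col_mean_mib = ones_da.mean(axis=0).nbytes / 1024**2

print(f'full array: {full_mib:.2f} MiB')  # full array: 915.53 MiB
print(f'mean over axis 0: {col_mean_mib:.4f} MiB')
```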
\n", "\n", "\n", "We usually want to perform some computations that reduce the data size. For example, we might compute statistics like the mean or standard deviation.\n", "\n", "Let's look at an example that takes the mean and visualizes the task graph. Remember that no actual computation takes place until we call `.compute()`." ] }, { "cell_type": "code", "execution_count": null, "id": "e9350bd2-0e92-4c92-8dae-cee7c2027c87", "metadata": {}, "outputs": [], "source": [ "mean_of_ones_da = ones_da.mean()\n", "mean_of_ones_da.visualize(rankdir=\"LR\")" ] }, { "cell_type": "markdown", "id": "f7e2a1d5-f606-4403-b3e3-d0d64b946035", "metadata": {}, "source": [ "-------\n", "\n", "\n", "What are the sizes of these arrays in memory? \n", "\n", "First, let's define a function that returns the size of a variable in MiB. " ] }, { "cell_type": "code", "execution_count": null, "id": "6c878ec8-0c21-45f7-83be-aaae9ba09897", "metadata": {}, "outputs": [], "source": [ "import sys\n", "\n", "# Return the size of a Python object in MiB, as reported by sys.getsizeof.\n", "# For a NumPy array that owns its data this includes the data buffer;\n", "# for a Dask array it is only the size of the lightweight Python object.\n", "def var_size(in_var):\n", "    return sys.getsizeof(in_var) / 1024 / 1024" ] }, { "cell_type": "code", "execution_count": null, "id": "063d3891-65b8-48ad-a50f-10e4f88e32f1", "metadata": {}, "outputs": [], "source": [ "print(\"Shape of the NumPy array : \", ones_np.shape) \n", "print(\"Shape of the Dask array  : \", ones_da.shape) \n", "\n", "# memory size of the NumPy array in MiB\n", "print(f\"Memory size of NumPy array : {var_size(ones_np):.2f} MiB\")\n", "# memory size of the Dask array in MiB\n", "print(f\"Memory size of Dask array  : {var_size(ones_da):.2f} MiB\")" ] }, { "cell_type": "markdown", "id": "249d762a-91b6-403f-ad86-cddf0e4d71db", "metadata": {}, "source": [ "**Why is the memory size of the Dask array (almost) zero?**\n", "\n", "Remember, this variable holds only a graph representation of the full array, which will be split across the workers.\n", "\n", "However, Dask does give us ways to see the full size of the data (often 
much larger than your client machine can handle)!" ] }, { "cell_type": "code", "execution_count": null, "id": "ffaf52e9-e7bd-4023-aa2f-9576ddafa9e7", "metadata": {}, "outputs": [], "source": [ "print(\"Size of Dask dataset: {:.2f} MiB\".format(ones_da.nbytes / 1024/1024))" ] }, { "cell_type": "markdown", "id": "878a7cdc-ef9c-4e83-9ae8-d10c2c140413", "metadata": {}, "source": [ "## Larger Data\n", "The previous example illustrated how Dask works, but using Dask is not really necessary (nor advisable) for an array of size 915.53 MiB. \n", "Let's try an example using bigger data and bigger calculations:" ] }, { "cell_type": "code", "execution_count": null, "id": "da9d506b-ffcf-4faf-8544-08ad76014819", "metadata": {}, "outputs": [], "source": [ "big_shape = (2000, 200, 2000)\n", "\n", "# -- this will make a big NumPy array that might not fit on your machine\n", "#big_np = np.ones(big_shape)" ] }, { "cell_type": "markdown", "id": "80f66611-b525-40c8-9998-9dc09a45a8f1", "metadata": {}, "source": [ "Make a Dask Array with the same shape. Because we do not pass `chunks` here, Dask chooses the chunk size automatically:" ] }, { "cell_type": "code", "execution_count": null, "id": "570b4e69-2663-47fc-8e3f-0aaa50eb6b29", "metadata": {}, "outputs": [], "source": [ "big_da = da.ones(big_shape)\n", "big_da" ] }, { "cell_type": "code", "execution_count": null, "id": "264d7f95-3739-4e6f-b9bf-3bc722b30767", "metadata": {}, "outputs": [], "source": [ "# size of data\n", "#print(\"Memory size of NumPy dataset : {:.2f} GiB\".format(big_np.nbytes / 1024/1024/1024))\n", "print(\"Memory size of Dask dataset : {:.2f} GiB\".format(big_da.nbytes / 1024/1024/1024))" ] }, { "cell_type": "markdown", "id": "6269ea15-fdb2-4cec-962b-72c0e123c638", "metadata": {}, "source": [ "This (about 5.96 GiB) may be close to the amount of memory (RAM) available on your computer.\n", "\n", "Let's try bigger calculations on this array:" ] }, { "cell_type": "code", "execution_count": null, "id": "cde2f9a0-974c-43ed-a922-a587a34c79ec", "metadata": {}, "outputs": [], 
"source": [ "#%%time \n", "#z_np = (big_np + big_np.T)[::2, :].mean(axis=2)" ] }, { "cell_type": "code", "execution_count": null, "id": "5ede8e0b-e62d-4f3f-b815-dcb123c092d4", "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "z_da = (big_da + big_da.T)[::2, :].mean(axis=2)\n", "\n", "result = z_da.compute()" ] }, { "cell_type": "code", "execution_count": null, "id": "acc1695d-ed28-4bcd-89aa-41c79de8cd3f", "metadata": {}, "outputs": [], "source": [ "# -- warning: do not try low-level visualization with big arrays\n", "\n", "#z_da.visualize()" ] }, { "cell_type": "markdown", "id": "94f702a5-2af8-4b85-a466-b5bcd09635bd", "metadata": {}, "source": [ "Most common NumPy functions work on Dask arrays, though the computations remain lazy until you call `.compute()` (or `.persist()`), or until something such as plotting needs the actual values." ] }, { "cell_type": "markdown", "id": "39adc87d-a41c-4e7e-bb31-0a51ba6aaed5", "metadata": {}, "source": [ "As we discussed above, the way that Dask arrays are chunked can significantly affect performance. In the remainder of this notebook, let's do a similar calculation using a different `chunks` size. \n", "\n", "We will learn more about best practices regarding chunk size later in the tutorial. \n", "\n", "\n", "## Supplementary Material: Rechunking Arrays\n", "\n", "We can change the chunking of a Dask array using the `rechunk` method. Please note that rechunking Dask arrays can be very expensive, so choosing an appropriate chunk size initially is ideal." 
] }, { "cell_type": "code", "execution_count": null, "id": "68dca5a5-b67b-4205-9e83-48faefe44594", "metadata": {}, "outputs": [], "source": [ "new_chunk_shape = (50, 50, 50)\n", "rechunked_big_da = big_da.rechunk(new_chunk_shape)\n", "rechunked_big_da" ] }, { "cell_type": "code", "execution_count": null, "id": "ff5e0a0b-3d2f-46d4-a839-079dd3664572", "metadata": {}, "outputs": [], "source": [ "%%time\n", "# perform the big computation on the rechunked array\n", "\n", "z_da_rechunked = (rechunked_big_da + rechunked_big_da.T)[::2, :].mean(axis=2)\n", "\n", "result = z_da_rechunked.compute()" ] }, { "cell_type": "markdown", "id": "797cbbc7-e2c9-46b2-b58b-0f9be29f7bb9", "metadata": {}, "source": [ "We can see how choosing smaller chunks (and therefore many more chunks in total) **significantly** reduces the performance of our computation. \n", "\n", "
\n", "\n", "TIP: As a rule of thumb, a chunk should be big enough that the computation on that chunk takes significantly longer than the overhead from the Dask scheduler. The Dask scheduler takes roughly 1 ms per task for scheduling. \n", "\n", "
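
A back-of-the-envelope sketch of this rule of thumb, assuming the same shape as `big_da`, one task per chunk for each operation, and the roughly 1 ms of scheduling overhead per task quoted above (real graphs contain several tasks per chunk, so treat the result as a rough lower bound). Rechunking is lazy, so the layouts can be inspected without computing anything:

```python
import math

import dask.array as da

big_shape = (2000, 200, 2000)
overhead_per_task_s = 1e-3  # ~1 ms per task, from the tip above

x = da.ones(big_shape, chunks=(500, 100, 500))

for chunk_shape in [(50, 50, 50), (500, 100, 500)]:
    # rechunk only adds tasks to the graph; no data is moved here
    y = x.rechunk(chunk_shape)
    n = math.prod(y.numblocks)  # total number of chunks
    print(chunk_shape, y.numblocks, n, 'chunks, about',
          round(n * overhead_per_task_s, 3), 's of scheduling per operation')
# (50, 50, 50) gives 40 x 4 x 40 = 6400 chunks; (500, 100, 500) gives 4 x 2 x 4 = 32
```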
\n", "\n", "Let's try a bigger chunk size:" ] }, { "cell_type": "code", "execution_count": null, "id": "dfd36aa9-d676-4c14-b2ad-3f8414b107c5", "metadata": {}, "outputs": [], "source": [ "new_chunk_shape = (500, 100, 500)\n", "\n", "rechunked_big_da = big_da.rechunk(new_chunk_shape)\n", "rechunked_big_da" ] }, { "cell_type": "code", "execution_count": null, "id": "6e90dceb-56d1-4ef5-a6f3-ab0b25606288", "metadata": {}, "outputs": [], "source": [ "%%time\n", "# perform the big computation on the rechunked array\n", "\n", "z_da_rechunked = (rechunked_big_da + rechunked_big_da.T)[::2, :].mean(axis=2)\n", "\n", "result = z_da_rechunked.compute()" ] }, { "cell_type": "markdown", "id": "fcc07937-e204-428e-af39-57bd7df1372b", "metadata": {}, "source": [ "\n", "
\n", "\n", "TIP: As a rule of thumb, a chunk should be small enough to fit comfortably in memory. Chunk sizes between 10 MB and 1 GB are common, depending on your machine.\n", "\n", "
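
To check where a given chunk shape falls in that range, and to let Dask choose chunk shapes for you, here is a minimal sketch. It assumes Dask's `array.chunk-size` configuration option, which sets the target size used by `chunks='auto'`; the 64MiB value is only an example:

```python
import math

import dask
import dask.array as da

big_shape = (2000, 200, 2000)
x = da.ones(big_shape, chunks=(500, 100, 500))

# Bytes in one full chunk = elements per chunk * bytes per element (8 for float64)
chunk_bytes = math.prod(x.chunksize) * x.dtype.itemsize
print(chunk_bytes / 1024**2, 'MiB per chunk')  # about 190.7 MiB

# Let Dask pick chunk shapes that target a configured size
with dask.config.set({'array.chunk-size': '64MiB'}):
    y = da.ones(big_shape, chunks='auto')
print(y.chunksize)
```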
\n" ] }, { "cell_type": "code", "execution_count": null, "id": "8dd1bc53-ead4-409c-a349-2e2abd78f59f", "metadata": {}, "outputs": [], "source": [ "client.close()" ] }, { "cell_type": "markdown", "id": "7b9f93a6-d792-47b6-a766-4cb6c0972d2a", "metadata": {}, "source": [ "## Summary:\n", "\n", "* Dask Array provides parallel computing capabilities by dividing arrays into smaller pieces called chunks.\n", "* Blocked algorithms split large computations into smaller computations that operate on subsets of the array.\n", "* Dask Array supports efficient computation on large arrays through a combination of lazy evaluation and task parallelism.\n", "* Dask Array can often be used as a drop-in replacement for a NumPy ndarray, with a similar API and support for a subset of NumPy functions.\n", "* The way that arrays are chunked can significantly affect total performance. Poor chunking can make Dask perform considerably worse than NumPy. " ] }, { "cell_type": "markdown", "id": "984ea6ba-f37c-47ab-86ca-b5396ba6c071", "metadata": {}, "source": [ "## Resources and references\n", "\n", "* Reference\n", "    * [Dask Docs](https://dask.org/)\n", "    * [Dask Examples](https://examples.dask.org/)\n", "    * [Dask Code](https://github.com/dask/dask/)\n", "    * [Dask Blog](https://blog.dask.org/)\n", " \n", " \n", " \n", "* Ask for help\n", "    * [`dask`](http://stackoverflow.com/questions/tagged/dask) tag on Stack Overflow, for usage questions\n", "    * [github discussions: dask](https://github.com/dask/dask/discussions) for general, non-bug, discussion, and usage questions\n", "    * [github issues: dask](https://github.com/dask/dask/issues/new) for bug reports and feature requests\n", " \n", "\n", "* Pieces of this notebook are adapted from the following sources:\n", "    * [Dask Performance Comparison](https://tutorial.dask.org/02_array.html#Performance-comparison)\n", "    * [Dask Arrays by EEDS](https://earth-env-data-science.github.io/lectures/dask/dask_arrays.html)" ] } ], "metadata": { 
"kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }