path: root/Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb
author    leshe4ka46 <alex9102naid1@ya.ru>    2025-10-18 12:25:53 +0300
committer leshe4ka46 <alex9102naid1@ya.ru>    2025-10-18 12:25:53 +0300
commit    910a222fa60ce6ea0831f2956470b8a0b9f62670 (patch)
tree      1d6bbccafb667731ad127f93390761100fc11b53 /Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb
parent    35b9040e4104b0e79bf243a2c9769c589f96e2c4 (diff)
nvidia2
Diffstat (limited to 'Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb')
-rw-r--r-- Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb | 980
1 file changed, 980 insertions, 0 deletions
diff --git a/Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb b/Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb
new file mode 100644
index 0000000..6fa9b0c
--- /dev/null
+++ b/Fundamentals_of_Accelerated_Data_Science/1-09_dask-cudf.ipynb
@@ -0,0 +1,980 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "<img src=\"./images/DLI_Header.png\" width=400/>"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Fundamentals of Accelerated Data Science # "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Transition Path: cuDF provides a way for users to scale their pandas workflows as data sizes grow, offering a middle ground between single-threaded pandas and distributed computing solutions like Dask or Apache Spark ."
+ ]
+ },
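+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To make that transition concrete, here is a minimal sketch of the pandas-like cuDF API (the column names and values below are illustrative only):\n",
+    "```python\n",
+    "import cudf  # RAPIDS GPU dataframe library\n",
+    "\n",
+    "# cuDF mirrors the pandas API, but executes on the GPU\n",
+    "gdf = cudf.DataFrame({\"age\": [25, 31, 58], \"county\": [\"Kent\", \"Essex\", \"Kent\"]})\n",
+    "print(gdf.groupby(\"county\")[\"age\"].mean())\n",
+    "```"
+   ]
+  },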
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 09 - Introduction to Dask cuDF ##\n",
+ "\n",
+ "**Table of Contents**\n",
+ "<br>\n",
+ "[Dask](https://dask.org/) cuDF can be used to distribute dataframe operations to multiple GPUs. In this notebook we will introduce some key Dask concepts, learn how to setup a Dask cluster for utilizing multiple GPUs, and see how to perform simple dataframe operations on distributed Dask dataframes. This notebook covers the below sections: \n",
+ "1. [An Introduction to Dask](#An-Introduction-to-Dask)\n",
+ "2. [Setting up a Dask Scheduler](#Setting-up-a-Dask-Scheduler)\n",
+ " * [Obtaining the Local IP Address](#Obtaining-the-Local-IP-Address)\n",
+ " * [Starting a `LocalCUDACluster`](#Starting-a-LocalCUDACluster)\n",
+ " * [Instantiating a Client Connection](#Instantiating-a-Client-Connection)\n",
+ " * [The Dask Dashboard](#The-Dask-Dashboard)\n",
+ "3. [Reading Data with Dask cuDF](#Reading-Data-with-Dask-cuDF)\n",
+ "4. [Computational Graph](#Computational-Graph)\n",
+ " * [Visualizing the Computational Graph](#Visualizing-the-Computational-Graph)\n",
+ " * [Extending the Computational Graph](#Extending-the-Computational-Graph)\n",
+ " * [Computing with the Computational Graph](#Computing-with-the-Computational-Graph)\n",
+ " * [Persisting Data in the Cluster](#Persisting-Data-in-the-Cluster)\n",
+ "6. [Initial Data Exploration with Dask cuDF](#Initial-Data-Exploration-with-Dask-cuDF)\n",
+ " * [Exercise #1 - Counties North of Sunderland with Dask](#Exercise-#1---Counties-North-of-Sunderland-with-Dask)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## An Introduction to Dask ##\n",
+ "[Dask](https://dask.org/) is a Python library for parallel computing. In Dask programming, we create computational graphs that define code we **would like** to execute, and then, give these computational graphs to a Dask scheduler which evaluates them lazily, and efficiently, in parallel. \n",
+ "\n",
+ "In addition to using multiple CPU cores or threads to execute computational graphs in parallel, Dask schedulers can also be configured to execute computational graphs on multiple CPUs, or, as we will do in this workshop, multiple GPUs. As a result, Dask programming facilitates operating on data sets that are larger than the memory of a single compute resource.\n",
+ "\n",
+ "Because Dask computational graphs can consist of arbitrary Python code, they provide [a level of control and flexibility superior to many other systems](https://docs.dask.org/en/latest/spark.html) that can operate on massive data sets. However, we will focus for this workshop primarily on the Dask DataFrame, one of several data structures whose operations and methods natively utilize Dask's parallel scheduling:\n",
+ "* Dask DataFrame, which closely resembles the Pandas DataFrame\n",
+ "* Dask Array, which closely resembles the NumPy ndarray\n",
+ "* Dask Bag, a set which allows duplicates and can hold heterogeneously-typed data\n",
+ "\n",
+ "In particular, we will use a Dask-cuDF dataframe, which combines the interface of Dask with the GPU power of cuDF for distributed dataframe operations on multiple GPUs. We will now turn our attention to utilizing all 4 NVIDIA V100 GPUs in this environment for operations on an 18GB UK population data set that would not fit into the memory of a single 16GB GPU."
+ ]
+ },
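+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As a minimal sketch of this lazy-graph style using plain Dask (no GPUs involved; the function and values are illustrative):\n",
+    "```python\n",
+    "import dask\n",
+    "\n",
+    "@dask.delayed\n",
+    "def double(x):\n",
+    "    return 2 * x\n",
+    "\n",
+    "total = double(10) + double(20)  # builds a computational graph; nothing has run yet\n",
+    "print(total.compute())           # 60: the work happens only when we ask for it\n",
+    "```"
+   ]
+  },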
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Setting up a Dask Scheduler ##\n",
+ "We begin by starting a Dask scheduler which will take care to distribute our work across the 4 available GPUs. In order to do this we need to start a `LocalCUDACluster` instance, using our host machine's IP, and then instantiate a client that can communicate with the cluster."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Obtaining the Local IP Address ###"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "b'172.18.0.3 \\n' 172.18.0.3\n"
+ ]
+ }
+ ],
+ "source": [
+ "import subprocess # we will use this to obtain our local IP using the following command\n",
+ "cmd = \"hostname --all-ip-addresses\"\n",
+ "\n",
+ "process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n",
+ "output, error = process.communicate()\n",
+ "IPADDR = str(output.decode()).split()[0]\n",
+ "\n",
+ "\n",
+ "print(output, IPADDR)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Starting a `LocalCUDACluster` ###\n",
+ "`dask_cuda` provides utilities for Dask and CUDA (the \"cu\" in cuDF) interactions."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2025-10-11 18:21:59,418 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 33ec4d90c48a979c353aeb71ce7b23a4 initialized by task ('shuffle-transfer-33ec4d90c48a979c353aeb71ce7b23a4', 9) executed on worker tcp://172.18.0.3:38431\n",
+ "2025-10-11 18:22:00,314 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 33ec4d90c48a979c353aeb71ce7b23a4 deactivated due to stimulus 'task-finished-1760206920.3121095'\n",
+ "2025-10-11 18:22:12,178 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 7c9f20b72e188f02f8e832adc1df3ce1 initialized by task ('shuffle-transfer-7c9f20b72e188f02f8e832adc1df3ce1', 9) executed on worker tcp://172.18.0.3:38431\n",
+ "2025-10-11 18:22:12,808 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 7c9f20b72e188f02f8e832adc1df3ce1 deactivated due to stimulus 'task-finished-1760206932.8072617'\n",
+ "2025-10-11 18:22:16,144 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 33ec4d90c48a979c353aeb71ce7b23a4 initialized by task ('shuffle-transfer-33ec4d90c48a979c353aeb71ce7b23a4', 9) executed on worker tcp://172.18.0.3:38431\n",
+ "2025-10-11 18:22:16,873 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 33ec4d90c48a979c353aeb71ce7b23a4 deactivated due to stimulus 'task-finished-1760206936.870322'\n"
+ ]
+ }
+ ],
+ "source": [
+ "from dask_cuda import LocalCUDACluster\n",
+ "cluster = LocalCUDACluster(ip=IPADDR)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Instantiating a Client Connection ###\n",
+ "The `dask.distributed` library gives us distributed functionality, including the ability to connect to the CUDA Cluster we just created. The `progress` import will give us a handy progress bar we can utilize below."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from dask.distributed import Client, progress\n",
+ "\n",
+ "client = Client(cluster)"
+ ]
+ },
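+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As a quick sanity check, a small sketch (assuming the client above connected successfully) that asks the scheduler how many workers it manages; with one worker per GPU we expect 4:\n",
+    "```python\n",
+    "n_workers = len(client.scheduler_info()[\"workers\"])  # one Dask worker per GPU\n",
+    "print(n_workers, \"workers connected\")\n",
+    "```"
+   ]
+  },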
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### The Dask Dashboard"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Dask ships with a very helpful dashboard that in our case runs on port `8787`. Open a new browser tab now and copy this lab's URL into it, replacing `/lab/lab` with `:8787` (so it ends with `.com:8787`). This should open the Dask dashboard, currently idle."
+ ]
+ },
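+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "If editing the URL by hand is awkward, the client can also report the dashboard address directly; a minimal sketch:\n",
+    "```python\n",
+    "print(client.dashboard_link)  # the address where the Dask dashboard is being served\n",
+    "```"
+   ]
+  },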
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Reading Data with Dask cuDF ##\n",
+ "With `dask_cudf` we can create a dataframe from several file formats (including from multiple files and directly from cloud storage like S3), from cuDF dataframes, from Pandas dataframes, and even from vanilla CPU Dask dataframes. Here we will create a Dask cuDF dataframe from the local csv file `pop5x_1-07.csv`, which has similar features to the `pop.csv` files you have already been using, except scaled up to 5 times larger (18GB), representing a population of almost 300 million, nearly the size of the entire United States."
+ ]
+ },
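+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For reference, a hedged sketch of two of those other ingestion paths (the dataframe contents and the glob pattern here are hypothetical):\n",
+    "```python\n",
+    "import cudf\n",
+    "import dask_cudf\n",
+    "\n",
+    "# from an existing single-GPU cuDF dataframe, split into 4 partitions\n",
+    "gdf = cudf.DataFrame({\"x\": list(range(8))})\n",
+    "ddf_from_cudf = dask_cudf.from_cudf(gdf, npartitions=4)\n",
+    "\n",
+    "# from many csv files at once, using a glob pattern\n",
+    "# ddf_from_many = dask_cudf.read_csv(\"./data/part-*.csv\")\n",
+    "```"
+   ]
+  },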
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "18G data/uk_pop5x.csv\n"
+ ]
+ }
+ ],
+ "source": [
+ "# get the file size of `pop5x_1-07.csv` in GB\n",
+ "!ls -sh data/uk_pop5x.csv"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We import dask_cudf (and other RAPIDS components when necessary) after setting up the cluster to ensure that they establish correctly inside the CUDA context it creates."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import dask_cudf"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ddf = dask_cudf.read_csv('./data/uk_pop5x.csv', dtype=['float32', 'str', 'str', 'float32', 'float32', 'str'])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "age float32\n",
+ "sex object\n",
+ "county object\n",
+ "lat float32\n",
+ "long float32\n",
+ "name object\n",
+ "dtype: object"
+ ]
+ },
+ "execution_count": 12,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.dtypes"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Computational Graph ##\n",
+ "As mentioned above, when programming with Dask, we create computational graphs that we **would eventually like** to be executed. We can already observe this behavior in action: in calling `dask_cudf.read_csv` we have indicated that **would eventually like** to read the entire contents of `pop5x_1-07.csv`. However, Dask will not ask the scheduler execute this work until we explicitly indicate that we would like it do so.\n",
+ "\n",
+ "Observe the memory usage for each of the 4 GPUs by executing the following cell, and notice that the GPU memory usage is not nearly large enough to indicate that the entire 18GB file has been read into memory:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Sat Oct 11 18:18:47 2025 \n",
+ "+-----------------------------------------------------------------------------+\n",
+ "| NVIDIA-SMI 525.85.12 Driver Version: 525.85.12 CUDA Version: 12.0 |\n",
+ "|-------------------------------+----------------------+----------------------+\n",
+ "| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |\n",
+ "| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |\n",
+ "| | | MIG M. |\n",
+ "|===============================+======================+======================|\n",
+ "| 0 Tesla T4 On | 00000000:00:1B.0 Off | 0 |\n",
+ "| N/A 31C P0 25W / 70W | 690MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 1 Tesla T4 On | 00000000:00:1C.0 Off | 0 |\n",
+ "| N/A 32C P0 25W / 70W | 168MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 2 Tesla T4 On | 00000000:00:1D.0 Off | 0 |\n",
+ "| N/A 32C P0 25W / 70W | 168MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 3 Tesla T4 On | 00000000:00:1E.0 Off | 0 |\n",
+ "| N/A 32C P0 25W / 70W | 168MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ " \n",
+ "+-----------------------------------------------------------------------------+\n",
+ "| Processes: |\n",
+ "| GPU GI CI PID Type Process name GPU Memory |\n",
+ "| ID ID Usage |\n",
+ "|=============================================================================|\n",
+ "+-----------------------------------------------------------------------------+\n"
+ ]
+ }
+ ],
+ "source": [
+ "!nvidia-smi"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Visualizing the Computational Graph ###\n",
+ "Computational graphs that have not yet been executed provide the `.visualize` method that, when used in a Jupyter environment such as this one, will display the computational graph, including how Dask intends to go about distributing the work. Thus, we can visualize how the `read_csv` operation will be distributed by Dask by executing the following cell:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
+ "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
+ " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
+ "<!-- Generated by graphviz version 2.43.0 (0)\n",
+ " -->\n",
+ "<!-- Title: %3 Pages: 1 -->\n",
+ "<svg width=\"115pt\" height=\"44pt\"\n",
+ " viewBox=\"0.00 0.00 115.00 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+ "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
+ "<title>%3</title>\n",
+ "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-40 111,-40 111,4 -4,4\"/>\n",
+ "<!-- &#45;8860210684066473962 -->\n",
+ "<g id=\"node1\" class=\"node\">\n",
+ "<title>&#45;8860210684066473962</title>\n",
+ "<polygon fill=\"none\" stroke=\"black\" points=\"107,-36 0,-36 0,0 107,0 107,-36\"/>\n",
+ "<text text-anchor=\"middle\" x=\"53.5\" y=\"-13\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">ReadCSV</text>\n",
+ "</g>\n",
+ "</g>\n",
+ "</svg>\n"
+ ],
+ "text/plain": [
+ "<graphviz.graphs.Digraph at 0x7fce7e72d570>"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.visualize(format='svg') # This visualization is very large, and using `format='svg'` will make it easier to view."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "As you can see, when we indicate for Dask to actually execute this operation, it will parallelize the work across the 4 GPUs in something like 69 parallel partitions. We can see the exact number of partitions with the `npartitions` property:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "69"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.npartitions"
+ ]
+ },
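+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To see how rows are spread across those partitions, a small sketch using the standard Dask `map_partitions` idiom (note that the `.compute()` call triggers a real read of the data):\n",
+    "```python\n",
+    "# count the rows in each partition; returns one number per partition\n",
+    "rows_per_partition = ddf.map_partitions(len).compute()\n",
+    "print(rows_per_partition.head())\n",
+    "```"
+   ]
+  },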
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Extending the Computational Graph ###\n",
+ "The concept of constructing computational graphs with arbitrary operations before executing them is a core part of Dask. Let's add some operations to the existing computational graph and visualize it again.\n",
+ "\n",
+ "After running the next cell, although it will take some scrolling to get a clear sense of it (the challenges of distributed data analytics!), you can see that the graph already constructed for `read_csv` now continues upward. It selects the `age` column across all partitions (visualized as `getitem`) and eventually performs the `.mean()` reduction (visualized as `series-sum-chunk`, `series-sum-agg`, `count-chunk`, `sum-agg` and `true-div`)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
+ "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
+ " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
+ "<!-- Generated by graphviz version 2.43.0 (0)\n",
+ " -->\n",
+ "<!-- Title: %3 Pages: 1 -->\n",
+ "<svg width=\"296pt\" height=\"188pt\"\n",
+ " viewBox=\"0.00 0.00 296.00 188.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+ "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 184)\">\n",
+ "<title>%3</title>\n",
+ "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-184 292,-184 292,4 -4,4\"/>\n",
+ "<!-- 5719483230307289854 -->\n",
+ "<g id=\"node1\" class=\"node\">\n",
+ "<title>5719483230307289854</title>\n",
+ "<polygon fill=\"none\" stroke=\"black\" points=\"235.5,-180 52.5,-180 52.5,-144 235.5,-144 235.5,-180\"/>\n",
+ "<text text-anchor=\"middle\" x=\"144\" y=\"-157\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">Mean(Projection)</text>\n",
+ "</g>\n",
+ "<!-- 1479189491105329378 -->\n",
+ "<g id=\"node2\" class=\"node\">\n",
+ "<title>1479189491105329378</title>\n",
+ "<polygon fill=\"none\" stroke=\"black\" points=\"288,-108 0,-108 0,-72 288,-72 288,-108\"/>\n",
+ "<text text-anchor=\"middle\" x=\"144\" y=\"-85\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">Projection(FromGraph, age)</text>\n",
+ "</g>\n",
+ "<!-- 1479189491105329378&#45;&gt;5719483230307289854 -->\n",
+ "<g id=\"edge1\" class=\"edge\">\n",
+ "<title>1479189491105329378&#45;&gt;5719483230307289854</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M144,-108.3C144,-116.02 144,-125.29 144,-133.89\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"140.5,-133.9 144,-143.9 147.5,-133.9 140.5,-133.9\"/>\n",
+ "</g>\n",
+ "<!-- &#45;8624958272826520931 -->\n",
+ "<g id=\"node3\" class=\"node\">\n",
+ "<title>&#45;8624958272826520931</title>\n",
+ "<polygon fill=\"none\" stroke=\"black\" points=\"207.5,-36 80.5,-36 80.5,0 207.5,0 207.5,-36\"/>\n",
+ "<text text-anchor=\"middle\" x=\"144\" y=\"-13\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">FromGraph</text>\n",
+ "</g>\n",
+ "<!-- &#45;8624958272826520931&#45;&gt;1479189491105329378 -->\n",
+ "<g id=\"edge2\" class=\"edge\">\n",
+ "<title>&#45;8624958272826520931&#45;&gt;1479189491105329378</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M144,-36.3C144,-44.02 144,-53.29 144,-61.89\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"140.5,-61.9 144,-71.9 147.5,-61.9 140.5,-61.9\"/>\n",
+ "</g>\n",
+ "</g>\n",
+ "</svg>\n"
+ ],
+ "text/plain": [
+ "<graphviz.graphs.Digraph at 0x7fce98b92020>"
+ ]
+ },
+ "execution_count": 24,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "mean_age = ddf['age'].mean()\n",
+ "mean_age.visualize(format='svg')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Computing with the Computational Graph ###\n",
+ "There are several ways to indicate to Dask that we would like to perform the computations described in the computational graphs we have constructed. The first we will show is the `.compute` method, which will return the output of the computation as an object in one GPU's memory - no longer distributed across GPUs.\n",
+ "\n",
+ "**NOTE**: This value is actually a [*future*](https://docs.python.org/3/library/concurrent.futures.html) that it can be immediately used in code, even before it completes evaluating. While this can be tremendously useful in many scenarios, we will not need in this workshop to do anything fancy with the futures we generate except to wait for them to evaluate so we can visualize their values.\n",
+ "\n",
+ "Below we send the computational graph we have created to the Dask scheduler to be executed in parallel on our 4 GPUs. If you have the Dask Dashboard open on another tab from before, you can watch it while the operation completes. Because our graph involves reading the entire 18GB data set (as we declared when adding `read_csv` to the call graph), you can expect the operation to take a little time. If you closely watch the dashboard, you will see that Dask begins follow-on calculations for `mean` even while data is still being read into memory."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "40.1241924549316"
+ ]
+ },
+ "execution_count": 25,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "mean_age.compute()"
+ ]
+ },
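+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "If you do want an explicit future to work with, a minimal sketch using `client.compute`, which also lets us use the `progress` helper we imported earlier:\n",
+    "```python\n",
+    "future = client.compute(ddf[\"age\"].mean())  # returns a future immediately\n",
+    "progress(future)                            # renders a progress bar in the notebook\n",
+    "print(future.result())                      # blocks until the value is ready\n",
+    "```"
+   ]
+  },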
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Persisting Data in the Cluster ###\n",
+ "As you can see, the previous operation, which read the entire 18GB csv into the GPUs' memory, did not retain the data in memory after completing the computational graph:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Sat Oct 11 18:19:55 2025 \n",
+ "+-----------------------------------------------------------------------------+\n",
+ "| NVIDIA-SMI 525.85.12 Driver Version: 525.85.12 CUDA Version: 12.0 |\n",
+ "|-------------------------------+----------------------+----------------------+\n",
+ "| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |\n",
+ "| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |\n",
+ "| | | MIG M. |\n",
+ "|===============================+======================+======================|\n",
+ "| 0 Tesla T4 On | 00000000:00:1B.0 Off | 0 |\n",
+ "| N/A 32C P0 25W / 70W | 1212MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 1 Tesla T4 On | 00000000:00:1C.0 Off | 0 |\n",
+ "| N/A 32C P0 25W / 70W | 690MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 2 Tesla T4 On | 00000000:00:1D.0 Off | 0 |\n",
+ "| N/A 32C P0 25W / 70W | 690MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 3 Tesla T4 On | 00000000:00:1E.0 Off | 0 |\n",
+ "| N/A 33C P0 25W / 70W | 690MiB / 15360MiB | 0% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ " \n",
+ "+-----------------------------------------------------------------------------+\n",
+ "| Processes: |\n",
+ "| GPU GI CI PID Type Process name GPU Memory |\n",
+ "| ID ID Usage |\n",
+ "|=============================================================================|\n",
+ "+-----------------------------------------------------------------------------+\n"
+ ]
+ }
+ ],
+ "source": [
+ "!nvidia-smi"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A typical Dask workflow, which we will utilize, is to persist data we would like to work with to the cluster and then perform fast operations on that persisted data. We do this with the `.persist` method. From the [Dask documentation](https://distributed.dask.org/en/latest/manage-computation.html#client-persist):\n",
+ "\n",
+ ">The `.persist` method submits the task graph behind the Dask collection to the scheduler, obtaining Futures for all of the top-most tasks (for example one Future for each Pandas [*or cuDF*] DataFrame in a Dask[*-cudf*] DataFrame). It then returns a copy of the collection pointing to these futures instead of the previous graph. This new collection is semantically equivalent but now points to actively running data rather than a lazy graph.\n",
+ "\n",
+ "Below we persist `ddf` to the cluster so that it will reside in GPU memory for us to perform fast operations on. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ddf = ddf.persist()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "As you can see by executing `nvidia-smi` (after letting the `persist` finish), each GPU now has parts of the distributed dataframe in its memory:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Sat Oct 11 18:20:05 2025 \n",
+ "+-----------------------------------------------------------------------------+\n",
+ "| NVIDIA-SMI 525.85.12 Driver Version: 525.85.12 CUDA Version: 12.0 |\n",
+ "|-------------------------------+----------------------+----------------------+\n",
+ "| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |\n",
+ "| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |\n",
+ "| | | MIG M. |\n",
+ "|===============================+======================+======================|\n",
+ "| 0 Tesla T4 On | 00000000:00:1B.0 Off | 0 |\n",
+ "| N/A 33C P0 38W / 70W | 4240MiB / 15360MiB | 88% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 1 Tesla T4 On | 00000000:00:1C.0 Off | 0 |\n",
+ "| N/A 34C P0 65W / 70W | 3792MiB / 15360MiB | 88% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 2 Tesla T4 On | 00000000:00:1D.0 Off | 0 |\n",
+ "| N/A 34C P0 40W / 70W | 3868MiB / 15360MiB | 90% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ "| 3 Tesla T4 On | 00000000:00:1E.0 Off | 0 |\n",
+ "| N/A 34C P0 39W / 70W | 3960MiB / 15360MiB | 91% Default |\n",
+ "| | | N/A |\n",
+ "+-------------------------------+----------------------+----------------------+\n",
+ " \n",
+ "+-----------------------------------------------------------------------------+\n",
+ "| Processes: |\n",
+ "| GPU GI CI PID Type Process name GPU Memory |\n",
+ "| ID ID Usage |\n",
+ "|=============================================================================|\n",
+ "+-----------------------------------------------------------------------------+\n"
+ ]
+ }
+ ],
+ "source": [
+ "!nvidia-smi"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Running `ddf.visualize` now shows that we no longer have operations in our task graph, only partitions of data, ready for us to perform operations:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
+ "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
+ " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
+ "<!-- Generated by graphviz version 2.43.0 (0)\n",
+ " -->\n",
+ "<!-- Title: %3 Pages: 1 -->\n",
+ "<svg width=\"135pt\" height=\"44pt\"\n",
+ " viewBox=\"0.00 0.00 135.00 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+ "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
+ "<title>%3</title>\n",
+ "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-40 131,-40 131,4 -4,4\"/>\n",
+ "<!-- &#45;8624958272826520931 -->\n",
+ "<g id=\"node1\" class=\"node\">\n",
+ "<title>&#45;8624958272826520931</title>\n",
+ "<polygon fill=\"none\" stroke=\"black\" points=\"127,-36 0,-36 0,0 127,0 127,-36\"/>\n",
+ "<text text-anchor=\"middle\" x=\"63.5\" y=\"-13\" font-family=\"Helvetica,sans-Serif\" font-size=\"20.00\">FromGraph</text>\n",
+ "</g>\n",
+ "</g>\n",
+ "</svg>\n"
+ ],
+ "text/plain": [
+ "<graphviz.graphs.Digraph at 0x7fce98c18250>"
+ ]
+ },
+ "execution_count": 21,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.visualize(format='svg')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Computing operations on this data will now be much faster:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "40.1241924549316"
+ ]
+ },
+ "execution_count": 22,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf['age'].mean().compute()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Initial Data Exploration with Dask cuDF ##\n",
+ "The beauty of Dask is that working with your data, even though it is distributed and massive, is a lot like working with smaller in-memory data sets."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>age</th>\n",
+ " <th>sex</th>\n",
+ " <th>county</th>\n",
+ " <th>lat</th>\n",
+ " <th>long</th>\n",
+ " <th>name</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>0.0</td>\n",
+ " <td>m</td>\n",
+ " <td>Darlington</td>\n",
+ " <td>54.549641</td>\n",
+ " <td>-1.493884</td>\n",
+ " <td>HARRISON</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>0.0</td>\n",
+ " <td>m</td>\n",
+ " <td>Darlington</td>\n",
+ " <td>54.523945</td>\n",
+ " <td>-1.401142</td>\n",
+ " <td>LAKSH</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>0.0</td>\n",
+ " <td>m</td>\n",
+ " <td>Darlington</td>\n",
+ " <td>54.561127</td>\n",
+ " <td>-1.690068</td>\n",
+ " <td>MUHAMMAD</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>3</th>\n",
+ " <td>0.0</td>\n",
+ " <td>m</td>\n",
+ " <td>Darlington</td>\n",
+ " <td>54.542988</td>\n",
+ " <td>-1.543216</td>\n",
+ " <td>GRAYSON</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>4</th>\n",
+ " <td>0.0</td>\n",
+ " <td>m</td>\n",
+ " <td>Darlington</td>\n",
+ " <td>54.532101</td>\n",
+ " <td>-1.569116</td>\n",
+ " <td>FINLAY</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " age sex county lat long name\n",
+ "0 0.0 m Darlington 54.549641 -1.493884 HARRISON\n",
+ "1 0.0 m Darlington 54.523945 -1.401142 LAKSH\n",
+ "2 0.0 m Darlington 54.561127 -1.690068 MUHAMMAD\n",
+ "3 0.0 m Darlington 54.542988 -1.543216 GRAYSON\n",
+ "4 0.0 m Darlington 54.532101 -1.569116 FINLAY"
+ ]
+ },
+ "execution_count": 26,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.head() # As a convenience, no need to `.compute` the `head()` method"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "age 292399470\n",
+ "sex 292399470\n",
+ "county 292399470\n",
+ "lat 292399470\n",
+ "long 292399470\n",
+ "name 292399470\n",
+ "dtype: int64"
+ ]
+ },
+ "execution_count": 27,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.count().compute()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "age float32\n",
+ "sex object\n",
+ "county object\n",
+ "lat float32\n",
+ "long float32\n",
+ "name object\n",
+ "dtype: object"
+ ]
+ },
+ "execution_count": 28,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "ddf.dtypes"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Exercise #1 - Counties North of Sunderland with Dask ###\n",
+ "Here we ask you to revisit an earlier exercise, but on the distributed data set. Hopefully, it's clear how similar the code is for single-GPU dataframes and distributed dataframes with Dask.\n",
+ "\n",
+ "Identify the latitude of the northernmost resident of Sunderland county (the person with the maximum `lat` value), and then determine which counties have any residents north of this resident. Use the `unique` method of a cudf `Series` to de-duplicate the result.\n",
+ "\n",
+ "**Instructions**: <br>\n",
+ "* Modify the `<FIXME>` only and execute the below cell to identify counties north of Sunderland. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 32,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "0 South Tyneside\n",
+ "0 Cumbria\n",
+ "0 County Durham\n",
+ "0 Gateshead\n",
+ "0 Northumberland\n",
+ "Name: county, dtype: object"
+ ]
+ },
+ "execution_count": 32,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "sunderland_residents = ddf.loc[ddf['county'].str.lower() == 'Sunderland'.lower()]\n",
+ "northmost_sunderland_lat = sunderland_residents['lat'].max()\n",
+ "counties_with_pop_north_of = ddf.loc[ddf['lat'] > northmost_sunderland_lat]['county'].unique()\n",
+ "results=counties_with_pop_north_of.compute()\n",
+ "results.head()"
+ ]
+ },
+ {
+ "cell_type": "raw",
+ "metadata": {},
+ "source": [
+ "\n",
+ "sunderland_residents = ddf.loc[ddf['county'] == 'Sunderland']\n",
+ "northmost_sunderland_lat = sunderland_residents['lat'].max()\n",
+ "counties_with_pop_north_of = ddf.loc[ddf['lat'] > northmost_sunderland_lat]['county'].unique()\n",
+ "results=counties_with_pop_north_of.compute()\n",
+ "results.head()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Click ... for solution. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import IPython\n",
+ "app = IPython.Application.instance()\n",
+ "app.kernel.do_shutdown(True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**Well Done!** Let's move to the [next notebook](1-09_cudf-polars.ipynb). "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "<img src=\"./images/DLI_Header.png\" width=400/>"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.10.15"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}