<img src="./images/DLI_Header.png" width=400/>

# Fundamentals of Accelerated Data Science # 

## 07 - Extract, Transform, and Load ##

**Table of Contents**
<br>
In this notebook, we will go through the basics of extract, transform, and load. This notebook covers the below sections: 
1. [Extract, Transform, and Load (ETL)](#Extract,-Transform,-and-Load-(ETL))
    * [Extract](#Extract)
    * [Transform](#Transform)
    * [Load](#Load)
2. [Save to Parquet Format](#Save-to-Parquet-Format)
    * [Reading from Parquet](#Reading-from-Parquet)
3. [Accelerated ETL for Downstream Tasks](#Accelerated-ETL-for-Downstream-Tasks)

## Extract, Transform, and Load (ETL) ##
An important but perhaps not as highly glorified use case of RAPIDS is extract, transform, and load, or ETL for short. It is a data integration process used to combine data from multiple sources into a single, consistent data store. It's primary goals are: 
* Consolidates data from multiple sources into a single, consistent format
* Improves data quality through cleaning and validation
* Enables more efficient data analysis and reporting
* Supports data-driven decision making

### Extract ###
**Extract** is the first step where data is collected from various source systems. These sources could include: 
* Static files (csv, json)
* SQL RDBMS
* Webpages
* API

**Note**: cuDF doesn't have a way to get transactions from external SQL databases directly to GPU. The workaround is reading with pandas and create cuDF dataframe with `cudf.from_pandas()`. 

In [1]:
%load_ext cudf.pandas
# DO NOT CHANGE THIS CELL
import pandas as pd
import time

In [2]:
# DO NOT CHANGE THIS CELL
dtype_dict={
    'age': 'int8', 
    'sex': 'object', 
    'county': 'object', 
    'lat': 'float32', 
    'long': 'float32', 
    'name': 'object'
}
        
df=pd.read_csv('./data/uk_pop.csv', dtype=dtype_dict)
df.head()

Unnamed: 0,age,sex,county,lat,long,name
0,0,m,DARLINGTON,54.533638,-1.5244,FRANCIS
1,0,m,DARLINGTON,54.426254,-1.465314,EDWARD
2,0,m,DARLINGTON,54.555199,-1.496417,TEDDY
3,0,m,DARLINGTON,54.547909,-1.572342,ANGUS
4,0,m,DARLINGTON,54.477638,-1.605995,CHARLIE


When importing data, it is important to only include columns that are relevant to reduce the memory and compute burden. 

Below we read in the county centroid data. 

In [3]:
centroid_df=pd.read_csv('county_centroid.csv')
centroid_df.columns=['county', 'lat_county_center', 'long_county_center']
centroid_df.head()

FileNotFoundError: [Errno 2] No such file or directory: 'county_centroid.csv'

In [None]:
%%cudf.pandas.line_profile
combined_df=df.merge(centroid_df, on='county')

### Transform ###
During the **Transform** step, the extract data is cleaned, validated, and converted into a suitable format for analysis. 

Below we add a new column, representing each persons's distance from their respective county center. 

In [None]:
%%cudf.pandas.line_profile
c=['lat', 'long']
combined_df['R']=((combined_df[c] - combined_df.groupby('county')[c].transform('mean')) ** 2).sum(axis=1) ** 0.5

Using joins to get lookup values can be faster than deriving those. It is not uncommon to store group statistics for this purpose. 

In [None]:
%%cudf.pandas.line_profile

# read in centroid data
centroid_df=pd.read_csv('county_centroid.csv')

# merge 
combined_df=df.merge(centroid_df, on='county', suffixes=['', '_county_center'])

# calculate distance from county center
combined_df['R']=((combined_df['lat']-combined_df['lat_county_center'])**2+(combined_df['long']-combined_df['long_county_center'])**2)**0.5

Below we filter the data to only include adults. 

In [None]:
%%cudf.pandas.line_profile

senior_df_filter=combined_df['age'] >= 60
senior_df=combined_df.loc[senior_df_filter]

display(senior_df.head())

### Load ###
The final **Load** step is where the transformed data is loaded into a target system. The target system can be a database or a file. The key is to develop a system that is efficient for downstream tasks. 

In [None]:
senior_df.head()

In [None]:
# DO NOT CHANGE THIS CELL
senior_df.to_csv('senior_df.csv', index=False)

**Note**: If the downstream task involves querying and analyzing the data further, the csv file format may not be the best choice. 

<a name='s1-6'></a>
## Save to Parquet Format ##
After processing the data, we persist it for later use. [Apache Parquet](https://parquet.apache.org/) is a columnar binary format and has become the de-facto standard for the storage of large volumes of tabular data. Converting to Parquet file format is important and csv files should generally be avoided in data products. While the csv file format is convenient and human-readable, importing csv files requires reading and parsing entire records, which can be a bottleneck. In fact, many developers will start their analysis by first converting csv files to the Parquet file format. There are many reasons to use Parquet format for analytics: 
* The columnar nature of Parquet files allows for column pruning, which often yields big query performance gains. 
* It uses metadata to store the schema and supports more advanced data types such as categorical, datetimes, and more. This means that importing data would not require schema inference or manual schema specification. 
* It captures metadata related to row-group level statistics for each column. This enables predicate pushdown filtering, which is a form of query pushdown that allows computations to happen at the “database layer” instead of the “execution engine layer”. In this case, the database layer is Parquet files in a filesystem, and the execution engine is Dask. 
* It supports flexible compression options, making it more compact to store and more portable than a database. 

We will use `.to_parquet(path)`[[doc]](https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html#dask-dataframe-to-parquet) to write to Parquet files. By default, files will be created in the specified output directory using the convention `part.0.parquet`, `part.1.parquet`, `part.2.parquet`, ... and so on for each partition in the DataFrame. This can be changed using the `name_function` parameter. Ouputting multiple files lets Dask write to multiple files in parallel, which is faster than writing to a single file. 

<p><img src='images/parquet.png' width=240></p>

When working with large datasets, decoding and encoding is often an expensive task. This challenge tends to compound as the data size grows. A common pattern in data science is to subset the dataset by columns, row slices, or both. Moving these filtering operations to the read phase of the workflow can: 1) reduce I/O time, and 2) reduce the amount of memory required, which is important for GPUs where memory can be a limiting factor. Parquet file format enables filtered reading through **column pruning** and **statistic-based predicate filtering** to skip portions of the data that are irrelevant to the problem. Below are some tips for writing Parquet files: 
* When writing data, sorting the data by the columns that expect the most filters to be applied or columns with the highest cardinality can lead to meaningful performance benefits. The metadata calculated for each row group will enable predicate pushdown filters to the fullest extent. 
* Writing Parquet format, which requires reprocessing entire data sets, can be expensive. The format works remarkably well for read-intensive applications and low latency data storage and retrieval. 
* Partitions in Dask DataFrame can write out files in parallel, so multiple Parquet files are written simultaneously.

Below we write the data into Parquet format, after sorting by the county. 

In [None]:
# DO NOT CHANGE THIS CELL
senior_df=senior_df.sort_values('county')

senior_df.to_parquet('senior_df.parquet', index=False)

In [None]:
# DO NOT CHANGE THIS CELL
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

### Reading from Parquet ###
Querying data in Parquet format can be significantly more performant, especially as the size of the data increases. 

Below we read from both the csv format and Parquet format for comparison. 

In [None]:
%load_ext cudf.pandas
import pandas as pd
import time

In [None]:
%%cudf.pandas.line_profile

sel=[('county', '=', 'BLACKPOOL')]
parquet_df=pd.read_parquet('senior_df.parquet', columns=['age', 'sex', 'county', 'lat', 'long', 'name', 'R'], filters=sel)
parquet_df=parquet_df.loc[parquet_df['county']=='BLACKPOOL']

In [None]:
parquet_df['county'].unique()

In [None]:
%%cudf.pandas.line_profile

df=pd.read_csv('./senior_df.csv', usecols=['age', 'sex', 'county', 'lat', 'long', 'name', 'R'])
df=df.loc[df['county']=='BLACKPOOL']

In [None]:
df['county'].unique()

## Accelerated ETL for Downstream Tasks ##
Accelerating the ETL process is important for data science as it provides the below benefits: 
* **Timely insights**: Faster ETL allows for more up-to-date data analysis, enabling data scientists to work with the most current information.
* **Increased productivity**: Reduced processing time means data scientists can spend more time on analysis and model development rather than waiting for data to be ready.
* **Handling larger datasets**: Accelerated ETL processes can manage larger volumes of data more efficiently.
* **Cost efficiency**: Accelerated ETL can reduce computational resources and time, leading to lower infrastructure costs.

<p><img src='images/etl.png' width=720></p>

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

**Well Done!** Let's move to the [next notebook](1-08_dask-cudf.ipynb). 

<img src="./images/DLI_Header.png" width=400/>