In this chapter, we will cover data processing and transformation with Synapse notebooks. We will also look at using pandas DataFrames within Synapse notebooks, which will help us explore data stored as Parquet files in Azure Data Lake Storage (ADLS) Gen2 as a pandas DataFrame and then write it back to ADLS Gen2 as a Parquet file.
We will be covering the following recipes:
In this recipe, we will learn how to create an ADLS Gen2 storage account and upload data to it as a Parquet file; ADLS Gen2 acts as the landing zone before the data is processed and transformed.
We will be using a public dataset for our scenario: New York yellow taxi trip data, which includes attributes such as trip distances, itemized fares, rate types, payment types, pick-up and drop-off dates and times, driver-reported passenger counts, and pick-up and drop-off locations. We will use this dataset throughout this recipe to demonstrate various use cases:
Let's get started.
ADLS Gen2 is a data lake solution that combines low-cost object storage with a hierarchical namespace for organizing data into filesystems and directories, along with high availability and disaster recovery options. The Azure Blob Filesystem (ABFS) driver provides the interface for accessing ADLS Gen2 storage. Our input dataset will be stored as a Parquet file inside an ADLS Gen2 container and subsequently used for data standardization, processing, and transformation with Synapse notebooks.
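All of the notebook code later in this chapter refers to files in the lake through ABFS URIs. As a quick reference, the small sketch below shows how such a URI is composed; the container, account, and file names are placeholders that you would replace with your own:
# ABFS(S) URI pattern used by the ABFS driver:
#   abfss://<container>@<storage-account>.dfs.core.windows.net/<path/to/file>
container = "<container>"          # the ADLS Gen2 container (filesystem)
account = "<storage-account>"      # the storage account name
path = "NYCTripSmall.parquet"      # path of the file inside the container
abfss_path = f"abfss://{container}@{account}.dfs.core.windows.net/{path}"
print(abfss_path)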
Let's create an ADLS Gen2 storage account to start:
If you want to grant multiple users access to the storage account, you must perform the same role assignment steps for each user.
We have created an ADLS Gen2 storage account, enabled a hierarchical namespace for the storage, and completed the role assignment, so we can now proceed to the next step.
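If you prefer to script the storage account creation instead of clicking through the portal, the following is a minimal sketch using the azure-identity and azure-mgmt-storage Python packages; the subscription ID, resource group, account name, and region are placeholder values, and enabling the hierarchical namespace is what makes the account an ADLS Gen2 account:
from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import StorageAccountCreateParameters, Sku

# Placeholder subscription ID, resource group, account name, and region
client = StorageManagementClient(DefaultAzureCredential(), "<subscription-id>")
poller = client.storage_accounts.begin_create(
    "<resource-group>",
    "<storageaccountname>",
    StorageAccountCreateParameters(
        sku=Sku(name="Standard_LRS"),
        kind="StorageV2",
        location="eastus",
        is_hns_enabled=True,   # hierarchical namespace: this makes it ADLS Gen2
    ),
)
print(poller.result().name)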
Let's create a container in ADLS Gen2 now.
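The container can be created from the storage account page in the Azure portal, and the Parquet file uploaded from the same place. If you prefer to script this step instead, the following is a minimal sketch using the azure-storage-file-datalake package; the account name, container name, and local file path are placeholders, and it assumes you have already downloaded NYCTripSmall.parquet locally:
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient

# Placeholder account and container names
service = DataLakeServiceClient(
    account_url="https://<storage-account>.dfs.core.windows.net",
    credential=DefaultAzureCredential(),
)
filesystem = service.create_file_system(file_system="<container>")
file_client = filesystem.get_file_client("NYCTripSmall.parquet")

# Upload the locally downloaded Parquet file into the container
with open("NYCTripSmall.parquet", "rb") as data:
    file_client.upload_data(data, overwrite=True)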
The data is now in our ADLS Gen2 data lake, inside a container as a Parquet file.
In this recipe, we will learn how to create a Synapse Analytics workspace and Synapse notebooks so that we can load data from an ADLS Gen2 Parquet file into a pandas DataFrame. Synapse notebooks let us perform a detailed analysis of the data in an interactive session.
We will be using a public dataset for our scenario: New York yellow taxi trip data, which includes attributes such as trip distances, itemized fares, rate types, payment types, pick-up and drop-off dates and times, driver-reported passenger counts, and pick-up and drop-off locations. We will use this dataset throughout this recipe to demonstrate various use cases:
Let's get started.
Exploring an ADLS Gen2 Parquet file in a pandas DataFrame requires us to create a Synapse Analytics workspace, a Synapse Spark pool, and a Synapse notebook. The following recipe is a step-by-step guide to using the core features of Synapse Analytics.
Synapse Analytics workspace creation requires us to create a resource group, or to have access to an existing resource group with owner permissions. Here, we will use an existing resource group on which the user has owner permissions:
Synapse Spark pools are the home for Spark resources, notebooks, and clusters. A Spark pool defines the compute resources that Spark sessions use; when we run a notebook against the pool, a Spark session is created for us by default. Users can work with a Spark pool without having to manage clusters themselves, because the Synapse workspace handles this, removing that operational overhead:
The Spark pool has now been created successfully, so we can proceed with notebook creation and execution.
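Once you attach a notebook to this pool (which we will do next), Synapse starts a Spark session for you automatically, so there is no SparkSession.builder boilerplate to write. As a quick sanity check, a minimal sketch such as the following can be run in a notebook cell to inspect the pre-created session; the exact version string depends on the pool's runtime:
# The spark object is created automatically when the notebook runs on a Synapse Spark pool
print(spark.version)                     # Spark runtime version of the pool
print(spark.sparkContext.applicationId)  # ID of the Spark application backing this session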
Synapse notebooks provide an interactive Spark session and an editor for working on Spark code:
df = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
df.show(10)
print('Converting to pandas.')
pandas_df = df.toPandas()   # collect the Spark DataFrame to the driver as a pandas DataFrame
print(pandas_df)
You can view the following output in pandas on a Synapse notebook screen:
Alternatively, we can directly read Parquet files into pandas, as follows:
import pandas
# pandas can read the abfss:// path directly inside a Synapse notebook
df = pandas.read_parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
print(df)
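To complete the round trip described at the start of the chapter, the pandas DataFrame can also be written back to ADLS Gen2 as a Parquet file. The following is a minimal sketch; it assumes the Synapse runtime can resolve abfss:// paths for pandas (via fsspec/adlfs), and the output file name is a placeholder of our own choosing:
# Write the pandas DataFrame back to ADLS Gen2 as a Parquet file
# (placeholder container, account, and output file name)
df.to_parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall_out.parquet')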
In this section, we will learn how to process data with different DataFrame operations using PySpark in Synapse notebooks and view the results as charts. Charts display data visually and help us understand patterns between data points; graphs and diagrams also make it easier to compare data.
Getting ready
We will be using a public dataset for our scenario: New York yellow taxi trip data, which includes attributes such as trip distances, itemized fares, rate types, payment types, pick-up and drop-off dates and times, driver-reported passenger counts, and pick-up and drop-off locations. We will use this dataset throughout this recipe to demonstrate various use cases:
Let's get started:
df = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
df.printSchema()
The output is shown in the following screenshot:
df.show(5, truncate=False)
Figure 5.23 shows the output:
df.select('PassengerCount', 'DateID').show(10)
The output is shown in the following screenshot:
df.groupBy("DateID").count().sort("count", ascending=True).show()
The following screenshot shows the output:
df.describe().show()
The following screenshot shows the results:
df.filter(df.TripDistanceMiles > 1.5).count()
Figure 5.27 displays the result:
from pyspark.sql.functions import *
# add a new column, Longtrip, copied from TripDistanceMiles
df = df.withColumn("Longtrip", col("TripDistanceMiles"))
df.show(5)
The output is shown in the following screenshot:
df.filter(df["Longtrip"].between(1,3)).show(5)
The following screenshot shows the results:
df = df.groupby("DateID").agg({'TotalAmount': 'sum'})
display(df)
The following screenshot shows the output:
To get a clearer picture of the data, display the aggregated DataFrame again and switch the chart type to a pie chart in the chart settings:
display(df)
The result is shown in the following screenshot:
Apache Parquet is a columnar file format that is supported by many big data processing systems and is one of the most efficient file formats for storing analytical data. Much of the Hadoop and big data ecosystem uses Parquet extensively; its main advantage is efficient data compression, which improves performance when working with complex data.
Spark supports both reading and writing Parquet files. Because the columnar layout and compression reduce the size of the stored data, Parquet also reduces I/O operations and memory consumption.
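One practical consequence of the columnar layout is that Spark reads only the columns a query actually references. The following small sketch (the path is a placeholder) selects just two columns, so the remaining columns in the Parquet file are never scanned:
# Only the referenced columns are read from the Parquet file,
# which is what makes the columnar format I/O-efficient
df = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
df.select('TripDistanceMiles', 'TotalAmount').show(5)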
In this section, we will learn how to read from and write to Parquet files. Doing so with PySpark code is straightforward.
We will be using a public dataset for our scenario: New York yellow taxi trip data, which includes attributes such as trip distances, itemized fares, rate types, payment types, pick-up and drop-off dates and times, driver-reported passenger counts, and pick-up and drop-off locations. We will use this dataset throughout this recipe to demonstrate various use cases:
Let's get started:
The following code snippet performs the operation of reading and writing to Parquet files:
df = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
from pyspark.sql.functions import *
# add a derived column before writing the data back out
df = df.withColumn("Longtrip", col("TripDistanceMiles"))
# write the DataFrame to ADLS Gen2 as Parquet, then read it back to verify
df.write.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmallwrite.parquet')
dfwrite = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmallwrite.parquet')
dfwrite.show(5)
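When writing larger datasets back to the lake, it is often worth partitioning the output by a column you frequently filter on. The following is a minimal sketch (the output folder name is a placeholder); partitioning by DateID lets later reads skip entire partition folders:
# Write the data partitioned by DateID, overwriting any previous output
df.write.mode('overwrite') \
    .partitionBy('DateID') \
    .parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmallPartitioned')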
In this section, we will learn how to perform exploratory analysis on a dataset using PySpark in Synapse notebooks.
We will be using a public dataset for our scenario: New York yellow taxi trip data, which includes attributes such as trip distances, itemized fares, rate types, payment types, pick-up and drop-off dates and times, driver-reported passenger counts, and pick-up and drop-off locations. We will use this dataset throughout this recipe to demonstrate various use cases:
Let's get started and try to find the busiest day of the week, that is, the day with the most trips:
dftrip = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
from pyspark.sql import functions as F
dftrip=dftrip.withColumn("trip_date", F.to_date(F.col("DateID").cast("string"),'yyyyMMdd'))
dftrip.printSchema()
The following screenshot shows the results:
dftrip.select(
    F.col("trip_date"),
    F.dayofweek(F.col("trip_date")).alias("dayofweek"),
    F.dayofmonth(F.col("trip_date")).alias("dayofmonth"),
    F.dayofyear(F.col("trip_date")).alias("dayofyear"),
).show()
The output is shown in the following screenshot:
import pyspark.sql.functions as f
dftrip = dftrip.withColumn('Day', f.date_format('trip_date', 'E'))
dftrip.show(5)
The following screenshot shows the result:
dftrip.groupBy("Day").count().orderBy("count").show(7)
The following screenshot shows the output:
dftrip = dftrip.agg({'Day': 'count'})   # overall count of trips (non-null Day values)
display(dftrip)
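Finally, to answer the question we started with directly, the following self-contained sketch (the path is a placeholder) recomputes the day-of-week column and shows the single busiest day:
import pyspark.sql.functions as F

# Re-read the trips, derive the day of week, and show the day with the most trips
trips = spark.read.parquet('abfss://<container>@<storage-account>.dfs.core.windows.net/NYCTripSmall.parquet')
trips = trips.withColumn('trip_date', F.to_date(F.col('DateID').cast('string'), 'yyyyMMdd'))
trips = trips.withColumn('Day', F.date_format('trip_date', 'E'))
trips.groupBy('Day').count().orderBy(F.desc('count')).show(1)  # the busiest day of the week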