
6. SQL, NoSQL, and PySparkSQL

Raju Kumar Mishra, Bangalore, Karnataka, India
Sundar Rajan Raman, Chennai, Tamil Nadu, India

In this chapter, we will look into various Spark SQL recipes that come in handy when you have to apply SQL-like queries to data. One of the specialties of Apache Spark is that it lets you apply data-wrangling methods both programmatically and as ANSI SQL-like queries. For readers who come from a pure SQL background, with little exposure to programmatic data manipulation, these SQL queries are a one-stop shop: almost all of the Spark programmatic APIs can also be applied to data through Spark SQL. These queries also follow the ANSI standard, which enables anyone currently working with SQL technologies to readily start working on Apache Spark-based Big Data projects.

Once the DataFrame is created, we can directly start applying SQL queries to the DataFrame object and manipulate the data as we need to. In this chapter, starting from simple queries, we look into various SQL applications in Apache Spark.

We are going to learn the following recipes, which will take you step-by-step from a beginner to an advanced Spark SQL user. We recommend you practice each of these to get the best out of this chapter.
  • Recipe 6-1. Create a DataFrame from a CSV file

  • Recipe 6-2. Create a temp view from a DataFrame

  • Recipe 6-3. Create a simple SQL from a DataFrame

  • Recipe 6-4. Apply Spark UDF methods on Spark SQL

  • Recipe 6-5. Create a new PySpark UDF

  • Recipe 6-6. Join two DataFrames using SQL

  • Recipe 6-7. Join multiple DataFrames using SQL

Recipe 6-1. Create a DataFrame from a CSV File

Problem

You want to create a DataFrame from a CSV file.

Solution

For the sake of simplicity, we are going to use very simple data so that we can focus on the logic and Spark APIs without any distractions from the data. We are using the well-known student data shown in Table 6-1. There are three columns in this data: studentId, name, and gender.
Table 6-1  Sample Data

studentId    name       gender
---------    -------    ------
si1          Robin      M
si2          Maria      F
si3          Julie      F
si4          Bob        M
si6          William    M

The assignment is to create a DataFrame from this data.

Although you would have already performed the following steps in the previous chapters to create a DataFrame from the CSV, we start with this for continuity purposes. Also, anyone with a pure SQL background starting directly with this chapter will be lost without this step. So let’s get started:
>>> studentDf = spark.read.csv('studentData.csv',
...                           header=True, inferSchema=True)
>>> studentDf.show()
+---------+-------+-------+
|studentId|   name| gender|
+---------+-------+-------+
|      si1|  Robin|      M|
|      si2|  Maria|      F|
|      si3|  Julie|      F|
|      si4|    Bob|      M|
|      si6|William|      M|
+---------+-------+-------+

We have successfully converted the data into a Spark DataFrame. You might notice that this is very simple. Yes, Spark's abstraction APIs make it very simple for users to load data into a DataFrame.

How It Works

Let’s do a deep dive into the code that created the DataFrame. spark.read.csv is the Spark API that reads CSV files and returns a DataFrame as the output. You can also see there are two more arguments provided along with the filename: header and inferSchema. header=True tells Spark that the first row of the file contains the column names, and inferSchema=True tells Spark to infer each column’s datatype from the data rather than treating every column as a string.
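If you do not want to rely on schema inference, you can also pass an explicit schema. Here is a minimal sketch, assuming the same file and the columns from Table 6-1 (the variable name studentDfExplicit is just for illustration):
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> studentSchema = StructType([
...     StructField("studentId", StringType(), True),
...     StructField("name", StringType(), True),
...     StructField("gender", StringType(), True)])
>>> studentDfExplicit = spark.read.csv('studentData.csv',
...                                    header=True, schema=studentSchema)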

To make sure we create the correct schema as expected, we can use the printSchema() method of DataFrame as shown here.
>>> studentDf.printSchema()
root
 |-- studentId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)

Executing studentDf.printSchema() shows the column names of the DataFrame along with their datatypes and nullability.

Step 6-1-1. Creating a DataFrame from JSON

Very often in real-world projects we get data files in the JSON format; it is one of the most widely used data formats. So let’s create a DataFrame from a JSON file. For the sake of simplicity, we use the same students data, this time in JSON format. Each attribute in a JSON document is placed into an individual column of the DataFrame, and each document becomes a row. Note that it is very easy to introduce errors when JSON data is created by hand, and when that happens no pinpoint error messages are given out, which makes such data issues very hard to debug. To avoid these scenarios, it is always best to create JSON data programmatically.

Figure 6-1 shows the students data in JSON format. There are five documents, each containing the same set of attributes.
Figure 6-1. Students data in JSON format
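Since the figure cannot be reproduced here, the following is a sketch of how the students data looks in JSON Lines format (one JSON document per line), reconstructed from Table 6-1; the field order in the actual file may differ:
{"studentId": "si1", "name": "Robin", "gender": "M"}
{"studentId": "si2", "name": "Maria", "gender": "F"}
{"studentId": "si3", "name": "Julie", "gender": "F"}
{"studentId": "si4", "name": "Bob", "gender": "M"}
{"studentId": "si6", "name": "William", "gender": "M"}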

Let’s complete the creation of the DataFrame from the JSON file:
>>> studentDf_json = spark.read.json('students.json')
>>> studentDf_json.printSchema()
Here is the output showing the schema of the DataFrame. Note that this is the exact schema we got when we created the DataFrame from the CSV file.
root
 |-- studentId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
Let’s see the contents of the DataFrame using the show method.
>>> studentDf_json.show()
Here is the output:
+---------+-------+--------+
|studentId|   name|  gender|
+---------+-------+--------+
|      si1|  Robin|       M|
|      si2|  Maria|       F|
|      si3|  Julie|       F|
|      si4|    Bob|       M|
|      si6|William|       M|
+---------+-------+--------+

Step 6-1-2. Creating a DataFrame from Parquet

So far we have looked into CSV and JSON files. In the Big Data space, where huge volumes of data are processed, the Parquet format is rated very highly because of its columnar storage, which comes with a highly efficient compression mechanism as well as encoding schemes. Parquet is known for improving query performance.
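Parquet files are normally produced by a framework rather than written by hand. As a minimal sketch, assuming the studentDf DataFrame created earlier in this recipe, this is how such a file could be written out from Spark itself:
>>> # Write the students DataFrame out in Parquet format
>>> studentDf.write.mode("overwrite").parquet('studentsData.parquet')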

Although we displayed the CSV and JSON contents, we will not show the raw Parquet file, since it is a binary format that is not human friendly to read. So let’s get straight into creating the DataFrame from the Parquet file.

Here we have a Parquet file named studentsData.parquet that contains the same students data that was loaded into the CSV and JSON formats.
>>> studentdf_parquet = spark.read.parquet('studentsData.parquet')
Let’s see the schema of the DataFrame.
>>> studentdf_parquet.printSchema()
Here is the output. You can see that this is exactly the same schema we saw for the CSV and JSON formats.
root
 |-- studentId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
Let’s see the contents of the DataFrame using the show method.
>>> studentdf_parquet.show()
Here is the output:
+---------+-------+------+
|studentId|   name|gender|
+---------+-------+------+
|      si1|  Robin|     M|
|      si2|  Maria|     F|
|      si3|  Julie|     F|
|      si4|    Bob|     M|
|      si6|William|     M|
+---------+-------+------+

At this stage, we have the DataFrame that is essential for applying SQL queries. The beauty of Spark is how it abstracts data from various formats into its one DataFrame abstraction. This also means that the data munging applied to a DataFrame is agnostic to the file format. For example, suppose you have completed your data programming against the JSON-formatted data and you want to switch to Parquet. There is no code change needed other than changing the read API for the new data format; the rest of the program written for JSON remains the same for Parquet.
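As a sketch of this point, only the read call changes with the format; the downstream processing can stay exactly the same (the variable names and the select shown here are just illustrative placeholders):
>>> # Only the read API differs per format
>>> df_csv = spark.read.csv('studentData.csv', header=True, inferSchema=True)
>>> df_json = spark.read.json('students.json')
>>> df_parquet = spark.read.parquet('studentsData.parquet')
>>> # The same transformations work on any of them
>>> df_parquet.select('name', 'gender').show()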

Recipe 6-2. Create a Temp View from a DataFrame

Problem

While creating a DataFrame is the first step in starting to use Spark SQL, the second step is to register the created DataFrame as a temporary view in Spark’s SparkSession. For those who are wondering what a SparkSession is, it is the entry point for working with structured data such as rows and columns. Spark provides an API called createOrReplaceTempView that lets you register your DataFrame as a queryable entity.

Solution

The temp view is registered in the session’s catalog and can be used for analytics and other repeated operations on the same data.

How It Works

Let’s create the temporary view.
>>> studentDf.createOrReplaceTempView("Students")
Executing this statement will create a temporary view called Students. Let’s go ahead and verify this.
>>> spark.catalog.listTables()
Here is the output:
[Table(name='students', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Here you can verify that the students view has been registered with the SparkSession and is now readily available for use in SQL queries. You might be wondering how long this view remains available: for example, if we exit the Spark session and log in again, will it still exist? It will not, because these views are scoped only to the session in which they are created. When we log off from the Spark session and log in again, the students view we created will no longer be available.

Let’s verify that here:
>>> quit()
We quit from the session and log in again.
>>> from pyspark import SQLContext
>>> sqlContext = SQLContext(sc)
With these statements, we import SQLContext and then instantiate it using the SparkContext that is available by default as the variable sc.
>>> sqlContext.tableNames()
Here is the output that shows that there are no existing views in the session.
[]

Recipe 6-3. Create a Simple SQL from a DataFrame

Problem

So far we have been setting up the base to start using Spark SQL. We created a DataFrame from data files and registered that DataFrame as a temporary view. Now we use the temporary view to apply our first simple SQL query.

Solution

We have to create a simple query. For most people, the simplest possible query is select * from table. Let’s do that here.
>>> outputDf = spark.sql("select * from students")
>>> outputDf.show()
Here is the output:
+---------+-------+------+
|studentId|   name|gender|
+---------+-------+------+
|      si1|  Robin|     M|
|      si2|  Maria|     F|
|      si3|  Julie|     F|
|      si4|    Bob|     M|
|      si6|William|     M|
+---------+-------+------+

Notice that we are able to query on the data using a SQL query. The output is the same as what we saw earlier.

How It Works

Step 6-3-1. Using Column Names in Spark SQL

Now that we applied a simple SQL query, let’s start using the column names inside the query.

Let’s look at the column names that are available inside the students view. Recall that the printSchema method of the DataFrame can be used to display the schema. For SQL lovers, there is another way to view and analyze details.

Spark provides the Describe command, which can be used to see the table/view’s column names, datatypes, and any comments attached to the columns.

Let’s apply the Describe keyword on the students view.
>>> spark.sql("Describe students").show()
Here is the output:
+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|studentId|   string|   null|
|     name|   string|   null|
|   gender|   string|   null|
+---------+---------+-------+

From the output, we can see that the column names are the same as the names we saw from the printSchema method. The Describe option provides a SQL style way of getting the table schema.

Let’s go ahead and apply a query using these columns. Let’s get a student report with name and gender only, since for reporting usually we will not require the ID.

The query we are going to run will be “Get a report of all the students with their name and gender.”
>>> spark.sql("select name,gender from students").show().
Here is the output:
+-------+------+
|   name|gender|
+-------+------+
|  Robin|     M|
|  Maria|     F|
|  Julie|     F|
|    Bob|     M|
|William|     M|
+-------+------+

As per the output, we can see that only the name and gender columns are displayed.

Step 6-3-2. Creating an Alias for Column Names

From a reporting perspective, we want the report to provide formatted column names and easily understandable column names. Let’s assume that our report needs these formats:
"name" should be displayed as "Name" with N capitals.
gender should be displayed as "Sex"
To do this, Spark lets us create aliases that can be used to display the column names the way we need them to be.
>>> spark.sql("select name as Name,gender as Sex from students").show()
Here is the output:
+-------+----+
|   Name| Sex|
+-------+----+
|  Robin|   M|
|  Maria|   F|
|  Julie|   F|
|    Bob|   M|
|William|   M|
+-------+----+

Upon analyzing the output, we see that the column names are displayed as expected. To create an alias, we simply place the alias name after the original column name; the keyword as is optional. This aliasing concept is very helpful in complex queries, which we will see later in this chapter.

Step 6-3-3. Filtering Data Using a Where Clause

In this recipe, we are going to apply filters on the data to select only female students, ones with gender set to F. The where clause is used to apply filtering on the column values.
>>> spark.sql("select name ,gender from students where gender = 'F'").show()
Let’s see the output here:
+-------+-------+
|   name| gender|
+-------+-------+
|  Maria|      F|
|  Julie|      F|
+-------+-------+

As you can observe from the output, we get only the female students. At this point, it is important to note that Spark is very efficient in handling filter criteria. If multiple filters are applied to the same DataFrame at various points in a program, Spark knows how to combine them so that they can all be applied in a single pass. Internally, Spark scans each row to identify the rows matching the filter criteria, so scanning the same dataset separately for each filter would be very costly. Spark uses a query optimizer called Catalyst to recognize and combine such filter criteria.
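If you are curious about what Catalyst actually plans for a query, you can print the query plan with the DataFrame's explain method. A quick sketch (the exact plan text varies by Spark version):
>>> # Show the physical plan chosen by the Catalyst optimizer
>>> spark.sql("select name, gender from students where gender = 'F'").explain()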

While the examples so far work fine, there are some pitfalls for readers who come from an RDBMS SQL background. So let’s look at those pitfalls next.

Step 6-3-4. Filtering on Case-Sensitive Column Values

In the previous step, the query filtered on female students. The female gender is represented by the column value F (note the capital letter). If by chance we filter on f instead, the query returns nothing, since there are no values of f in the gender column. Let’s go ahead and filter based on the f criteria.
>>> spark.sql("select name ,gender from Students where Gender = 'f'").show()
Here is the output where we can observe that there are no records printed:
+----+------+
|name|gender|
+----+------+
+----+------+

There will be scenarios when we don’t know the column value’s case (whether the value is uppercase or lowercase). In those scenarios, we need to apply functions on the column as follows.

Let’s go ahead and use the function lower, which converts the value into lowercase before doing the comparison. In Spark SQL, this kind of function is called a UDF, or user-defined function, and Spark provides a variety of predefined UDFs. lower is one such UDF that converts values from uppercase to lowercase. Let’s use the lower function and see the results.
>>> spark.sql("select name ,gender from Students where lower(gender) = 'f'").show()
Here is the output where you can see the results with the female students:
+-----+------+
| name|gender|
+-----+------+
|Maria|     F|
|Julie|     F|
+-----+------+

Step 6-3-5. Trying to Use Alias Columns in a Where Clause

Generally, many RDBMS databases allow alias names to be used in the where clause, the group by clause, and other such clauses. But if we try the same in Spark SQL, we will see a strange error saying that the column name cannot be resolved. This is because Spark SQL does not allow alias column names to be referenced elsewhere in the same query.

Let’s use an alias name in the SQL and look at the output:
>>> spark.sql("select name ,gender Sex from Students where Lower(Sex) = 'f'").show()
Here is the output:
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve '`Sex`' given input columns: [students.studentId, students.name, students.gender]; line 1 pos 44; 'Project ['name, 'gender AS Sex#163] +- 'Filter ('Sex = f) +- SubqueryAlias students +- Relation[studentId#0,name#1,gender#2] parquet "

While we might see a long stack of exceptions, do not be scared by this. At the very end, notice the friendly message saying that the given column cannot be resolved. This is Spark SQL’s way of telling us that it does not recognize alias columns within the same query.
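The simple workaround is to reference the original column in the where clause and keep the alias only in the select list; here is a sketch of the corrected query, which returns only the female students with the column heading shown as Sex:
>>> spark.sql("select name, gender as Sex from Students where lower(gender) = 'f'").show()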

Recipe 6-4. Apply Spark UDF Methods on Spark SQL

Problem

You want to use Spark UDF functions in Spark SQLs.

Solution

User-defined functions (UDFs) are special functions that help in transforming data from one representation to another. In the previous section, we worked with the lower UDF to convert characters into lowercase. Other similar examples are converting date strings from one format to another, and converting a character into its detailed value, such as M into Male and F into Female.

How It Works

Let’s apply a UDF to convert a date from one format to another.

The data used in this problem contains an additional column holding the date of birth of each student. Let’s load the data and look at the contents.
>>> studentDf = spark.read.csv('studentData_dateofbirth.csv',header=True, inferSchema=True)
>>> studentDf.printSchema()
Let’s print the schema of the new data, studentData_dateofbirth.csv.
root
 |-- studentId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- dateofbirth: string (nullable = true)
The new data is loaded and, on printing the schema, we can see that there is an extra column now, named dateofbirth. Figure 6-2 shows the students data with the date of birth column.
Figure 6-2. Students data with date of birth column added

We have introduced a new column called dateofbirth so that we can apply UDFs to it. Spark by default provides a lot of date functions that can be applied to date values, such as converting a string value into a date, computing the difference between two dates, adding and subtracting dates, and so on. All these functionalities are available in Spark as user-defined functions.

Step 6-4-1. Representing a String Value as a Date Value

We need to view the default datatype of the dateofbirth column in the view. Before seeing that, we need to create a temporary view of the updated data.

Let’s go ahead and do that:
>>> studentDf.createOrReplaceTempView("Students")

With this, we created a Students view in Spark.

Let’s look at the columns and their respective datatypes in the newly created table.
>>> spark.sql("Describe students").show()
Here is the output:
+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|  studentId|   string|   null|
|       name|   string|   null|
|     gender|   string|   null|
|dateofbirth|   string|   null|
+-----------+---------+-------+
Here we can notice that the dateofbirth column is represented with the string datatype. This is because Spark read the date values in the CSV file as plain strings. Let’s use Spark’s date UDFs to convert this column to the Date datatype.
>>> spark.sql("select studentid, name, gender, to_date(dateofbirth) from students").show()
Here is the output:
+----------+-------+-------+--------------------------------+
| studentid|   name| gender| to_date(students.`dateofbirth`)|
+----------+-------+-------+--------------------------------+
|       si1|  Robin|      M|                      1981-12-13|
|       si2|  Maria|      F|                      1986-06-06|
|       si3|  Julie|      F|                      1988-09-05|
|       si4|    Bob|      M|                      1987-05-04|
|       si6|William|      M|                      1980-11-12|
+----------+-------+-------+--------------------------------+
Notice from the output that the dateofbirth column was given as the input to the to_date UDF, which returns the value with Date as the datatype. Let’s verify the datatype of that column.
>>> studentDf_dob = spark.sql("select studentid, name, gender, to_date(dateofbirth) from students")
>>> studentDf_dob.createOrReplaceTempView("StudentsDob")

Here we are creating another temporary view that contains the converted dateofbirth column.

Let’s view the newly created table and its schema using the describe keyword.
>>> spark.sql("Describe studentsDob").show()
Here is the output:
+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|           studentid|   string|   null|
|                name|   string|   null|
|              gender|   string|   null|
|to_date(students....|     date|   null|
+--------------------+---------+-------+

You can clearly notice that there is a column with the datatype set to date. But we still do not have a clear column name that can be easily understood. Let’s use aliasing to create a better column name.

Here we are following the same steps, except we are applying the aliasing to the dateofbirth column.
>>> studentDf_dob = spark.sql("select studentid, name, gender, to_date(dateofbirth) dateOfBirth from students")
>>> studentDf_dob.createOrReplaceTempView("StudentsDob")
>>> spark.sql("Describe studentsDob").show()
Here is the output:
+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|  studentid|   string|   null|
|       name|   string|   null|
|     gender|   string|   null|
|dateOfBirth|     date|   null|
+-----------+---------+-------+

The schema of the newly created table is now easy to use, with the well-defined column name dateOfBirth.

Step 6-4-2. Using Date Functions for Various Date Manipulations

In this step, we are going to separate the day, month, and year values using various date user-defined functions. For ease of understanding, we picked examples that every reader will be able to appreciate.

Let’s start by defining the SQL that separates the individual values of a Date column.
>>> spark.sql("select dateofbirth,dayofmonth(dateofbirth) day, month(dateofbirth) month, year(dateofbirth) year from  studentsdob").show()
The previous query extracts the individual values as shown here:
  • Day, using dayofmonth
  • Month, using month
  • Year, using year
Here is the output:
+-----------+---+-----+----+
|dateofbirth|day|month|year|
+-----------+---+-----+----+
| 1981-12-13| 13|   12|1981|
| 1986-06-06|  6|    6|1986|
| 1988-09-05|  5|    9|1988|
| 1987-05-04|  4|    5|1987|
| 1980-11-12| 12|   11|1980|
+-----------+---+-----+----+

It can be clearly seen that these user-defined functions are very valuable since they enable us to operate on individual columns and process the values.

User-defined functions are nothing but internal programs defined inside Spark using the Java, Scala, or Python languages. These functions take the given column as input and apply their logic to each row in the table. For example, the year UDF takes the dateofbirth column value, extracts the year part of the date, and returns it; this is applied to each row of the table. Similarly, month and dayofmonth are UDFs defined internally within Spark SQL that return the month and day values, respectively.

In this section, we saw many date-related UDFs. Similar to these, there are multiple string-related functions as well as statistical functions. All these UDFs can be used in Spark SQL queries.

You might wonder what to do if you want to apply functions that aggregate column values, such as counts or averages; Spark SQL supports these as well, using the standard group by syntax.
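As a quick sketch of such an aggregate query on the same view, counting students by gender uses the familiar group by syntax (in our sample data this returns three male and two female students):
>>> spark.sql("select gender, count(*) as total from studentsdob group by gender").show()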

Recipe 6-5. Create a New PySpark UDF

Problem

You need to create a new, custom UDF. The gender column of the Students data contains an M or F to represent male or female, respectively. You need to create a UDF that will return Male or Female as values instead of M or F.

Solution

Let’s begin creating the UDF. It is a very interesting capability to be able to add your own features that can then be used in SQL queries.

How It Works

Step 6-5-1. Creating a Regular Python Function That Accepts a Value and Returns the Required Value

While creating a Python function requires a little bit of Python knowledge, it is not very complicated. So let’s create one. It is very important to provide a meaningful name for the function.
>>> def genderCodeToValue(code):
...     return('FEMALE' if code == 'F' else 'MALE' if code == 'M' else 'NA')

Here we have created a simple function that accepts a code and returns its respective value. If the code is not M or F, it returns NA. We have kept it very simple so you can focus on the concept.

Let’s test the function by providing some values.

Here are the test results.
Testing with the code 'M':
>>> print(genderCodeToValue('M'))
Here is the output:
MALE
Testing with the code 'F':
>>> print(genderCodeToValue('F'))
Here is the output:
FEMALE
Testing with the code 'K', a value that is neither 'M' nor 'F':
>>> print(genderCodeToValue('K'))
Here is the output:
NA

So we have successfully tested this simple function. It is always good to test your functions up front before applying them to the dataset. This is called unit testing, and it helps you identify and fix errors in your code immediately after you write it. If you were to use the code directly on a huge set of data, you might not be able to see the errors or their root causes. Especially since Spark is typically used on huge volumes of data, errors inside UDFs are very hard to find. It is very important to test the Python function with all possible inputs before trying it on the data.
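As a small sketch, the same checks can be written as assertions so that any regression fails loudly rather than needing visual inspection:
>>> assert genderCodeToValue('M') == 'MALE'
>>> assert genderCodeToValue('F') == 'FEMALE'
>>> assert genderCodeToValue('K') == 'NA'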

Step 6-5-2. Registering the Python Function in a Spark SQL Session

After creating the Python function, the next step is to register the function as a UDF in Spark SQL. This is achieved as follows.
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType

Before registering genderCodeToValue as a UDF, you need to import these types into Python. First import the udf function from the pyspark.sql.functions package and then import StringType from the pyspark.sql.types package. StringType is needed so that Spark knows the return type of the user-defined function when it validates the SQL.

Let’s now register the UDF.
>>> spark.udf.register("genderCodeToValue", genderCodeToValue, StringType())
Here is the output indicating that the UDF has been successfully registered with Spark SQL:
<function genderCodeToValue at 0x0000020C92E2A620>

This is the easiest way to register UDFs.

This method of registration uses the spark.udf.register function. Let’s look at each of the arguments passed in this function.

The first one is "genderCodeToValue". It is the name of the UDF; this is the name that will be used in Spark SQL to call the function.

The second one is genderCodeToValue. This is the method we just created that implements the required functionality. Note that the method is passed as a reference. It is important to understand that any issues inside this UDF will be exposed only when the function is applied, that is, when Spark executes an action on the DataFrame. So it is very important to make sure that the function is tested for all the kinds of data that you will operate on.

It is also good practice to handle errors properly within UDFs. Many times, UDFs are tested on smaller datasets and developers are happy that the functions work perfectly. But in the typical Big Data world, where the data is error prone, it is very hard to identify when the flow failed and which data caused the error. Having millions of rows of processing stop because of one bad value that was not handled properly is a nightmare in production environments.
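As a minimal sketch of such defensive handling (an illustration with a hypothetical name, not the version registered in this recipe), the function can be made tolerant of missing or unexpected values before registration:
>>> def genderCodeToValueSafe(code):
...     # Treat missing or non-string values as 'NA' instead of failing the job
...     if code is None or not isinstance(code, str):
...         return 'NA'
...     code = code.strip().upper()
...     return 'FEMALE' if code == 'F' else 'MALE' if code == 'M' else 'NA'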

The third argument is StringType(). This indicates that the UDF will return a string value. Spark SQL uses this return type to properly validate and apply the UDF.

Step 6-5-3. Calling the Registered UDF from PySpark SQL

Now we are going to use the genderCodeToValue UDF, which we just created, in Spark SQL. Let’s get started with it.

Let’s assume we need to get a student report with the students’ names and their expanded gender. Here is the query:
>>> spark.sql("select name,  genderCodeToValue(gender) from  studentsdob").show()
Here is the output:
+-------+-------------------------+
|   name|genderCodeToValue(gender)|
+-------+-------------------------+
|  Robin|                     MALE|
|  Maria|                   FEMALE|
|  Julie|                   FEMALE|
|    Bob|                     MALE|
|William|                     MALE|
+-------+-------------------------+
You can clearly see that the gender is shown with the output from the UDF. The column heading is not very legible, so let’s correct that using the following alias:
>>> spark.sql("select name as Name,  genderCodeToValue(gender) as Gender from  studentsdob").show()
Here is the output:
+-------+------+
|   Name|Gender|
+-------+------+
|  Robin|  MALE|
|  Maria|FEMALE|
|  Julie|FEMALE|
|    Bob|  MALE|
|William|  MALE|
+-------+------+

Note that we have cleaner output with the proper column alias names. With this, we have successfully completed the assignment of creating a custom UDF.

Recipe 6-6. Join Two DataFrames Using SQL

Problem

You want to join two DataFrames using Spark SQL. Joining two or more datasets is a very important step in any data transformation process. Often, as ETL developers, we end up joining multiple tables to get the desired output report. A typical data warehouse follows either the star schema or the snowflake schema data modeling technique, and both involve lots of joins. With Big Data, denormalized data models are often preferred, since joining huge datasets slows down query performance. Nevertheless, we have to clearly understand how to join datasets and get the reports.

Solution

Let’s start by joining two datasets. For this purpose, we are going to introduce you to another student-based dataset, called Subjects. See Figure 6-3.
Figure 6-3. The subjects dataset

We are going to get the marks obtained by the students in individual subjects.

For that we need to first load this data as a DataFrame. At this stage, since we have performed this multiple times, it would be a good exercise if you could do this without referring to the following code.

How It Works

Let’s get into the code. We are going to create a temporary view from the subjects data.
>>> subjectsDf = spark.read.csv('subjects.csv',header=True, inferSchema=True)
>>> subjectsDf.createOrReplaceTempView("subjects")
>>> spark.sql("select * from subjects").show()

Yes, it takes only these three lines of Spark code to load data from CSV and be able to query it using SQL. This is the kind of specialty that makes Spark great; with many other languages or tools, it would take quite a lot of boilerplate code to get to the point of querying this data with SQL.

Here is the output:
+---------+-------+-----+
|studentId|subject|marks|
+---------+-------+-----+
|      si1| Python|   75|
|      si3|   Java|   76|
|      si1|   Java|   81|
|      si2| Python|   85|
|      si3|   Ruby|   72|
|      si4|    C++|   78|
|      si5|      C|   77|
|      si4| Python|   84|
|      si2|   Java|   83|
+---------+-------+-----+
Let’s also see the student data:
>>> spark.sql("select studentId,name,gender  from studentsdob").show()
+---------+-------+------+
|studentId|   name|gender|
+---------+-------+------+
|      si1|  Robin|     M|
|      si2|  Maria|     F|
|      si3|  Julie|     F|
|      si4|    Bob|     M|
|      si6|William|     M|
+---------+-------+------+
By observing both these datasets, note that studentId is the column that can be used to join these two datasets. Let’s write the query to join these datasets.
>>> spark.sql("select * from studentsdob st join subjects sb on (st.studentId = sb.studentId)").show()
Running this query produces the following results:
+---------+-----+------+-------------------+----------+-------+------+
|studentId| name|gender|        dateofbirth| studentId|subject| marks|
+---------+-----+------+-------------------+----------+-------+------+
|      si1|Robin|     M|1981-12-13 00:00:00|       si1|   Java|    81|
|      si1|Robin|     M|1981-12-13 00:00:00|       si1| Python|    75|
|      si2|Maria|     F|1986-06-06 00:00:00|       si2|   Java|    83|
|      si2|Maria|     F|1986-06-06 00:00:00|       si2| Python|    85|
|      si3|Julie|     F|1988-09-05 00:00:00|       si3|   Ruby|    72|
|      si3|Julie|     F|1988-09-05 00:00:00|       si3|   Java|    76|
|      si4|  Bob|     M|1987-05-04 00:00:00|       si4| Python|    84|
|      si4|  Bob|     M|1987-05-04 00:00:00|       si4|    C++|    78|
+---------+-----+------+-------------------+----------+-------+------+
Before getting into the observations, let’s try to understand the query:
select * from studentsdob st join subjects sb on (st.studentId = sb.studentId)
Notice that there is a new keyword called join between the studentsdob and subjects tables, that we are aliasing the tables with short names, and that there is an on clause at the end in which the common column is compared. Let’s go into each of these join query specifics.
  • Joins: Tables are joined by applying the join keyword between them. Spark SQL supports multiple kinds of joins; the most traditional and most used are the inner join, the left outer join, and the right outer join. By default, when only the join keyword is provided, Spark performs an inner join. For an inner join, Spark compares the values of the columns given in the on clause for each row of the tables on both sides, and only the rows that satisfy the criteria are selected. A left outer join selects all rows from the left table; for rows with no match in the right table, the right table’s columns are set to null. A right outer join selects all rows from the right table; for rows with no match in the left table, the left table’s columns are set to null.

  • Table alias: Previously in this chapter we looked into aliasing columns. Similarly tables can also be provided an alias name. This will be helpful when the join column name is the same in both tables. The following query, which does not use alias names, gets an error due to its ambiguous column names:

>>> spark.sql("select * from studentsdob  join subjects on (studentId = studentId)").show()
pyspark.sql.utils.AnalysisException: "Reference 'studentId' is ambiguous, could be: studentsdob.studentId, subjects.studentId.
You might see a huge stack trace before this error. But at the end, the error says that the studentid is ambiguous between the studentsdob and subjects tables.
  • On clause: Readers who are familiar with RDBMS SQL may be tempted to put the join condition in the where clause rather than the on clause. While it may be syntactically possible to compare the columns in the where clause, you may not get the results you expect, because of how Spark plans the join. So always apply the join condition in the on clause.

Now that we understand the query better, let’s analyze the results to understand them better as well. The results look good and we have obtained what we are looking for, but there are multiple observations we can make from this data.
  • Observation 1: Some of the students are missing from this report. For example, the student named William does not appear in this output.

  • Observation 2: The studentId si5, which appears in the subjects dataset, is missing from this output.

Let’s explore these issues in more detail.

Observation 1

The reason we are not able to see William in the output is that William, with studentId si6, is not present in the subjects dataset. Because of this, when Spark applies the inner join, it doesn’t find a matching row for si6, so it drops it from the output. If the report needs to list all the students, whether or not they appear in the subjects dataset, we need to use a left join, also called a left outer join.

Let’s go ahead and use a left join to list the output. Here is the query using a left join:
>>> spark.sql("select * from studentsdob st left join subjects sb on (st.studentId = sb.studentId)").show()
Here is the output:
+---------+-------+------+-------------------+---------+-------+-----+
|studentId|   name|gender|        dateofbirth|studentId|subject|marks|
+---------+-------+------+-------------------+---------+-------+-----+
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1|   Java|   81|
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1| Python|   75|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2|   Java|   83|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2| Python|   85|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Ruby|   72|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Java|   76|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4| Python|   84|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4|    C++|   78|
|      si6|William|     M|1980-11-12 00:00:00|     null|   null| null|
+---------+-------+------+-------------------+---------+-------+-----+

At the very end of the output, note that William’s row is present but the subject and marks are null. This is very useful for scenarios in which a student is absent and has not taken any test.

Often there are columns that are not required in the output, so let’s show only the columns that are needed. As an example, we are going to select only the studentId, name, subject, and marks columns for reporting purposes.

Running this query:
>>> spark.sql("select studentid,name,subject,marks from studentsdob st left join subjects sb on (st.studentId = sb.studentId)").show()
We get this output with an error:
pyspark.sql.utils.AnalysisException: "Reference 'studentid' is ambiguous, could be: st.studentid, sb.studentid.
As you will be aware by now, this happens because studentId is not qualified with an alias name. So let’s qualify the columns with alias names:
>>> spark.sql("select st.studentid,name,subject,marks from studentsdob st left join subjects sb on (st.studentId = sb.studentId)").show()
Here is the output with reduced columns:
+---------+-------+-------+-----+
|studentid|   name|subject|marks|
+---------+-------+-------+-----+
|      si1|  Robin|   Java|   81|
|      si1|  Robin| Python|   75|
|      si2|  Maria|   Java|   83|
|      si2|  Maria| Python|   85|
|      si3|  Julie|   Ruby|   72|
|      si3|  Julie|   Java|   76|
|      si4|    Bob| Python|   84|
|      si4|    Bob|    C++|   78|
|      si6|William|   null| null|
+---------+-------+-------+-----+

Observation 2

Now let’s look into the second observation, where there is a record in the subjects dataset that did not appear in the output. This could be due to multiple reasons, such as the students data not being properly maintained, or data entry errors while entering studentId values. These kinds of errors are very common at Big Data volumes and cannot be ignored, since they introduce data errors and incorrect reporting. So let’s include that record in the report using a right outer join.

The following query uses a right outer join:
>>> spark.sql("select st.studentid,name,sb.studentid,subject,marks from studentsdob st right join subjects sb on (st.studentId = sb.studentId)").show()

Note that, apart from also selecting sb.studentid, the only difference between the left and right outer join queries is replacing the keyword left with right.

Here is the output:
+---------+-----+---------+-------+-----+
|studentid| name|studentid|subject|marks|
+---------+-----+---------+-------+-----+
|      si1|Robin|      si1| Python|   75|
|      si3|Julie|      si3|   Java|   76|
|      si1|Robin|      si1|   Java|   81|
|      si2|Maria|      si2| Python|   85|
|      si3|Julie|      si3|   Ruby|   72|
|      si4|  Bob|      si4|    C++|   78|
|     null| null|      si5|      C|   77|
|      si4|  Bob|      si4| Python|   84|
|      si2|Maria|      si2|   Java|   83|
+---------+-----+---------+-------+-----+

Note that subject C is available in the output. But the name and studentid fields are null. That is because studentId si5 is not available in the students dataset.

Now we can surface the missing student using a left outer join or the missing subject record using a right outer join. But seeing only one side of the mismatch at a time is not very useful; we need to list both of the missing entities in one output. In such scenarios, we use a full outer join, which matches rows on the join condition and keeps unmatched rows from both tables.

Let’s apply the same query with a full outer join.
>>> spark.sql("select st.studentid,name,sb.studentid,subject,marks from studentsdob st FULL OUTER JOIN subjects sb on (st.studentId = sb.studentId)").show()

Note that a full outer join is a very costly operation. Applying a full outer join on Big Data volumes will consume lots of resources and time. So it is better to avoid full outer joins in time-critical SLA-based applications.

Here is the output:
+---------+-------+---------+-------+-----+
|studentid|   name|studentid|subject|marks|
+---------+-------+---------+-------+-----+
|     null|   null|      si5|      C|   77|
|      si2|  Maria|      si2| Python|   85|
|      si2|  Maria|      si2|   Java|   83|
|      si4|    Bob|      si4|    C++|   78|
|      si4|    Bob|      si4| Python|   84|
|      si3|  Julie|      si3|   Java|   76|
|      si3|  Julie|      si3|   Ruby|   72|
|      si6|William|     null|   null| null|
|      si1|  Robin|      si1| Python|   75|
|      si1|  Robin|      si1|   Java|   81|
+---------+-------+---------+-------+-----+

Note that William and subject C are both available. Now that we are able to see these inconsistencies, we can work with the data sources to fix them.

While applying a join, we need to identify the columns on which to join. It is always better to join on numerical columns when possible, since the comparison is faster and less prone to data issues. When you work with Big Data volumes, string columns are often not very clean: there may be trailing or leading spaces, case-sensitivity issues, and so on, and these will cause the join to miss matches. So you need to first cleanse these data differences and then apply the join.
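As a sketch of that kind of cleansing, the keys can be normalized inside the join condition itself (or, better, cleansed once in the source views before joining):
>>> spark.sql("select st.studentid, name, subject, marks from studentsdob st left join subjects sb on (trim(lower(st.studentId)) = trim(lower(sb.studentId)))").show()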

With this we have successfully completed the assignment of joining two DataFrames.

Recipe 6-7. Join Multiple DataFrames Using SQL

Problem

You want to join more than two DataFrames and see the output.

Solution

For simplicity reasons, we use a simple dataset related to students. So far we have looked at students and subjects. Now we add a new dataset, called attendance. Let’s look at all three datasets together, shown in Figure 6-4.
Figure 6-4. The new attendance dataset has been added

This dataset has an attendance column, which registers how many days out of 40 classes each student attended for each subject.

Now that you are aware of how to handle errors in data using left joins, right joins, and full outer joins, I have cleansed the data and simplified it.

Figures 6-5 and 6-6 show the cleansed subjects and students data.
Figure 6-5. Subjects data

Figure 6-6. Students data

How It Works

To be able to start applying queries, we first need to load the attendance data. Let’s go ahead and load the attendance dataset.

Here are the steps to load the attendance dataset:
>>> attendanceDf = spark.read.csv('attendance.csv',header=True, inferSchema=True)
>>> attendanceDf.createOrReplaceTempView("attendance")
>>> spark.sql("select * from attendance").show()
Here is the output:
+---------+-------+----------+
|studentId|subject|attendance|
+---------+-------+----------+
|      si1| Python|        30|
|      si3|   Java|        22|
|      si1|   Java|        34|
|      si2| Python|        39|
|      si3|   Ruby|        25|
|      si4|    C++|        38|
|      si5|      C|        35|
|      si4| Python|        39|
|      si2|   Java|        39|
|      si6|   Java|        35|
+---------+-------+----------+
Before going to the final report, let’s first combine all three tables to verify the joins. Here’s the query to join multiple DataFrames:
>>> spark.sql("select * from studentsdob st Join subjects sb on (st.studentId = sb.studentId) Join attendance at on (at.studentId = st.studentId)").show()

This query is joining the third dataset as a continuation of the existing join. Similarly, you can join any number of tables in the same manner.

Here is the output:
+---------+-------+------+-------------------+---------+-------+-----+---------+-------+----------+
|studentId|   name|gender|        dateofbirth|studentId|subject|marks|studentId|subject|attendance|
+---------+-------+------+-------------------+---------+-------+-----+---------+-------+----------+
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1|   Java|   81|      si1|   Java|        34|
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1|   Java|   81|      si1| Python|        30|
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1| Python|   75|      si1|   Java|        34|
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1| Python|   75|      si1| Python|        30|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2|   Java|   83|      si2|   Java|        39|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2|   Java|   83|      si2| Python|        39|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2| Python|   85|      si2|   Java|        39|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2| Python|   85|      si2| Python|        39|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Ruby|   72|      si3|   Ruby|        25|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Ruby|   72|      si3|   Java|        22|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Java|   76|      si3|   Ruby|        25|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Java|   76|      si3|   Java|        22|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4| Python|   84|      si4| Python|        39|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4| Python|   84|      si4|    C++|        38|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4|    C++|   78|      si4| Python|        39|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4|    C++|   78|      si4|    C++|        38|
|      si6|William|     M|1980-11-12 00:00:00|      si6|   Java|   70|      si6|   Java|        35|
+---------+-------+------+-------------------+---------+-------+-----+---------+-------+----------+

This output is very hard to read, since it contains all the columns from all three tables.

Now the task is to get a cleaner report, with only the required columns, by combining all the datasets (students, subjects, and attendance) into a report with these columns:

Name    Gender    Marks    Attendance

Here is all the code that is needed to build the report:
#Load and register Student data
studentDf = spark.read.csv('studentData_dateofbirth.csv', header=True, inferSchema=True)
studentDf.createOrReplaceTempView("StudentsDob")
#Load and register subjects
subjectsDf = spark.read.csv('subjects.csv',header=True, inferSchema=True)
subjectsDf.createOrReplaceTempView("subjects")
#Load and register attendance
attendanceDf = spark.read.csv('attendance.csv',header=True, inferSchema=True)
attendanceDf.createOrReplaceTempView("attendance")
#Create the gender User Defined Function
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def genderCodeToValue(code):
        return('FEMALE' if code == 'F' else 'MALE' if code == 'M' else 'NA')
spark.udf.register("genderCodeToValue", genderCodeToValue, StringType())
# Apply a query to get the final report
spark.sql("select name as Name,  genderCodeToValue(gender) as Gender, marks as Marks, attendance Attendance  from studentsdob st Join subjects sb on (st.studentId = sb.studentId) Join attendance at on (at.studentId = st.studentId)").show()
The output of this code is as follows:
+-------+------+-----+----------+
|   Name|Gender|Marks|Attendance|
+-------+------+-----+----------+
|  Robin|  MALE|   81|        34|
|  Robin|  MALE|   81|        30|
|  Robin|  MALE|   75|        34|
|  Robin|  MALE|   75|        30|
|  Maria|FEMALE|   83|        39|
|  Maria|FEMALE|   83|        39|
|  Maria|FEMALE|   85|        39|
|  Maria|FEMALE|   85|        39|
|  Julie|FEMALE|   72|        25|
|  Julie|FEMALE|   72|        22|
|  Julie|FEMALE|   76|        25|
|  Julie|FEMALE|   76|        22|
|    Bob|  MALE|   84|        39|
|    Bob|  MALE|   84|        38|
|    Bob|  MALE|   78|        39|
|    Bob|  MALE|   78|        38|
|William|  MALE|   70|        35|
+-------+------+-----+----------+
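One thing to notice in this report is that each student's marks row is paired with every attendance row for that student, because the tables are joined on studentId alone. As a sketch, assuming the same three views and the registered genderCodeToValue UDF, adding subject to the join conditions (and selecting it) produces one row per student and subject:
# Join on both studentId and subject so marks and attendance line up per subject
spark.sql("select name as Name, genderCodeToValue(gender) as Gender, sb.subject as Subject, sb.marks as Marks, at.attendance as Attendance from studentsdob st join subjects sb on (st.studentId = sb.studentId) join attendance at on (at.studentId = sb.studentId and at.subject = sb.subject)").show()
The output would then contain one row per student and subject combination, with the Subject column included.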