In this chapter, we look into various Spark SQL recipes that come in handy when you have to apply SQL-like queries to data. One of the specialties of Apache Spark is that it lets the user apply data-wrangling methods both programmatically and as ANSI SQL-like queries. For readers from pure SQL backgrounds with little exposure to programmatic data manipulation, these SQL queries are a one-stop shop: almost everything the Spark programmatic APIs can do to data can also be done through Spark SQL. And because Spark SQL follows the ANSI standard, anyone currently working with SQL technologies can readily start working on Apache Spark-based Big Data projects.
Once the DataFrame is created, we can directly apply SQL queries to the DataFrame object and manipulate the data as needed. In this chapter, starting with simple queries, we look into various SQL applications in Apache Spark.
Recipe 6-1. Create a DataFrame from a CSV file
Recipe 6-2. Create a temp view from a DataFrame
Recipe 6-3. Create a simple SQL from a DataFrame
Recipe 6-4. Apply Spark UDF methods on Spark SQL
Recipe 6-5. Create a new PySpark UDF
Recipe 6-6. Join two DataFrames using SQL
Recipe 6-7. Join multiple DataFrames using SQL
Recipe 6-1. Create a DataFrame from a CSV File
Problem
You want to create a DataFrame from a CSV file.
Solution
Sample Data
| studentId | name    | gender |
|-----------|---------|--------|
| si1       | Robin   | M      |
| si2       | Maria   | F      |
| si3       | Julie   | F      |
| si4       | Bob     | M      |
| si6       | William | M      |
The assignment is to create a DataFrame from this data.
We have successfully converted the data into a Spark DataFrame. You might notice that this is very simple. Yes, Spark’s abstraction APIs make it very simple for users to load data into a DataFrame.
How It Works
Let’s do a deep dive into the code that created the DataFrame. spark.read.csv is the Spark API that reads CSV files; it returns a DataFrame as its output. You can also see two more arguments provided along with the filename: inferSchema and header. The header option tells Spark that the first row of the file contains the column names, and inferSchema tells Spark to sample the data and infer each column’s datatype.
Executing studentDf.printSchema() will show us the column names and datatypes of the DataFrame.
Step 6-1-1. Creating a DataFrame from JSON
Very often in real-world projects we get data files in the JSON format; JSON is one of the most widely used data formats. So let’s create a DataFrame from a JSON file. For the sake of simplicity, we use the same students data, this time in JSON format. Each attribute in a JSON document becomes a column in the DataFrame, and each document becomes a row. Note that it is very easy to introduce errors when JSON data is created by hand, and when that happens, the error messages given out are rarely pinpoint, which makes such data issues very hard to debug. To avoid these scenarios, it is always best to generate JSON data programmatically.
Step 6-1-2. Creating a DataFrame from Parquet
So far we have looked into CSV and JSON files. When it comes to the Big Data space where huge volumes of data are processed, the Parquet format is rated very well because of its support for columnar storage. Parquet comes with a highly efficient compression mechanism as well as encoding schemes. Parquet is known for improving query performance.
Although we showed the CSV and JSON files, we will not show the Parquet file, since it is a binary format and not human-friendly to read. So let’s get straight into creating a DataFrame from a Parquet file.
At this stage, we have the DataFrame that is essential for applying SQL queries. The beauty of Spark is how it abstracts data from various formats into its single DataFrame abstraction. This also means that the data munging applied to a DataFrame is agnostic to the file format. For example, suppose you have completed your data programming for JSON-formatted data and you want to switch to Parquet. There is no code change needed other than changing the read API for the new data format; the rest of the program written for JSON remains the same for Parquet.
Recipe 6-2. Create a Temp View from a DataFrame
Problem
While creating a DataFrame is the first step in using Spark SQL, the second step is to register the DataFrame as a temporary view in Spark’s SparkSession. For those wondering what a SparkSession is, it is the entry point for working with structured data such as rows and columns. Spark provides an API called createOrReplaceTempView that lets you register your DataFrame as a queryable entity.
Solution
This temp view will be represented as an in-memory entity that can be used in analytics and other repetitive operations on the same data.
How It Works
Here you can verify that the students view has been registered with the SparkSession and is readily available for use within SQL queries. You might wonder how long this view remains available. For example, if we exit the Spark session and log in again, will the view still be there? It will not: these views are scoped to the session in which they are created. When we log off from the Spark session and log in again, the students view we created will no longer be available.
Recipe 6-3. Create a Simple SQL from a DataFrame
Problem
So far we have been setting up the base to start using Spark SQL. We created a DataFrame from data files and registered that DataFrame as a temporary view. Now we use the temporary view to apply our first simple SQL query.
Solution
Notice that we are able to query on the data using a SQL query. The output is the same as what we saw earlier.
How It Works
Step 6-3-1. Using Column Names in Spark SQL
Now that we applied a simple SQL query, let’s start using the column names inside the query.
Let’s look at the column names that are available inside the students view. Recall that the printSchema method of the DataFrame can be used to display the schema. For SQL lovers, there is another way to view and analyze details.
Spark provides Describe, which can be used to see a table or view’s columns, their datatypes, and any comments attached to them.
From the output, we can see that the column names are the same as the names we saw from the printSchema method. The Describe option provides a SQL style way of getting the table schema.
Let’s go ahead and apply a query using these columns. Let’s get a student report with name and gender only, since reports usually do not require the ID.
As per the output, we can see that only the name and gender columns are displayed.
Step 6-3-2. Creating an Alias for Column Names
Upon analyzing the output, we see that the column names are displayed as expected. To create an alias, we keep the alias name next to the original column name; the keyword as is optional. This aliasing concept will be very helpful in complex queries, which we will see later in this chapter.
Step 6-3-3. Filtering Data Using a Where Clause
As you can observe from the output, we get only the female students. At this moment it is important to note that Spark is very efficient at handling filter criteria. Internally, Spark scans the rows to identify those matching the filter criteria, and scanning the same dataset multiple times is very costly. So if multiple filter criteria are applied to the same DataFrame at various points in a program, Spark knows to combine them so they can all be applied in one pass. Spark uses a query optimizer called Catalyst to recognize and combine filter criteria in this way.
While the examples so far work well, there are some pitfalls for readers who come from an RDBMS SQL background. So let’s look at those pitfalls next.
Step 6-3-4. Filtering on Case-Sensitive Column Values
There will be scenarios when we don’t know the column value’s case (whether the value is uppercase or lowercase). In those scenarios, we need to apply functions on the column as follows.
Step 6-3-5. Trying to Use Alias Columns in a Where Clause
Generally, many RDBMS databases allow us to use alias names in the where clause, the group by clause, and other such clauses. But if we try the same in Spark SQL, we see a strange error: the output says that it cannot resolve the column name. This is because Spark SQL does not allow an alias to be referenced elsewhere in the same SQL statement that defines it.
While we might see a long stack of exceptions, do not be scared by it. At the very end, notice the message stating that the given column cannot be resolved. This confirms that Spark SQL does not recognize alias columns within the same SQL statement.
Recipe 6-4. Apply Spark UDF Methods on Spark SQL
Problem
You want to use Spark UDF functions in Spark SQLs.
Solution
User-defined functions (UDFs) are special functions that help transform data from one representation to another. In the previous section, we used the lower function to convert characters into lowercase. Other similar examples are converting date strings from one format to another, and expanding a code into its detailed value, such as M into Male and F into Female.
How It Works
Let’s apply a UDF to convert a date from one format to another.
We have introduced a new column called dateofbirth so that we can apply date functions to it. Spark by default provides a lot of date functions that can be applied to date values, such as converting a string value into a date, computing the difference between two dates, adding and subtracting dates, and so on. All these functions can be called directly from Spark SQL.
Step 6-4-1. Representing a String Value as a Date Value
We need to view the default datatype of the dateofbirth column in the view. Before seeing that, we need to create a temporary view of the updated data.
With this, we created a Students view in Spark.
Here we are creating another table that will contain the converted dateofbirth column.
You can clearly notice that there is a column with the datatype set to date. But we still do not have a clear column name that can be easily understood. Let’s use aliasing to create a better column name.
The schema of the newly created table is now easy to use with the well-defined column name dateOfBirth .
Step 6-4-2. Using Date Functions for Various Date Manipulations
In this step we are going to separate the date, month, and year values using various Date user-defined functions. For ease of understanding, we picked up examples that every reader will be able to appreciate.
It can be clearly seen that these user-defined functions are very valuable since they enable us to operate on individual columns and process the values.
These functions are implemented inside Spark using the Java, Scala, or Python languages. They take the given column as input and apply their logic to each row of the table. For example, the year function takes the dateofbirth column value, extracts the year part of the date, and returns it; this is applied to each row in the table. Similarly, month and dayofmonth are defined internally within Spark SQL and return the month and day values, respectively.
In this section, we saw many date-related UDFs. Similar to these, there are multiple string-related functions as well as statistical functions. All these UDFs can be used in Spark SQL queries.
You might wonder what if you want to apply functions that need to aggregate column values.
Recipe 6-5. Create a New PySpark UDF
Problem
You need to create a new, custom UDF. The gender column of the Students data contains an M or F to represent male or female, respectively. You need to create a UDF that will return Male or Female as values instead of M or F.
Solution
Let’s begin creating the UDF. It is very interesting to add features that can be used in SQL queries.
How It Works
Step 6-5-1. Creating a Regular Python Function That Accepts a Value and Returns the Required Value
Here we have created a simple function that accepts a code and returns its respective value. If the code is not M or F, it returns NA. I have kept it very simple so you can focus on the concept.
Let’s test the function by providing some values.
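A sketch of the function and a few quick checks; this is plain Python, with no Spark involved yet:

```python
def genderCodeToValue(code):
    # Map a single-letter gender code to its full word; anything else becomes "NA".
    if code == "M":
        return "Male"
    if code == "F":
        return "Female"
    return "NA"

# Quick sanity checks before applying the function to any Spark data.
print(genderCodeToValue("M"))  # Male
print(genderCodeToValue("F"))  # Female
print(genderCodeToValue("X"))  # NA
```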
So we have successfully tested this simple function. It is always good to test your functions up front before applying them to the dataset. This is called unit testing, and it helps you identify and fix errors in your code immediately after you write it. Imagine using the code directly on a huge set of data; you might not be able to see the errors or their root causes. Especially since Spark is typically used on huge volumes of data, errors in UDFs are very hard to find, so it is very important to test the Python function with all possible inputs before trying it on the data.
Step 6-5-2. Registering the Python Function in a Spark SQL Session
Before registering genderCodeToValue as a UDF, you need to import a few types into Python. First import the udf function from the pyspark.sql.functions package, and then import StringType from the pyspark.sql.types package. StringType is needed so that Spark can verify that the SQL is consistent with the return type of the user-defined function.
This is the easiest way to register UDFs.
This method of registration uses the spark.udf.register function. Let’s look at each of the arguments passed in this function.
The first is the string "genderCodeToValue", the name under which the UDF is registered. This is the name that will be used in Spark SQL to call the function.
The second is genderCodeToValue, the method we just created that implements the required functionality. Note that the method is passed as a reference. An important consequence is that any issues in the UDF will surface only when the function is applied, that is, when Spark executes an action on the DataFrame. So make sure the function is tested against all the kinds of data you will operate on.
It is also good practice to handle errors properly within UDFs. Many times, UDFs are tested on smaller datasets and developers are happy that the functions work perfectly. But in the typical Big Data world, where data is error prone, it is very hard to identify when a flow failed and which data caused the error. When the processing of millions of rows is stopped because of one bad value that was not handled properly, it is a nightmare in production environments.
The third argument is StringType(). This indicates that the UDF returns a string value; Spark SQL uses this return type to properly validate and apply the UDF.
Step 6-5-3. Calling the Registered UDF from PySpark SQL
Now we are going to use the UDF genderCodeToValue , which we just created in Spark SQL. Let’s get started with it.
Note that we have cleaner output with the proper column alias names. With this, we have successfully completed the assignment of creating a custom UDF.
Recipe 6-6. Join Two DataFrames Using SQL
Problem
You want to join two DataFrames using Spark SQL. Joining two or more datasets is a very important step in any data transformation process. Often as ETL developers we end up joining multiple tables to get the desired output report. In a typical data warehouse, either the star schema or the snowflake schema data modeling technique is followed. In either of these data modeling techniques, lots of joins are involved. With Big Data, it is preferred to have denormalized data models since joining huge datasets will slow down the performance of the queries. Nevertheless, we have to clearly understand how to join datasets and get the reports.
Solution
We are going to get the marks obtained by the students in individual subjects.
For that we need to first load this data as a DataFrame. At this stage, since we have performed this multiple times, it would be a good exercise if you could do this without referring to the following code.
How It Works
Yes, it takes only these three lines of Spark code to load data from a CSV file and be able to query it using SQL. This conciseness is part of what makes Spark great; with most other languages or tools, it would take quite a lot of boilerplate code before you could start querying this data with SQL.
Joins: Tables are joined by applying the join keyword between them. Spark SQL supports multiple kinds of joins; the most traditional and most used are the inner join, the left outer join, and the right outer join. By default, when only the join keyword is provided, Spark performs an inner join. For an inner join, Spark compares the values of the join columns for each pair of rows from the two tables, and only the rows that satisfy the condition are selected. A left outer join selects all rows from the left table; rows with no match in the right table get null for the right table’s columns. A right outer join selects all rows from the right table; rows with no match in the left table get null for the left table’s columns.
Table alias: Earlier in this chapter we looked into aliasing columns. Similarly, tables can be given alias names, which is helpful when the join column name is the same in both tables. A query that does not use table aliases fails with an error about ambiguous column names:
On clause: Readers familiar with RDBMS SQL may want to put the join condition in the where clause rather than the on clause. While it is syntactically possible to compare the columns in the where clause, for outer joins this does not produce the same results, because a where filter is applied after the join rather than while matching rows. So always put the join condition in the on clause.
Observation 1: Some of the students are missing from this report. For example, the student named William does not appear in this output.
Observation 2: A record with studentId si5 is present in the subjects dataset but is missing from this output.
Let’s explore these issues in more detail.
Observation 1
The reason we do not see William in the output is that William, with studentId si6, is not present in the subjects dataset. Because of this, when Spark applies the inner join, it finds no match for studentId si6 and drops that row from the output. If the report needs to list all the students, whether or not they appear in the subjects dataset, we need to use a left join, also called a left outer join.
At the very end of the output, note that William’s row is available but the subject is null. This will be very useful and needed for showing scenarios when a student is absent and has not taken the test.
There often are columns that are not required for output. So, let’s show only the columns that are needed. As an example, we are going to select the studentId, name, subjects, and marks columns only for reporting purposes.
Observation 2
Now let’s look into the second observation, where a record in the subjects dataset did not appear in the output. This could be due to multiple reasons, such as the student’s data not being properly maintained or data entry errors in the studentId values. These kinds of errors are very common at Big Data volumes, and they cannot be ignored, since they introduce data errors and incorrect reporting. So let’s include that record in the report using a right outer join.
Note that the only difference between the left and right outer joins is replacing the word “left” with “right”.
Note that subject C is available in the output. But the name and studentid fields are null. That is because studentId si5 is not available in the students dataset.
Now we can show the missing subjects using a left outer join or a right outer join. But having only one subject in the output will not be useful. We need to be able to list both of the missing entities in one output. In such scenarios, we need to use a full outer join, which will match based on the join condition and will select rows from both the tables.
Note that a full outer join is a very costly operation. Applying a full outer join on Big Data volumes will consume lots of resources and time. So it is better to avoid full outer joins in time-critical SLA-based applications.
Note that William and subject C are both available. Now that we are able to see these inconsistencies, we can work with the data sources to fix them.
When applying a join, we need to identify the columns to join on. It is better to use numerical columns where possible, since the comparison is faster and less prone to data issues. With Big Data volumes you will often find string columns that are not very clean; for example, trailing or leading spaces and case-sensitivity issues will make joins fail to match. So first cleanse these data differences and then apply the join.
With this we have successfully completed the assignment of joining two DataFrames.
Recipe 6-7. Join Multiple DataFrames Using SQL
Problem
You want to join more than two DataFrames and see the output.
Solution
This dataset has an attendance column, which registers how many days out of 40 classes each student attended for each subject.
Now that you are aware of how to handle errors in data using left joins, right joins, and full outer joins, I have cleansed the data and simplified it.
How It Works
To be able to start applying queries, we first need to load the attendance data. Let’s go ahead and load the attendance dataset.
This query is joining the third dataset as a continuation of the existing join. Similarly, you can join any number of tables in the same manner.
It’s very hard to read from this output since it contains all the columns from all three tables.
Now the task is to get a cleaner report with only the required columns by combining all the datasets—student, subject, attendance—that will provide a report:
| Name | Gender | Subject | Marks | Attendance |
|------|--------|---------|-------|------------|