In this chapter, we look at various Spark SQL recipes that optimize SQL queries. Apache Spark is an open source framework developed with Big Data volumes in mind: it is designed to handle huge volumes of data in scenarios that call for horizontal scaling of processing power. Before we cover the optimization techniques used in Apache Spark, you need to understand the basics of horizontal scaling and vertical scaling.
Since this is a very important concept, let's look at a simple example. Assume we are travelling from place A to place B and need to accommodate 20 people with a bus that has 20 passenger seats. All is good, since the seat capacity matches the number of passengers. Now say we have to accommodate 30 passengers. We can get a bigger bus with a seating capacity of 30, such as a double-decker bus. In this scenario, we keep only one bus, but with increased capacity. Essentially, we have scaled from a 20-passenger bus to a 30-passenger bus while still using just one bus. This is vertical scaling: we scale up using only one bus.
The issue with vertical scaling is that there is a limit to how far we can scale. For example, if we get 60 or 70 passengers, we may not be able to find a single bus that accommodates them all. Also, if that one bus has any maintenance issues, we will not be able to travel at all. Software vertical scaling is similar: there is a limit to how much we can scale up, and we have a single point of failure. If one big machine fails, the entire system goes down.
Horizontal scaling addresses most of these problems. Let's say we add another bus with a capacity of 20 and now use two buses. Essentially, we have scaled out by adding one more bus to our existing fleet. This kind of scaling is called horizontal scaling. While at the outset it seems to address many of the issues of vertical scaling, it comes with its own set of issues. In the bus example, we need to make sure both buses stay in sync: we must take extra steps to confirm all the passengers are on board by checking in with both buses, we may need to keep the travel plans for both buses synchronized, and we need more drivers to manage. These are some of the extra steps involved in adding another bus.
Very similarly, with horizontal scaling we need to make sure the distributed processes are synchronized and that the output from all the machines is consolidated into one final output.
Now that we have covered vertical scaling and horizontal scaling, let's look at scaling from the perspective of Big Data. Since Big Data systems are supposed to handle huge volumes of data, vertical scaling is not really an option; we have to go with horizontal scaling. With terabytes and petabytes of data to process, we need to keep adding machines to do the processing, and we need software that works in a distributed environment. Regular Windows filesystems, such as FAT and NTFS, and UNIX filesystems, such as EXT, cannot work across a distributed set of machines.
HDFS is one such filesystem that can operate on a distributed set of machines. HDFS can start with a small setup of a few nodes and, as data grows, be easily expanded into a large multi-node system. Beyond HDFS, Amazon S3 is another example of distributed storage. While such distributed storage systems know how to store data in a distributed manner, we still need to apply the compute in a distributed manner.
While Hadoop's MapReduce framework is a good fit for distributed compute, it requires a lot of boilerplate code. It also involves a lot of disk I/O, which drastically impacts performance, and its complexity makes it very difficult for many users. That is where Apache Spark saves the day.
Apache Spark handles such distributed compute with ease and provides an easy-to-use interface. It removes most of the boilerplate code required by MapReduce, often reducing roughly 50 lines of MapReduce code to 5-10 lines of Spark code.
Apache Spark also introduced in-memory processing, which avoids much of the slow disk I/O. Operations that are repetitive in nature can cache the data in memory and reuse it multiple times. All of this together makes Apache Spark a one-stop shop for Big Data processing.
Even though Apache Spark natively supports processing huge volumes of data in a distributed manner, we need to provide Spark with the right environment by distributing the data properly. For example, if in a 10-node cluster all the data sits on just three nodes and the other seven nodes hold little or no data, only those three nodes will do all the work while the remaining nodes sit idle. This results in reduced performance. You need to supply Spark programs with well-distributed data, or provide the right tuning parameters so that Spark distributes the data accordingly and improves performance.
Similarly, if there are joins on huge tables with a join key that is skewed or not distributed properly, there will be a lot of shuffling. In such scenarios, Spark can run into out-of-memory situations, and we need to use the right performance-tuning parameters.
These are just some of the scenarios in which Spark needs you to optimize the queries so that you get the best performance from Spark SQL.
Now that we have covered the context around performance optimization, let’s go through the recipes of this chapter step-by-step and learn the PySpark optimization techniques.
Recipe 7-1. Apply aggregation using PySpark SQL
Recipe 7-2. Apply window functions using PySpark SQL
Recipe 7-3. Cache data using PySpark SQL
Recipe 7-4. Apply the Distribute By, Sort By, and Cluster By clauses in PySpark SQL
Let’s get into the first recipe.
Recipe 7-1. Apply Aggregation Using PySpark SQL
Problem
You want to use Group By in PySpark SQL to find the average marks per student. So far, you have created a report of students showing their respective subjects and marks from the students and subjects data. Now you want a report that identifies the average marks for each student.
Solution
You might remember the students report from the previous chapter. Let's apply Group By to that query to identify the average marks per student.
How It Works
Note that the first query does not use a Group By clause. If we select a non-aggregated column alongside an aggregate function without applying a Group By clause, we will get an error. While a regular UDF operates on one row per call, avg is an aggregate function (Spark calls user-written versions of these User Defined Aggregate Functions, or UDAFs). These kinds of functions take a set of rows, determined by the Group By clause, and apply their logic to each set of rows. Let's see Group By in action.
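Since the book's exact query and sample data are not reproduced here, the following is a minimal sketch. It assumes a SparkSession named spark and a hypothetical students dataset with name, subject, and marks columns; the first query fails with an AnalysisException, and the Group By version succeeds.

    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException

    spark = SparkSession.builder.appName("recipes-ch7").getOrCreate()

    # Hypothetical stand-in for the book's students data.
    students = spark.createDataFrame(
        [("Amy", "Math", 92), ("Amy", "Physics", 77),
         ("Raj", "Math", 85), ("Raj", "Physics", 91)],
        ["name", "subject", "marks"],
    )
    students.createOrReplaceTempView("students")

    # Selecting a non-aggregated column next to avg() without
    # a Group By clause fails with an AnalysisException.
    try:
        spark.sql("SELECT name, avg(marks) FROM students").show()
    except AnalysisException as e:
        print(e)

    # Adding GROUP BY name fixes it: one row per student.
    spark.sql("""
        SELECT name, avg(marks) AS avg_marks
        FROM students
        GROUP BY name
    """).show()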
Note from this query that group by name applies the Group By on the name column, and the select clause selects only name and avg(marks). A handy rule for choosing the Group By columns: the columns that appear in the "per" part of the report (for example, per student or per subject) are the columns that participate in the Group By clause. You can also have multiple columns or expressions in the Group By clause. For example, to find the average marks per student per subject, we need to include both the name and subject columns in the Group By clause, as shown next.
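A sketch against the same hypothetical students view:

    # Average marks per student per subject: both columns
    # participate in the Group By.
    spark.sql("""
        SELECT name, subject, avg(marks) AS avg_marks
        FROM students
        GROUP BY name, subject
    """).show()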
We are using both name and subject in the Group By. This returns one row for each unique combination of student and subject. If the data contains marks obtained by students per subject across all the semesters, this gives us the overall average marks per student per subject.
Step 7-1-1. Finding the Number of Students per Subject
In this step, we are going to find the number of students who have taken the test in each subject. Let's write the query to get that report.
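A sketch, again assuming the hypothetical students view from earlier:

    # One row per subject with the number of students tested.
    # countDistinct(name) would guard against a student
    # appearing more than once per subject.
    spark.sql("""
        SELECT subject, count(name) AS num_students
        FROM students
        GROUP BY subject
    """).show()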
Step 7-1-2. Finding the Number of Subjects per Student
Similarly, we are going to determine the number of subjects in which each student has taken a test.
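As a sketch, mirroring the previous query with the grouping reversed:

    # Number of subjects in which each student has taken a test.
    spark.sql("""
        SELECT name, count(subject) AS num_subjects
        FROM students
        GROUP BY name
    """).show()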
These queries show how Group By clauses are used to apply aggregate functions. Aggregate functions come in handy when preparing data for reports, and in everyday business settings you will use such reports repeatedly. Especially in data analytics, where you need to slice and dice the data in various ways to extract meaningful insights, it is very important to appreciate the use of the Group By clause in Spark SQL.
Other aggregate functions available in PySpark SQL are sum(), count(), countDistinct(), max(), min(), and so on.
It is important to keep these UDFs and UDAFs lean from a performance optimization perspective. Since Spark itself runs in the JVM, data must be serialized before being passed between the JVM and the process executing these functions, so there can be a significant performance impact when they are used carelessly. Similarly, since these functions are applied to millions of rows in the Big Data space, avoiding log and print statements inside them is good practice. For example, a UDF that adds just a few seconds of overhead can pose serious performance issues when it runs over a data volume of 10 billion rows.
Recipe 7-2. Apply Window Functions Using PySpark SQL
Problem
You want to find the students scoring first and second in each subject.
Solution
Apache Spark supports a special type of function, called window functions, alongside its built-in scalar functions, such as substr and round, and aggregate functions, such as sum and avg. While scalar and aggregate functions can perform most operations, some computations cannot be expressed with them alone. For this, Spark provides window functions, which compute a value for each individual row based on a window, a set of rows related to that row.
How It Works
Step 7-2-1. Applying the Rank Function to Get the Rank
This step uses the rank window function. The rank function assigns a sequential number to each row within a selected set of rows. Initially, I am going to apply the rank function over all the rows, without any window partitioning. Let's see how to do that.
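A minimal sketch, using the same hypothetical students view as before:

    # rank() over a single window covering all rows, ordered by name.
    spark.sql("""
        SELECT name, subject, marks,
               rank() OVER (ORDER BY name) AS rank
        FROM students
    """).show()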
From this query, you can see the rank() function being used like any other function. Note the OVER clause immediately after rank(); it defines the window of rows for the rank function. The window needs to identify the set of rows and the ordering of those rows.
In this query, we are not partitioning the rows; we treat all the rows in the table as one set. The ORDER BY name orders the rows by the name column, and based on that order, a rank is assigned to each row. The query in this form may not be very useful, but it is shown here for simplicity and understanding. Let's build on it step by step.
Step 7-2-2. Using Partition By in a Rank Function
Partitioned windows help answer per-group questions such as these:
What is the average age of the students in their participating subjects?
What is the percentile of marks obtained by the students in their participating subjects?
In this query, we added a new PARTITION BY clause on the subject column. This means we are windowing based on subject: Spark creates a separate window for each distinct subject value and applies the rank window function only within the rows of each window. A sketch follows.
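Against the same hypothetical view:

    # PARTITION BY subject restarts the ranking within each subject,
    # but ordering by name does not yet rank students by their marks.
    spark.sql("""
        SELECT name, subject, marks,
               rank() OVER (PARTITION BY subject ORDER BY name) AS rank
        FROM students
    """).show()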
But wait: note that we are not getting the correct rank. This is because the Order By column is name rather than marks. Let's go ahead and correct this to use the marks column to order the students in each subject.
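A corrected sketch:

    # Ordering each subject's window by marks (highest first)
    # yields the intended ranking of students per subject.
    spark.sql("""
        SELECT name, subject, marks,
               rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS rank
        FROM students
    """).show()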
Step 7-2-3. Obtaining the Top Two Students for Each Subject
Now that we have the ranking in place, let's keep only the first two students in the report. This is very easy: we filter the rank column for values less than or equal to two.
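A sketch of the filtered query:

    # The rank is computed in a subquery so the outer query can
    # filter on its alias; RANK <= 2 keeps the top two per subject.
    spark.sql("""
        SELECT name, subject, marks, RANK
        FROM (
            SELECT name, subject, marks,
                   rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS RANK
            FROM students
        ) ranked
        WHERE RANK <= 2
    """).show()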
As you can see from this query, a subquery alias exposes the RANK column to the filter. As we saw in the previous chapter, an alias column created in a query cannot be referenced in the same query, so we make the ranking query a subquery and apply the filter in the outer query.
We have successfully used the rank() window function and covered the concept of windowing. Using this concept, you can apply any window function.
Note that applying these window functions to huge volumes of data in the Big Data world adds significant performance cost, especially when joining big tables, where the degradation can be drastic. It is therefore good practice to pre-compute window function values and then use those pre-computed values in the joins. We also have to make sure we use the PARTITION BY clause correctly; we cover how to control partitioning in a later recipe.
Recipe 7-3. Cache Data Using PySpark SQL
Problem
You want to cache a table or dataset.
Solution
Many times, when there is a dataset we have to use repeatedly, we don't want to read it from storage every time. For example, a dataset containing countries and country codes is small and fits in memory easily without occupying much space, yet we wouldn't want it re-read each time it is used. For such scenarios, we can use the caching that Spark provides through PySpark SQL.
When it comes to Machine Learning algorithms, which apply logic over the same dataset many times, caching is the best option for performance. With caching, Spark keeps the data in memory so that it does not need to read it again; reading data from disk is a costly operation. With these advantages in mind, let's use caching in an example.
How It Works
In this example, we first cache the students data and create a cached view. After that, we join the cached data with the subjects and assignments data.
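A minimal sketch of the caching step; the view name students_cached is an assumption, not the book's:

    # CACHE TABLE ... AS SELECT materializes the query result and
    # caches it under a named view. Caching is eager by default;
    # CACHE LAZY TABLE would defer it until first use.
    spark.sql("CACHE TABLE students_cached AS SELECT * FROM students")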
Let’s try to use the cached data in queries.
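A sketch of such a query. The subjects and assignments views, the studentId join key, and the assignment column are hypothetical stand-ins for the book's data model:

    # Reads of students_cached now come from memory instead of
    # re-reading the source data.
    spark.sql("""
        SELECT st.studentId, su.subject, a.assignment
        FROM students_cached st
        JOIN subjects su ON st.studentId = su.studentId
        JOIN assignments a ON st.studentId = a.studentId
    """).show()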
While caching is very helpful, it must be used carefully. We don't want to cache huge volumes of data, as that can lead to out-of-memory exceptions. Holding huge volumes of data in memory leaves Spark with less memory for other operations; we could get an out-of-memory error while performing a totally different operation that is actually caused by caching huge volumes of data.
Recipe 7-4. Apply the Distribute By, Sort By, and Cluster By Clauses in PySpark SQL
Problem
You want to use Distribute By in PySpark SQL to effectively partition your data.
Solution
As explained at the start of this chapter, achieving highly effective data transformation, in terms of high performance and optimal use of resources in a distributed computing environment, depends on how effectively the data is distributed. If the data is distributed equally among all the data nodes, the Spark job performs in an equally distributed manner. If not, some tasks end up taking the maximum time and resources while others finish in a few seconds. Poor distribution typically arises in two scenarios:
Most of the data is placed on one of the nodes, so the default partitioning provided by Spark is not effective.
The key on which you are joining the data is skewed, so the join operation happens in only a few of the allocated executors.
In both of these scenarios, the data is not partitioned correctly. Spark provides mechanisms for users to repartition data from Spark's original partitioning. While this is called repartition in the Spark programmatic API, it is called Distribute By in PySpark SQL. This technique is one of the best ways to optimize Spark SQL, yet it is not widely used because few people are aware of these clauses.
Let's see how to use Distribute By in PySpark SQL.
How It Works
This example applies the Distribute By clause on studentId, since it is the join key.
In the following queries, we create two new DataFrames, called student_distributed and subjects_distributed, applying the Distribute By clause on the studentId column.
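A sketch, assuming students and subjects temporary views that each carry a studentId column:

    # Repartition both datasets on the join key, studentId.
    student_distributed = spark.sql(
        "SELECT * FROM students DISTRIBUTE BY studentId"
    )
    subjects_distributed = spark.sql(
        "SELECT * FROM subjects DISTRIBUTE BY studentId"
    )

    # Register the repartitioned DataFrames as temporary views.
    student_distributed.createOrReplaceTempView("student_distributed")
    subjects_distributed.createOrReplaceTempView("subjects_distributed")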
After creating those DataFrames, we also register them as temporary views. Up to this point, Spark has performed no action. When an action is called further down, Spark will repartition the data based on studentId so that it is evenly distributed among all the compute nodes.
There is no output from these queries, since we are not calling the show() method and no action is being performed.
Once we have the distributed views, we can start using them in queries. Let's join the students and subjects datasets through the newly created distributed views.
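A sketch of the join; the selected columns are assumed, not taken from the book:

    # Join through the views that are already partitioned on studentId.
    spark.sql("""
        SELECT st.studentId, su.subject, su.marks
        FROM student_distributed st
        JOIN subjects_distributed su
          ON st.studentId = su.studentId
    """).show()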
You can see from Figure 7-6 that there is a lot of data moving between the nodes. This is called shuffling, and reducing such shuffling helps improve the performance of our jobs.
In Figure 7-7, note that even though the join is efficiently parallelized, there is still a lot of shuffling; data from subjects is going everywhere. Such data shuffling leads to performance degradation, and to out-of-memory errors when sufficient memory is not available.
The way to minimize this data shuffling is to sort the data in subjects and then join it with students.
In the following query, we apply both the Distribute By and Sort By clauses to achieve maximum performance.
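A sketch under the same assumed schema:

    # DISTRIBUTE BY repartitions on studentId; SORT BY then sorts rows
    # within each partition (unlike ORDER BY, it is not a global sort).
    subjects_dist_sorted = spark.sql("""
        SELECT * FROM subjects
        DISTRIBUTE BY studentId
        SORT BY studentId
    """)
    subjects_dist_sorted.createOrReplaceTempView("subjects_dist_sorted")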
In Figure 7-8, note that the amount of data shuffling has been reduced and the flow of data is much smoother; there isn't much shuffling compared to Figure 7-7.
Now that you have used the Distribute By and Sort By clauses together, note that Spark provides an optional clause that combines the two: Cluster By.
Using Cluster By is the same as using Distribute By and Sort By together on the same columns. Let’s see Cluster By in action.
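A sketch, equivalent to the Distribute By plus Sort By combination above:

    # CLUSTER BY studentId is shorthand for
    # DISTRIBUTE BY studentId SORT BY studentId.
    students_clustered = spark.sql(
        "SELECT * FROM students CLUSTER BY studentId"
    )
    students_clustered.createOrReplaceTempView("students_clustered")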
In this recipe, you have successfully used the Distribute By, Sort By, and Cluster By clauses to effectively optimize your PySpark SQL queries.