
7. Optimizing PySpark SQL

Raju Kumar Mishra (Bangalore, Karnataka, India) and Sundar Rajan Raman (Chennai, Tamil Nadu, India)

In this chapter, we look at various Spark SQL recipes that optimize SQL queries. Apache Spark is an open source framework developed with Big Data volumes in mind. It is designed to handle huge volumes of data and to be used in scenarios that call for horizontal scaling of processing power. Before we cover the optimization techniques used in Apache Spark, you need to understand the basics of horizontal and vertical scaling.

The term scaling refers to increasing the compute power or memory capacity of a system when there is a need for it. Vertical scaling can be defined as adding more processing capacity or power to an existing machine. For example, suppose we have a machine with 8GB of RAM that is processing 2GB of data. If the data volume grows from 2GB to 4GB, we might want to extend the RAM from 8GB to 16GB, so we add an extra 8GB of RAM, giving the machine 16GB in total. This mechanism of improving performance by extending the resources of the existing machine is called vertical scaling (see Figure 7-1).
Figure 7-1. Vertical scaling

In horizontal scaling, on the other hand, we add another machine alongside the existing one, so two machines provide the capacity. Taking the same example, we add another machine with 8GB of RAM and have two machines process the 4GB of data; this is horizontal scaling (see Figure 7-2).
Figure 7-2. Horizontal scaling

Since this is a very important concept to understand, let's look at a simple example. Assume we are travelling from place A to place B and need to accommodate 20 people with a bus that has 20 passenger seats. All is good, since the seat capacity matches the number of passengers. Now say we have to accommodate 30 passengers. We can get a bigger bus with a seating capacity of 30, such as a double-decker bus. In this scenario, we keep only one bus, but it has increased capacity. Essentially, we have scaled from a 20-passenger bus to a 30-passenger bus while still using just one bus. This is vertical scaling, where we scale up using only one bus.

The issue with vertical scaling is that there is a limit to how far we can scale. For example, if we get 60 or 70 passengers, we may not be able to find a single bus that can accommodate them all. Also, if that one bus has any maintenance issue, we will not be able to travel at all. Software vertical scaling is similar: there is a limit to how much we can scale up, and it carries the problem of a single point of failure. If the one big machine fails, our entire system goes down.

Horizontal scaling addresses most of these problems, so let's look at it more closely. Say we add a second bus with a capacity of 20 and now use two buses. Essentially, we have scaled out by adding one more bus to our existing fleet; this kind of scaling is called horizontal scaling. While at the outset it seems to address many of the issues of vertical scaling, it comes with its own set of issues. In the bus example, we need to keep both buses in sync: we have to take extra steps to make sure all passengers are on board by checking with both buses, we may need to continually synchronize the plans for the two buses, and we need more drivers to manage. These are some of the extra steps we need to take when adding another bus.

Very similar to this, with horizontal scaling, we need to make sure the distributed processes are synchronized. We will need to make sure the output from all the machines is consolidated into one final output.

Now that we have covered vertical and horizontal scaling, let's look at scaling from a Big Data perspective. Since Big Data systems are supposed to handle huge volumes of data, vertical scaling is not really an option; we have to go with horizontal scaling. Since there are going to be terabytes and petabytes of data to process, we need to keep adding machines to do the processing. We also need software that works in a distributed environment. Regular Windows filesystems, such as FAT and NTFS, and UNIX filesystems, such as EXT, cannot work across a distributed set of machines.

HDFS is one such filesystem that can operate on a distributed set of machines. HDFS can start on a small multi-node setup and, as the data grows, can easily be expanded into a large distributed multi-node system. Other than HDFS, Amazon S3 is another example of distributed storage. While such distributed storage systems know how to store the data in a distributed manner, we still need to apply the compute in a distributed manner.

For example, let's assume we need to find the average marks of all students recorded in a file that is distributed across two machines. The compute process should find the average on System-1 and on System-2, and finally it should know how to combine these averages into one overall average. This makes the compute process more complicated in a distributed setup. See Figure 7-3.
Figure 7-3. A distributed setup

Even for simple operations, you need sophisticated software that can handle the distributed nature of compute. Hadoop includes both HDFS and the MapReduce framework, which can handle computations distributed across multiple nodes. The MapReduce framework contains a Map phase that applies the compute on all the machines and produces intermediate output, and a Reduce phase that combines the intermediate output into one final output. In the previous example, if the average is computed using the MapReduce framework, the Map phase finds the average of the students in each file, while the Reduce phase computes the final output as an average of those averages. Figure 7-4 shows the MapReduce framework at a high level.
Figure 7-4. Average using the MapReduce framework, assuming the same number of students in each node
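To see why the assumption of an equal number of students per node matters, here is a minimal, plain-Python sketch of the two phases; the per-node mark lists and function names are made up for illustration. Each Map call emits a (sum, count) pair instead of a bare average, so the Reduce phase produces a correct overall average even when the nodes hold different numbers of students.
# Hypothetical marks stored on each node (stand-ins for the distributed file blocks).
node1_marks = [81, 75, 83]
node2_marks = [85, 72, 76, 84]

def map_phase(marks):
    # Emit a partial (sum, count) pair rather than a partial average,
    # so the final result stays correct for unequal partition sizes.
    return (sum(marks), len(marks))

def reduce_phase(partials):
    # Combine the partial pairs into one overall average.
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

partials = [map_phase(node1_marks), map_phase(node2_marks)]
print(reduce_phase(partials))  # overall average across both nodes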

While Hadoop's MapReduce framework is a good fit for distributed compute, it requires a lot of boilerplate code. It also involves a lot of disk I/O, which drastically impacts performance. Users need to understand a complex framework, which many find very difficult. That is where Apache Spark saves the day.

Apache Spark can easily handle such distributed compute and provides an easy-to-use interface. It removes most of the boilerplate code required by MapReduce, often reducing roughly 50 lines of MapReduce code to 5-10 lines.

Apache Spark also introduced in-memory processing, which avoids much of the slow disk I/O. Highly repetitive operations can cache the data in memory and reuse it multiple times. All of this together makes Apache Spark a one-stop shop for Big Data processing.
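As a rough illustration of both points, here is a hedged sketch of the student-average computation in PySpark; the file name marks.csv and its columns are assumptions made for this example. The DataFrame is cached so that repeated aggregations reuse the in-memory copy instead of rereading the file.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("average-sketch").getOrCreate()

# Hypothetical input: a CSV with name and marks columns.
marks_df = spark.read.csv("marks.csv", header=True, inferSchema=True)

marks_df.cache()                                      # keep the data in memory for reuse
marks_df.agg(F.avg("marks")).show()                   # overall average, one line of logic
marks_df.groupBy("name").agg(F.avg("marks")).show()   # reuses the cached data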

Even though Apache Spark natively supports processing huge volumes of data in a distributed manner, we need to give Spark the right environment in terms of properly distributed data. For example, if in a 10-node cluster all the data sits on just three nodes and the other seven nodes hold little or no data, only those three nodes do all the work while the remaining nodes sit idle, which reduces performance. You need to supply Spark programs with a good distribution of data, or provide the right tuning parameters so that Spark distributes the data accordingly and improves performance.

Similarly, if there are joins on huge tables with a join key that is skewed or poorly distributed, there will be a lot of shuffling. In such scenarios, Spark can run into out-of-memory situations, and we need to use the right performance-tuning parameters.

These are just some of the scenarios in which Spark needs you to optimize the queries so that you get the best performance from Spark SQL.

Now that we have covered the context around performance optimization, let’s go through the recipes of this chapter step-by-step and learn the PySpark optimization techniques.

We will go through the following recipes:
  • Recipe 7-1. Apply aggregation using PySpark SQL

  • Recipe 7-2. Apply window functions using PySpark SQL

  • Recipe 7-3. Cache data using PySpark SQL

  • Recipe 7-4. Apply the Distribute By, Sort By, and Cluster By clauses in PySpark SQL

Let’s get into the first recipe.

Recipe 7-1. Apply Aggregation Using PySpark SQL

Problem

You want to use Group By in PySpark SQL to find the average marks per student. So far, you have created the student report showing each student's subjects and marks from the students and subjects data. Let's now get a report that identifies the average marks for each student.

Solution

We load the Students and Subjects datasets and join them together:
>>> #Load and register Student data
>>> studentDf = spark.read.csv('studentData_dateofbirth.csv',header=True, inferSchema=True)
>>> studentDf.createOrReplaceTempView("StudentsDob")
>>> #Load and register subjects
>>> subjectsDf = spark.read.csv('subjects.csv',header=True, inferSchema=True)
>>> subjectsDf.createOrReplaceTempView("subjects")
>>> spark.sql("select * from studentsdob st join subjects sb on (st.studentId = sb.studentId)").show()
Here is the output:
+---------+-------+------+-------------------+---------+-------+-----+
|studentId|   name|gender|        dateofbirth|studentId|subject|marks|
+---------+-------+------+-------------------+---------+-------+-----+
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1|   Java|   81|
|      si1|  Robin|     M|1981-12-13 00:00:00|      si1| Python|   75|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2|   Java|   83|
|      si2|  Maria|     F|1986-06-06 00:00:00|      si2| Python|   85|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Ruby|   72|
|      si3|  Julie|     F|1988-09-05 00:00:00|      si3|   Java|   76|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4| Python|   84|
|      si4|    Bob|     M|1987-05-04 00:00:00|      si4|    C++|   78|
|      si6|William|     M|1980-11-12 00:00:00|      si6|   Java|   70|
+---------+-------+------+-------------------+---------+-------+-----+

You might remember this output from the previous chapter. Let’s try to use Group By on this query to identify the average marks per student.

How It Works

Similar to the issue with UDFs, if we use the avg function as follows, it will not work.
>>> spark.sql("select name, avg(marks) from studentsdob st join subjects sb on (st.studentId = sb.studentId)").show()
Here is the error that we will get:
py4j.protocol.Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'st.`name`' is not an aggregate function. Wrap '(avg(CAST(sb.`marks` AS BIGINT)) AS `avg(marks)`)' in windowing function(s) or wrap 'st.`name`' in first() (or first_value) if you don't care which value you get.;

Note that we haven't used a Group By clause in this query. If we use an aggregate function alongside non-aggregated columns without a Group By clause, we get an error. While a UDF operates on one row per call, avg is an aggregate function (user-defined aggregate functions, or UDAFs, behave the same way): it takes a set of rows determined by the Group By clause and applies its logic to that set. Let's see Group By in action.

Here is the query that uses Group By:
>>> spark.sql("select name,avg(marks) from studentsdob st left join subjects sb on (st.studentId = sb.studentId) group by name").show()

Note that group by name in this query applies Group By on the name column, and the select clause selects only name and avg(marks). To decide which columns to put in the Group By clause, remember that the columns that appear in the "per" part of the report (for example, per student or per subject) are the columns that participate in the Group By clause. You can also have multiple columns or expressions in the Group By clause. For example, if we want to find the average marks per student per subject, we need to include both the name and subject columns in the Group By clause.

Here is the output of the query:
+-------+----------+
|   name|avg(marks)|
+-------+----------+
|  Robin|      78.0|
|    Bob|      81.0|
|  Julie|      74.0|
|  Maria|      84.0|
|William|      70.0|
+-------+----------+
The following query uses two columns in the Group By clause:
>>> spark.sql("select name,subject,avg(marks) from studentsdob st left join subjects sb on (st.studentId = sb.studentId) group by name,subject").show()

We are using both name and subject in the Group By. This returns one row for each unique combination of student and subject. If the data contained marks obtained by each student per subject across all semesters, this would give us each student's overall average marks per subject.

Here is the output:
+-------+-------+----------+
|   name|subject|avg(marks)|
+-------+-------+----------+
|William|   Java|      70.0|
|    Bob| Python|      84.0|
|  Robin| Python|      75.0|
|  Robin|   Java|      81.0|
|  Maria| Python|      85.0|
|  Maria|   Java|      83.0|
|    Bob|    C++|      78.0|
|  Julie|   Ruby|      72.0|
|  Julie|   Java|      76.0|
+-------+-------+----------+
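The same aggregation can also be expressed with the DataFrame API instead of a SQL string. This is an equivalent sketch, not an extra step in the recipe, and it assumes the studentDf and subjectsDf DataFrames created earlier in this recipe.
from pyspark.sql import functions as F

# Join on studentId and compute the average marks per student and subject,
# mirroring the SQL query above.
joined = studentDf.join(subjectsDf, studentDf.studentId == subjectsDf.studentId)
joined.groupBy("name", "subject").agg(F.avg("marks").alias("avg_marks")).show()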

Step 7-1-1. Finding the Number of Students per Subject

In this step, we are going to find the number of students who have taken each test. Let’s write the query to get that report.

Here is the query:
>>> spark.sql("select subject,count(name) Students_count from studentsdob st left join subjects sb on (st.studentId = sb.studentId) group by subject").show()
Here is the output:
+-------+--------------+
|subject|Students_count|
+-------+--------------+
|    C++|             1|
|   Ruby|             1|
| Python|             3|
|   Java|             4|
+-------+--------------+

Step 7-1-2. Finding the Number of Subjects per Student

Similarly, we are going to determine how many subjects each student has taken a test in.

Here is the query:
>>> spark.sql("select name,count(subject) Students_count from studentsdob st left join subjects sb on (st.studentId = sb.studentId) group by name").show()
Here is the output:
+-------+--------------+
|   name|Students_count|
+-------+--------------+
|  Robin|             2|
|    Bob|             2|
|  Julie|             2|
|  Maria|             2|
|William|             1|
+-------+--------------+

These queries show how to use the Group By clause to apply aggregate functions. Aggregate functions come in handy when preparing data for a report, and in everyday business settings these are reports you will use repeatedly. Especially in data analytics, where you derive insights from data, you need to slice and dice the data in various ways to extract meaningful information. So it is very important to appreciate the use of the Group By clause in Spark SQL.

Other aggregate functions available in PySpark SQL include sum(), count(), countDistinct(), max(), min(), and so on.

It is important to keep UDFs and UDAFs lean from a performance optimization perspective. PySpark runs these functions in separate Python worker processes, so the data must be serialized between the JVM and those workers, and there can be a significant performance impact when the functions are used carelessly. Similarly, since these functions are applied to millions of rows in the Big Data space, avoiding log and print statements inside them is good practice. For example, a UDF that adds even a few extra milliseconds per row can pose serious performance issues when it runs over 10 billion rows.
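As a hedged illustration of keeping such functions lean, here is a sketch that registers a simple Python UDF for use in SQL, plus a vectorized pandas_udf alternative that cuts down per-row serialization. The function names and the grade cutoff are made up for the example, and the pandas_udf form assumes Spark 3.x-style type hints and an installed PyArrow.
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

# A lean row-at-a-time UDF: no logging or printing inside the function body.
def to_grade(marks):
    return "A" if marks >= 80 else "B"   # hypothetical cutoff

spark.udf.register("to_grade", to_grade, StringType())
spark.sql("select name, subject, to_grade(marks) as grade "
          "from studentsdob st join subjects sb "
          "on (st.studentId = sb.studentId)").show()

# Vectorized alternative: processes a whole pandas Series per call,
# reducing serialization round trips between the JVM and the Python workers.
@pandas_udf(StringType())
def to_grade_vec(marks: pd.Series) -> pd.Series:
    return marks.map(lambda m: "A" if m >= 80 else "B")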

Recipe 7-2. Apply Window Functions Using PySpark SQL

Problem

You want to find the students scoring first and second in each subject.

Solution

Apache Spark supports a special type of function, called window functions, along with its built-in scalar functions such as substr and round and aggregate functions such as sum and avg. While these functions can perform most operations, there are still many operations they cannot express. So Spark provides window functions, which operate over a set of rows while still producing a result for each individual row.

How It Works

Step 7-2-1. Applying the Rank Function to Get the Rank

This step uses the rank window function. The rank function assigns a position number to each row within a selected set of rows. Initially, I am going to apply the rank function to all the rows without any window partitioning. Let's see how to do that.

Here is the query that uses rank in its simplest form:
>>> spark.sql("select name,subject,marks,rank() over (order by name) as RANK from studentsdob st join subjects sb ON (st.studentId = sb.studentId)").show()

From this query you can see the rank() function being used like any other function. Note that there is an over clause immediately after rank(). The over clause defines the window of rows for the rank function; this window needs to identify the set of rows and the ordering of those rows.

In this query, we are not splitting the rows into sets; we are treating all the rows in the table as one set. Next, we use ORDER BY name, which orders the rows by the name column, and a rank is assigned to each row based on that order. Because rank() assigns tied rows the same rank and then skips ahead, the two rows for each student share a rank in the output below (1, 1, 3, 3, and so on). The query in this form may not be very helpful, but it is shown for simplicity and for you to understand. Let's look at it step by step.

Here is the output:
+-------+-------+-----+----+
|   name|subject|marks|RANK|
+-------+-------+-----+----+
|    Bob|    C++|   78|   1|
|    Bob| Python|   84|   1|
|  Julie|   Java|   76|   3|
|  Julie|   Ruby|   72|   3|
|  Maria| Python|   85|   5|
|  Maria|   Java|   83|   5|
|  Robin| Python|   75|   7|
|  Robin|   Java|   81|   7|
|William|   Java|   70|   9|
+-------+-------+-----+----+

Step 7-2-2. Using Partition By in a Rank Function

In the previous step, we learned to apply the rank function. Now we will learn to use the PARTITION BY clause to split the rows into multiple sets. You can visualize this as placing a window over each of those sets of rows, as shown in Figure 7-5.
Figure 7-5. Classifying rows as multiple sets

As shown in Figure 7-5, there are four windows based on the subject, and each window contains a single subject. Windows are created for the C++, Java, Python, and Ruby subjects. Now assume that, for each row, specific computations need to be performed relative to the other rows in its window. That is where we apply window functions. Examples of such questions are:
  • What is the average age of the students in their participating subjects?

  • What is the percentile of marks obtained by the students in their participating subjects?

It is very hard to answer such questions in plain SQL without window functions. Let's go ahead and create the windows using the PARTITION BY clause.
>>> spark.sql("select name,subject,marks,rank() over (PARTITION BY subject order by name) as RANK from studentsdob st join subjects sb ON (st.studentId = sb.studentId)").show()

In this query, we added a PARTITION BY clause on the subject column. This says we are windowing based on subject: Spark creates a separate window for each distinct subject value and applies the window function rank only within the rows of that window.

Here is the output. You can see that a rank has been assigned to the students within each subject:
+-------+-------+-----+----+
|   name|subject|marks|RANK|
+-------+-------+-----+----+
|    Bob|    C++|   78|   1|
|  Julie|   Ruby|   72|   1|
|    Bob| Python|   84|   1|
|  Maria| Python|   85|   2|
|  Robin| Python|   75|   3|
|  Julie|   Java|   76|   1|
|  Maria|   Java|   83|   2|
|  Robin|   Java|   81|   3|
|William|   Java|   70|   4|
+-------+-------+-----+----+

But wait, note that we are not getting the correct rank. This is because the Order By column is name rather than marks. Let's go ahead and correct this to order the students in each subject by marks.

Here is the query:
>>> spark.sql("select name,subject,marks,rank() over (PARTITION BY subject order by marks desc) as RANK from studentsdob st join subjects sb ON (st.studentId = sb.studentId)").show()
Note that I have changed the ordering to be based on marks, in descending order. This makes sure the student with the highest marks gets the first rank.
+-------+-------+-----+----+
|   name|subject|marks|RANK|
+-------+-------+-----+----+
|    Bob|    C++|   78|   1|
|  Julie|   Ruby|   72|   1|
|  Maria| Python|   85|   1|
|    Bob| Python|   84|   2|
|  Robin| Python|   75|   3|
|  Maria|   Java|   83|   1|
|  Robin|   Java|   81|   2|
|  Julie|   Java|   76|   3|
|William|   Java|   70|   4|
+-------+-------+-----+----+

Step 7-2-3. Obtaining the Top Two Students for Each Subject

Now that we have the ranking in place, let's identify only the first two students per subject in the report. This is very easy: we filter the RANK column for values less than or equal to two.

Here is the query:
 >>> spark.sql("select name,subject,marks,RANK from (select name,subject,marks,rank() over (PARTITION BY subject order by marks desc) as RANK from studentsdob st join subjects sb ON (st.studentId = sb.studentId)) where RANK <= 2").show()

As we can see from this query, the ranking query becomes a subquery and the RANK column is filtered in the outer query. This is because, as we saw in the previous chapter, a column alias created in a query cannot be used in the same query, so we need to make it a subquery and apply the filter from the parent query.

Here is the output:
+-----+-------+-----+----+
| name|subject|marks|RANK|
+-----+-------+-----+----+
|  Bob|    C++|   78|   1|
|Julie|   Ruby|   72|   1|
|Maria| Python|   85|   1|
|  Bob| Python|   84|   2|
|Maria|   Java|   83|   1|
|Robin|   Java|   81|   2|
+-----+-------+-----+----+

We have successfully used the rank() window function and covered the concept of window functions. Using this concept, we can apply any window function.
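For reference, here is an equivalent sketch using the DataFrame API's Window specification instead of SQL. It assumes the studentDf and subjectsDf DataFrames loaded at the start of this chapter and produces the same top-two-per-subject report.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank students within each subject by marks, highest first.
by_subject = Window.partitionBy("subject").orderBy(F.col("marks").desc())

(studentDf.join(subjectsDf, studentDf.studentId == subjectsDf.studentId)
    .withColumn("RANK", F.rank().over(by_subject))
    .filter("RANK <= 2")
    .select("name", "subject", "marks", "RANK")
    .show())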

Note that when we apply these window functions to huge volumes of data in the Big Data world, they add a lot of performance overhead. Especially when we are joining big tables, there can be drastic performance degradation. So it is good practice to pre-compute the window function values and then use those values in the joins. We also have to make sure we use the PARTITION clause in the right manner. We cover how to partition and distribute data in Recipe 7-4.

Recipe 7-3. Cache Data Using PySpark SQL

Problem

You want to cache a table or dataset.

Solution

Many times, when there is a dataset that we use repeatedly, we don't want to read it from disk every time. For example, data containing countries and country codes is small; it fits in memory very easily without occupying a lot of space, and we wouldn't want such data to be read from disk each time it is used. For such scenarios, we can use the caching that Spark provides through PySpark SQL.

When it comes to Machine Learning algorithms that apply logic over the same dataset multiple times, caching is the best option for getting the best performance. With caching, Spark keeps the data in memory so it does not need to read it again; reading data from disk is a costly operation. With all these advantages in mind, let's try using caching in an example.

How It Works

In this example, we first cache the students data and create a cached view. After that, we join the cached data with the subjects data.

Here is the query to cache the data. We provide a name for the cached data, very much like a view or a table.
>>> spark.sql("cache table students_cached AS select * from studentsdob").show()

Let’s try to use the cached data in queries.

Here is the query that uses the newly created cached table called students_cached:
>>> spark.sql("select st.studentid,name,subject,marks from students_cached st left join subjects sb on (st.studentId = sb.studentId)").show()
As you can see from this query, there is no difference between using a cached table and a regular table, except that we gain significant performance benefits.
+---------+-------+-------+-----+
|studentid|   name|subject|marks|
+---------+-------+-------+-----+
|      si1|  Robin|   Java|   81|
|      si1|  Robin| Python|   75|
|      si2|  Maria|   Java|   83|
|      si2|  Maria| Python|   85|
|      si3|  Julie|   Ruby|   72|
|      si3|  Julie|   Java|   76|
|      si4|    Bob| Python|   84|
|      si4|    Bob|    C++|   78|
|      si6|William|   Java|   70|
+---------+-------+-------+-----+

While caching is very helpful, we need to use it carefully. We don't want to cache huge volumes of data, as doing so can lead to out-of-memory exceptions. Holding huge volumes of data in memory leaves Spark with less memory for other operations, so we could get an out-of-memory error while performing a totally different operation whose real cause is the huge volume of cached data.
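To keep memory under control, it helps to check and release caches explicitly. Here is a hedged sketch using catalog calls and the UNCACHE TABLE statement; the table name matches the students_cached table created above.
# Check whether the table is currently cached.
print(spark.catalog.isCached("students_cached"))

# Release the cached data when it is no longer needed, either via SQL...
spark.sql("uncache table students_cached")

# ...or via the catalog API.
# spark.catalog.uncacheTable("students_cached")

# clearCache() drops every cached table in the session; use it sparingly.
# spark.catalog.clearCache()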

Recipe 7-4. Apply the Distribute By, Sort By, and Cluster By Clauses in PySpark SQL

Problem

You want to use Distribute By in PySpark SQL to effectively partition your data.

Solution

As explained at the start of this chapter, getting highly effective data transformation, with high performance and optimal use of resources in a distributed computing environment, depends largely on how effectively the data is distributed. If the data is distributed equally among all the data nodes, the Spark job performs in an equally distributed manner. If not, we end up with some tasks taking the maximum time and resources while others finish in a few seconds.

There are many occasions when you give your job maximum resources in terms of the number of executors and executor memory, but you still end up waiting for the job to complete, with just a couple of executors running for hours while the rest finished long ago. Let's look at the reasons and see how to avoid such behavior. There are multiple reasons why jobs behave this way:
  • Most of the data is placed on one of the nodes, so the default partitioning provided by Spark is not effective.

  • The key on which you are joining the data is skewed, so the join operation happens on only a few of the allocated executors.

In both of these scenarios, the data is not partitioned correctly. Spark provides mechanisms for users to control how data is repartitioned from Spark's original partitioning. While this is called repartition in the Spark programmatic API, it is called Distribute By in PySpark SQL. This technique is one of the best ways to optimize Spark SQL, yet it is not widely used because not many people are aware of these clauses.

Let's see how to use Distribute By in PySpark SQL.

How It Works

This example applies the Distribute By clause on studentId since it’s the join key.

In the following queries, we create two new DataFrames called student_distributed and subjects_distributed. If you look at the queries, we apply the Distribute By clause on the studentId column.

After creating those DataFrames, we also register them as temporary views. Up to this point, Spark has not performed any action. When an action is called further down the line, Spark repartitions the data based on studentId so that the data is evenly distributed among the compute nodes.

Here are the queries:
>>> student_distributed = spark.sql("select * from StudentsDob distribute by studentId")
>>> subjects_distributed = spark.sql("select * from subjects distribute by studentId")
>>> student_distributed.createOrReplaceTempView("student_distributed")
>>> subjects_distributed.createOrReplaceTempView("subjects_distributed")

There will not be any output from these queries, since we are not calling the show() method and no action is performed here.
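If you want to confirm what Distribute By did, a quick hedged check is to look at the number of partitions and how many rows land in each. getNumPartitions and glom are standard RDD calls; an even spread is only an expectation when the key is not skewed.
# Number of partitions after Distribute By (driven by spark.sql.shuffle.partitions).
print(student_distributed.rdd.getNumPartitions())

# Rows per partition: a rough way to spot skew before running the join.
print(student_distributed.rdd.glom().map(len).collect())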

Once we have the distributed views, we can start using them in queries. Let's join the students and subjects datasets using the newly created distributed views.

Here is the query:
>>> spark.sql("select st.studentid,name,subject,marks from student_distributed st left join subjects_distributed sb on (st.studentId = sb.studentId)").show()
Here is the output:
+---------+-------+-------+-----+
|studentid|   name|subject|marks|
+---------+-------+-------+-----+
|      si2|  Maria|   Java|   83|
|      si2|  Maria| Python|   85|
|      si4|    Bob| Python|   84|
|      si4|    Bob|    C++|   78|
|      si3|  Julie|   Ruby|   72|
|      si3|  Julie|   Java|   76|
|      si6|William|   Java|   70|
|      si1|  Robin|   Java|   81|
|      si1|  Robin| Python|   75|
+---------+-------+-------+-----+
Understanding the use of Distribute By helps us effectively parallelize jobs with evenly distributed data. Now assume we are joining datasets on a join key and the key is unordered in both datasets. In this case, Spark has to shuffle data between the nodes to bring together the rows from both tables that share the same key. See Figure 7-6.
Figure 7-6. Shuffling in Spark during joins

You can see from Figure 7-6 that a lot of data moves between the nodes. This is called shuffling across the nodes, and reducing such shuffling helps improve the performance of the jobs.

We just saw that Distribute By is one such option. Let’s try to visualize Distribute By. Assuming there are two partitions, we would get Figure 7-7.
Figure 7-7. Using Distribute By

In Figure 7-7, note that even though you have efficiently parallelized the join, there is still a lot of shuffling: the subjects data is going everywhere. Such data shuffling leads to performance degradation, and to out-of-memory errors when sufficient memory is not available.

The way to minimize this data shuffling is to sort the data in subjects and then join it with students.

In the following queries, we apply both the Distribute By and Sort By clauses to achieve maximum performance.

Here are the queries where we prepare the distributed and sorted students and subjects data.
>>> student_distributed_sorted = spark.sql("select * from StudentsDob distribute by studentId  SORT BY studentId")
>>> subjects_distributed_sorted = spark.sql("select * from subjects distribute by studentId SORT BY studentId")
>>> student_distributed_sorted.createOrReplaceTempView("student_distributed_sorted")
>>> subjects_distributed_sorted.createOrReplaceTempView("subjects_distributed_sorted")
Let’s look at the joined data output:
>>> spark.sql("select st.studentid,name,subject,marks from student_distributed_sorted st left join subjects_distributed_sorted sb on (st.studentId = sb.studentId)").show()
Let’s try to visualize the join. See Figure 7-8.
Figure 7-8. Visualizing the join

In Figure 7-8, note that the amount of data shuffling has been reduced and the flow of data is much smoother. There isn't much shuffling compared to Figure 7-7.

The output looks the same, except that the query is now better optimized:
+---------+-------+-------+-----+
|studentid|   name|subject|marks|
+---------+-------+-------+-----+
|      si2|  Maria|   Java|   83|
|      si2|  Maria| Python|   85|
|      si4|    Bob| Python|   84|
|      si4|    Bob|    C++|   78|
|      si3|  Julie|   Ruby|   72|
|      si3|  Julie|   Java|   76|
|      si6|William|   Java|   70|
|      si1|  Robin|   Java|   81|
|      si1|  Robin| Python|   75|
+---------+-------+-------+-----+

Now that you have used the Distribute By and Sort By clauses together, note that Spark provides a shorthand clause that combines the two: it's called Cluster By.

Using Cluster By is the same as using Distribute By and Sort By together on the same columns. Let’s see Cluster By in action.

Here are the queries using the Cluster By clause :
>>> student_distributed_clustered = spark.sql("select * from StudentsDob cluster by studentId ")
>>> subjects_distributed_clustered = spark.sql("select * from subjects cluster by studentId ")
>>>student_distributed_clustered.createOrReplaceTempView("student_distributed_clustered")
>>>subjects_distributed_clustered.createOrReplaceTempView("subjects_distributed_clustered")
>>> spark.sql("select st.studentid,name,subject,marks from student_distributed_clustered st left join subjects_distributed_clustered sb on (st.studentId = sb.studentId)").show()
Here is the output:
+---------+-------+-------+-----+
|studentid|   name|subject|marks|
+---------+-------+-------+-----+
|      si2|  Maria|   Java|   83|
|      si2|  Maria| Python|   85|
|      si4|    Bob| Python|   84|
|      si4|    Bob|    C++|   78|
|      si3|  Julie|   Ruby|   72|
|      si3|  Julie|   Java|   76|
|      si6|William|   Java|   70|
|      si1|  Robin|   Java|   81|
|      si1|  Robin| Python|   75|
+---------+-------+-------+-----+
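For completeness, here is a hedged sketch of the DataFrame API counterparts of these clauses, using the studentDf and subjectsDf DataFrames from earlier: repartition corresponds to Distribute By, sortWithinPartitions to Sort By, and chaining the two mirrors Cluster By.
# Distribute By studentId  ->  repartition on studentId
students_repart = studentDf.repartition("studentId")

# Distribute By + Sort By (i.e., Cluster By)  ->  repartition + sortWithinPartitions
students_clustered = studentDf.repartition("studentId").sortWithinPartitions("studentId")
subjects_clustered = subjectsDf.repartition("studentId").sortWithinPartitions("studentId")

# The join itself is unchanged; only the data layout feeding it differs.
students_clustered.join(subjects_clustered, "studentId") \
                  .select("studentId", "name", "subject", "marks").show()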

In this recipe, you successfully used the Distribute By, Sort By, and Cluster By clauses to effectively optimize your PySpark SQL queries.
