Detecting a shuffle in a process

In this section, we will learn how to detect a shuffle in a process.

We will cover the following topics:

  • Loading randomly partitioned data
  • Issuing repartition using a meaningful partition key
  • Understanding how shuffle occurs by explaining a query

We will load randomly partitioned data to see how and where the data is loaded. Next, we will issue a repartition using a deterministic and meaningful partition key, so that the data is moved to the proper executors. Finally, we will explain our query by using the explain() method and see where the shuffle occurs. We will do all of this in a very simple test.

We will create a DataFrame with some data. For example, we create one InputRecord with a random UID for user_1, another InputRecord with a different random UID for user_1, and a last record for user_2. Let's imagine that this data is loaded from an external data system. The data can be loaded from HDFS or from a database, such as Cassandra or another NoSQL store:

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

// InputRecord is a case class defined elsewhere in the test sources;
// its second field is the userId column that we repartition on later.
class DetectingShuffle extends FunSuite {
  val spark: SparkSession = SparkSession.builder().master("local[2]").getOrCreate()

  test("should explain plan showing logical and physical with UDF and DF") {
    //given
    import spark.sqlContext.implicits._
    val df = spark.sparkContext.makeRDD(List(
      InputRecord("1234-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-9999", "user_2")
    )).toDF()

The loaded data has no predefined or meaningful partitioning, which means that input record number 1 can end up on the first executor and record number 2 can end up on the second executor. So, even though both records belong to the same user, they may sit on different executors, and any operation for that specific user will need data from more than one of them.
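One way to see where the records actually land is to tag every row with the index of the partition that holds it. The following is a minimal sketch, not part of the original test: the object name InspectPartitions, the placement helper, and the uuid field name of InputRecord are assumptions made for illustration, and the printed placement depends on how Spark slices the input.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical record type: the field name uuid is an assumption,
// userId matches the column name used in this section.
final case class InputRecord(uuid: String, userId: String)

object InspectPartitions {
  // Pair each row's userId with the index of the partition that currently holds it.
  def placement(df: DataFrame): Array[(Int, String)] =
    df.rdd
      .mapPartitionsWithIndex { (idx, rows) =>
        rows.map(row => (idx, row.getAs[String]("userId")))
      }
      .collect()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("inspect-partitions")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext.makeRDD(List(
      InputRecord("1234-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-9999", "user_2")
    )).toDF()

    // No partition key was given, so records of one user may appear under different indexes.
    placement(df).foreach { case (idx, userId) =>
      println(s"partition $idx holds a record for $userId")
    }

    spark.stop()
  }
}
```

If the two user_1 records are reported under different partition indexes, any per-user operation has to move one of them first.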

In the previous chapter, Chapter 8, Immutable Design, we used the reduceByKey() method, which takes the user ID or another specific ID and reduces all values for that key. This is a very common operation, but with random partitioning it forces data for the same key to be moved between executors. It is good practice to repartition the data using a meaningful key.

We will repartition on userId so that all the records with the same user ID end up in the same partition. So all the records for user_1, for example, will end up on the first executor:

    //when
    val q = df.repartition(df("userId"))

The first executor will have all the data for user_1. If InputRecord("1234-3456-1235-1234", "user_1") is on executor 1 and InputRecord("1123-3456-1235-1234", "user_1") is on executor 2, then after repartitioning, the record on executor 2 needs to be sent to executor 1, because executor 1 owns this partition key. This causes a shuffle. A shuffle is caused by loading data that is not meaningfully partitioned, or not partitioned at all. We need to arrange our data this way so that we can perform operations on a specific key.
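The idea that a key deterministically picks its partition can be sketched in plain Scala, without Spark. This is a simplified model only: Spark uses its own hash function internally, and String.hashCode is a stand-in here. The names HashPartitionSketch, partitionFor, and repartitionByUser, and the uuid field, are hypothetical.

```scala
// Hypothetical record type: the field name uuid is an assumption.
final case class InputRecord(uuid: String, userId: String)

object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions).
  // The adjustment keeps the result non-negative when hashCode is negative.
  def partitionFor(key: String, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Group records by the partition that their userId hashes to.
  def repartitionByUser(records: Seq[InputRecord], numPartitions: Int): Map[Int, Seq[InputRecord]] =
    records.groupBy(record => partitionFor(record.userId, numPartitions))

  def main(args: Array[String]): Unit = {
    val records = Seq(
      InputRecord("1234-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-9999", "user_2")
    )

    // Every record with the same userId lands in the same partition.
    repartitionByUser(records, numPartitions = 2).foreach { case (idx, recs) =>
      println(s"partition $idx: ${recs.map(_.userId).mkString(", ")}")
    }
  }
}
```

Because the partition index depends only on the key, both user_1 records always share a partition. Two different keys can still hash to the same partition, so a partition may hold more than one user.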

We can repartition the data again later, but it is best done at the beginning of our processing chain. Let's run the test to explain our query:

    q.explain(true)
  }
}

The logical plan shows that we are repartitioning by the userId expression. The physical plan shows that hash partitioning is used and that the hash is computed on the userId value. So, Spark scans all the records, and all the keys that have the same hash are sent to the same executor, which achieves our goal.
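The same check can be made programmatically by rendering the physical plan as text and looking for the exchange operator. The following is a hedged sketch rather than the book's test: the object name ExplainShuffle, the planText helper, and the uuid field are assumptions, and the operator name "Exchange" in the plan text may vary between Spark versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical record type: the field name uuid is an assumption.
final case class InputRecord(uuid: String, userId: String)

object ExplainShuffle {
  // Render the physical plan of a query as plain text.
  def planText(df: DataFrame): String =
    df.queryExecution.executedPlan.toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("explain-shuffle")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      InputRecord("1234-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-1234", "user_1"),
      InputRecord("1123-3456-1235-9999", "user_2")
    ).toDF()

    val q = df.repartition(df("userId"))

    // Prints the logical plans and the physical plan to standard output.
    q.explain(true)

    // Assumption: a shuffle appears as an "Exchange" node in the plan text.
    val shuffles = planText(q).contains("Exchange")
    println(s"plan contains a shuffle exchange: $shuffles")

    spark.stop()
  }
}
```

A check like this can be turned into a test assertion to catch queries that introduce an unexpected shuffle.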

In the next section, we'll test operations that cause a shuffle in Apache Spark.
