Chapter 5. Effective Transformations

Most commonly, Spark programs are structured on RDDs: they involve reading data from stable storage into the RDD format, performing a number of computations and data transformations on the RDD, and writing the result RDD to stable storage or collecting to the driver. Thus, most of the power of Spark comes from its transformations: operations that are defined on RDDs and return RDDs.

At present, Spark contains specialized functionality for about a half-dozen types of RDDs, each with its own properties and scores of different transformation functions. In this chapter, we hope to give you the tools to think about how your RDD transformation, or series of transformations, will be evaluated. In particular: what kinds of RDDs these transformations return, whether persisting or checkpointing RDDs between transformations will make your computation more efficient, and how a given series of transformations could be executed in the most performant way possible.

Note

The transformations in this chapter are those associated with the RDD object used in Spark Core (and MLlib). RDDs are also used inside of DStreams with Spark Streaming, but those have different functionality and performance properties. Likewise, most of the functions discussed in this chapter are not yet supported in DataFrames. Since Spark SQL has a different optimizer, not all of the conceptual lessons of this chapter will carry over to the Spark SQL world.

Tip

As Spark moves forward, more RDD transformations will become available on Datasets, which can be used in Spark SQL, and which are discussed in “Datasets”.

Narrow Versus Wide Transformations

In Chapter 2, we introduced one important distinction between types of transformations: those with wide dependencies and those with narrow dependencies. This distinction is important because it has strong implications for how transformations are evaluated and, consequently, for their performance. In this subsection, we will more precisely define the wide and narrow transformations, demonstrate how to determine whether a transformation is wide or narrow, and explain why this distinction matters for evaluation and performance.

Tip

Recall that Spark is lazily evaluated, meaning that a transformation is not executed until an action that depends on that transformation is called. This, as we discussed in detail in “Lazy Evaluation”, has important consequences for fault tolerance, performance, and debugging. If the information in this tip is confusing, please refer back to Chapter 2, which will give you the basic understanding of the Spark execution engine needed for this chapter.

To summarize what we covered in Chapter 2: wide transformations are those that require a shuffle, while narrow transformations are those that do not. In “Wide Versus Narrow Dependencies” we explained that in narrow transformations, the child partitions (the partitions in the resulting RDD) depend on a known subset of the parent partitions. While this definition is correct, it is less precise than the formal definition of narrow transformations.

The 2012 paper that first presented the evaluation semantics for Spark defines transformations with narrow dependencies as those in which “each partition of the parent RDD is used by at most one partition of the child RDD.” The creators define transformations with wide dependencies as transformations in which “multiple child partitions may depend on [each partition in the parent].” This is the mirror image of what we explained in Chapter 2, where we defined narrow and wide dependencies in relation to the child RDD’s dependencies. The creators’ definition is instead stated in terms of the dependencies on the parent RDD, rather than those on the child RDD.

We think the definition presented in Chapter 2 is easier to conceptualize, since one usually designs a program by thinking from the input data (parent RDD) to the output data (child RDD). However, the Spark evaluation engine (the “DAG”) builds an execution plan in reverse: from the output (the last action) back to the input RDD. Thus, the Spark creators’ definition mirrors the way that Spark is evaluated, and consequently it is more precise in two important ways. First, it rules out the case of one parent partition having multiple children in a narrow dependency, which explains why coalesce is only a narrow transformation when it is reducing rather than increasing the number of partitions. Second, it clarifies why the number of tasks used to complete a computation corresponds to each output partition rather than each input partition: when RDDs are evaluated, the tasks needed to compute a transformation are computed on the child partitions.

Figure 5-1 shows dependencies between parent and child partitions for narrow and wide transformations for the Spark program in Example 5-1. Assume RDD1 is an RDD of integers.

Figure 5-1. Narrow versus wide dependencies between partitions
Example 5-1. Narrow versus wide example
    //Narrow dependency. Map the rdd to tuples of (x, 1)
    val rdd2 = rdd1.map(x => (x, 1))
    //Wide dependency. groupByKey
    val rdd3 = rdd2.groupByKey()

We use the same structure as the diagrams presented in Figures 2-2 and 2-3. Arrows represent partition dependencies: each child partition has arrows pointing to the parent partitions upon which it depends, so if an arrow points from partition y to partition x, that means that y depends on x. Blue arrows represent narrow dependencies and red arrows represent wide dependencies.

We assume the RDD has four partitions. Unlike the diagrams presented in Chapter 2, here we will show how actual records in a very small RDD might be distributed amongst the partitions. In this case we show how RDD1, RDD2, and RDD3 would be partitioned if RDD1 was an RDD of the integers 3, 3, 9, 2, 8, 5, 6, 7.

As you can see, in the map step each child partition depends on just one parent partition, since the data doesn’t need to be moved between partitions for the operation to be computed. However, in the groupByKey step, Spark needs to move an arbitrary number of the records so that those with the same key are in the same partition, in order to combine the records corresponding to a single key into one iterator (recall that an iterator is a local, rather than a distributed, collection). Thus, each child partition depends on many partitions in the parent RDD.

Note

This diagram is intended to show how partitions, an abstract concept used in Spark evaluation, depend on each other, rather than any physical data movement across machines. Each line of squares in the diagram represents the same executors at different points in time, and the arrows denote dependencies between partitions. In fact, repartitioning data does not necessarily require data movement across machines, since partitions may reside on the same executor. When changing the partition of a record does require data movement between executors, the records are transferred directly between the executors; they do not need to be passed through the driver.

Implications for Performance

In “Wide Versus Narrow Dependencies” we asserted that transformations with narrow dependencies are faster to execute partly because narrow transformations can be combined and executed in one pass of the data. In this section we hope to explain why this is the case from an evaluation perspective.

Narrow dependencies do not require data to be moved across partitions. Consequently narrow transformations don’t require communication with the driver node, and an arbitrary number of narrow transformations can be executed on any subset of the records (any partition) given one set of instructions from the driver. In Spark terminology, we say that each series of narrow transformations can be computed in the same “stage” of the query execution plan.

In contrast, as we stated in “The Anatomy of a Spark Job”, a shuffle associated with a wide dependency marks a new stage in the RDD’s evaluation. Because tasks must be computed on a single partition and the data needed to compute each partition of a wide dependency may be spread across machines, transformations with wide dependencies may require data to be moved across partitions. Thus, the downstream computations cannot be computed before the shuffle finishes.

For example, it should be intuitive that sorting cannot be accomplished with narrow transformations because sorting requires an order to be defined on all of the records, not just within each partition. Indeed, the sortByKey function has wide dependencies. It requires the data to be partitioned so that all the keys within a certain range live on the same partition; that way, sorting the data on each partition leads to a sorted result. Any narrow transformations following the sort cannot be done until after the shuffle completes, because the records on each partition may change.

Stage boundaries have important performance consequences. Except in the case of multiple RDD operations like join, the stages associated with one RDD must be executed in sequence (see Chapter 4). Thus, not only are shuffles expensive since they require data movement and potential disk I/O (for the shuffle files), they also limit parallelization.
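One quick way to see these stage boundaries is to print an RDD’s lineage with toDebugString. The following sketch assumes a SparkContext named sc; the RDD names are purely illustrative.

    val input = sc.parallelize(1 to 100)
    // Chained narrow transformations: both run in the same stage.
    val narrowChain = input.map(x => (x % 10, x)).filter{ case (_, v) => v > 5 }
    // groupByKey introduces a shuffle, and therefore a new stage.
    val grouped = narrowChain.groupByKey()
    // The printed lineage groups operations by stage: the map and filter appear
    // together, and the shuffle marks the boundary where a new stage begins.
    println(grouped.toDebugString)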

Implications for Fault Tolerance

The cost of failure for a partition with wide dependencies is much higher than for one with narrow dependencies, since it requires more partitions to be recomputed. If one partition in the parent of a mappedRDD (the RDD type returned by a map operation) fails, for example, only one of its children must be recomputed, and the tasks needed to recompute that child partition can be distributed across the executors to make the recomputation faster. In contrast, if the parent of the sorted RDD loses a partition, it is possible (in the worst case) that all the child partitions will need to be recomputed.

Chaining together transformations with wide dependencies only increases the risk of a very expensive recomputation, particularly if any of the wide transformations have a high probability of causing memory errors. In some instances, the cost of recomputation may be high enough that it is worth checkpointing an RDD, so the intermediate results are saved. We will discuss checkpointing in detail in “Reusing RDDs”.

The Special Case of coalesce

The coalesce operation is used to change the number of partitions in an RDD. As shown by the diagram in Figure 2-2 in Chapter 2, when coalesce reduces the number of output partitions, each parent partition is used in exactly one child partition since the child partitions are the union of several parents. Thus, according to our definition of narrow dependencies, coalesce is a narrow transformation even though it changes the number of partitions in the RDD. Since tasks are executed on the child partition, the number of tasks executed in a stage that includes a coalesce operation is equivalent to the number of partitions in the result RDD of the coalesce transformation.

Tip

Using coalesce, the number of partitions can decrease in one stage without causing a shuffle. However, coalesce causes the upstream partitions in the entire stage to execute with the level of parallelism assigned by coalesce, which may be undesirable in some cases. Avoid this behavior at the cost of a shuffle by setting the shuffle argument of coalesce to true or by using the repartition function instead.

However, when coalesce increases the number of partitions, each parent partition is necessarily used by several child partitions. Thus, according to the more precise definition of wide dependencies presented in “Narrow Versus Wide Transformations”, using coalesce to increase the number of partitions is a wide transformation. The coalesce function prioritizes evenly distributing the data across the child partitions. Consequently, the location of records in the output cannot be determined at design time, because it depends on how many records are stored on each input partition, and the number of records on each partition of course cannot be determined without reading the data and evaluating the upstream transformations. Ergo, increasing the number of partitions with either a coalesce or repartition call requires a shuffle.
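The difference between the options is easiest to see side by side. A minimal sketch, assuming an existing RDD named rdd:

    // Narrow: shrink to 10 partitions with no shuffle, but the entire stage
    // (including upstream transformations) now runs with only 10 tasks.
    val fewerPartitions = rdd.coalesce(10)

    // Wide: shrink to 10 partitions but force a shuffle, so the upstream
    // transformations keep their original parallelism.
    val fewerWithShuffle = rdd.coalesce(10, shuffle = true)

    // Wide: growing the number of partitions always shuffles;
    // repartition(n) is shorthand for coalesce(n, shuffle = true).
    val morePartitions = rdd.repartition(200)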

What Type of RDD Does Your Transformation Return?

RDDs are an abstracted concept in two ways: they can be of almost any arbitrary type of record (e.g., String, Row, Tuple), and they can also be members of one of several implementations of the RDD interface with varying properties. Both distinctions are important for performance and evaluation. The first is important because some transformations can only be applied to RDDs with certain record types. The second is important because each transformation returns one of the several implementations of the RDD interface, and therefore the same transformation called on two different RDD implementations (such as a mappedRDD versus a CoGroupedRDD) may be evaluated differently. In particular, some RDD implementations retain information about the ordering or locality of the records in the RDD from previous transformations. Understanding the data locality and partitioning information associated with the resulting RDD of a transformation can help avoid unnecessary shuffles. We will save a more detailed discussion of this for “Preserving Partitioning Information Across Transformations” since it is most relevant to Pair RDDs. In this section we will discuss preserving record type information, because it can be important for performance and surprisingly difficult as Spark programs get complicated.

The RDD is a collection type which, much like collection types in Scala, Java, and most other strongly typed languages, is instantiated with a type parameter indicating the type of the members in the collection.

Note

In Scala, the syntax for a type parameter is square brackets; e.g., List[String], which indicates a sequence of String objects. It is equivalent to the Java syntax <> (e.g., List<String>).

RDDs are similarly typed. For example, if you use sc.textFile to read in your RDD, you will end up with an RDD of String type (denoted RDD[String] in Scala and RDD<String> in Java).

The RDD’s record type information is important because many transformations are only defined on RDDs of a particular type, so trying to use methods on an RDD of an overly generic type will result in compile-time or runtime errors. For example, if an RDD of tuples has lost its type information and is interpreted by the compiler to be of type RDD[Any] or even RDD[(Any, Int)], calling sortByKey will not compile. The compilation error occurs because sortByKey can only be called on RDDs of key/value pairs where the keys have some implicit ordering. Similarly, numeric functions such as max, min, and sum can only be called on RDDs of Long, Int, or Double.

Record type information is one of many places in the Spark API where implicit conversions are likely to cause difficulties. If you are writing subroutines to be used in RDD transformations, it is often best to specify the helper function’s input and return types concretely rather than writing them against a generic type.
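For example, here is a minimal sketch of the difference; the parsing helper and its input format are hypothetical, but the compiler behavior is the point.

    import org.apache.spark.rdd.RDD

    // A helper with concrete input and return types: callers keep enough type
    // information to sort the result by key.
    def toWordCounts(lines: RDD[String]): RDD[(String, Int)] =
      lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    // toWordCounts(input).sortByKey() compiles because an implicit
    // Ordering[String] exists. If the helper were declared to return
    // RDD[(Any, Int)], the same sortByKey call would not compile, since
    // there is no Ordering[Any] in implicit scope.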

One case in which type information is often lost is when working with DataFrames as RDDs. DataFrames can be implicitly converted to RDDs of Rows. However, since the Spark SQL Row object is not strongly typed (it can be created from sequences of any value type), the Scala compiler cannot “remember” the type of value used to create the row. Indexing a row will return a value of type Any, which must be cast to a more specific type, such as a String or an Int, to perform most calculations. The type information for the rows is stored in the schema. However, converting a DataFrame to an RDD throws away its schema information, so it is important to bind the DataFrame schema to a variable if you will need it later. One of the advantages of the Dataset API is that it is strongly typed, so the values in each row will retain their type information even after conversion to an RDD.
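A sketch of the pattern, assuming a hypothetical DataFrame pandaDF with a String column “name” and an Int column “age”:

    import org.apache.spark.sql.Row

    // Bind the schema to a variable before converting; the RDD of Rows
    // will not carry it.
    val schema = pandaDF.schema
    val rowRDD = pandaDF.rdd                    // RDD[Row]

    // Indexing a Row returns Any, so values must be extracted with the typed
    // getAs method or cast explicitly before doing real work with them.
    val names = rowRDD.map(row => row.getAs[String]("name"))
    val ages = rowRDD.map(row => row(1).asInstanceOf[Int])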

Minimizing Object Creation

“Garbage collection” is the process of freeing up the memory allocated for an object once that object is no longer needed. Since Spark runs in the JVM, which uses automatic memory management, and since Spark jobs often allocate very large numbers of objects, garbage collection can quickly become an expensive part of our Spark job. Garbage collection or “GC” errors are a common cause of failure. Even if garbage collection overhead doesn’t prohibit a job from running, the time spent in garbage collection adds overhead that can significantly slow it down. We can minimize the GC cost by reducing the number of objects and the size of those objects: we can do this by reusing existing objects and by using data structures (such as primitive types) that take up less space in memory.

Reusing Existing Objects

Some RDD transformations allow us to modify the parameters in the lambda expression rather than returning a new object. For example, in the sequence function passed to aggregateByKey and aggregate, we can modify the original accumulator argument, and we can define the combine function in such a way that the combination is created by modifying the first of the two accumulators. A common and effective paradigm for complicated aggregations is to define a Scala class with sequence and combine operations that return the existing object, using the this.type type annotation.

For example, suppose that we wanted to do some custom aggregation that is not already defined in Spark. Let’s say that we have an RDD of key/value pairs where the keys are the panda instructors and the values are the pupils’ panda report cards. For each instructor we want to know the length of the longest word used, the average number of words used per report card, and the number of instances of the word “happy.” One valid, easy-to-read approach would be to use the aggregateByKey function, which takes three arguments: a zero value that represents an empty accumulator, a sequence function that takes the accumulator and a value and adds the value to the accumulator, and a combine operator that defines how the accumulators should be combined. In this instance we could define our accumulator to be an object with four fields: the total count of all the words, the total number of report cards, the longest word seen so far, and the total number of mentions of the word “happy.”

For clarity we can define this as its own object with methods for sequence and combine. We have named this object MetricsCalculator, and it might be coded as shown in Example 5-2.

Example 5-2. Custom aggregation object
 class MetricsCalculator(
  val totalWords : Int,
  val longestWord: Int,
  val happyMentions : Int,
  val numberReportCards: Int) extends Serializable {

  def sequenceOp(reportCardContent : String) : MetricsCalculator = {
    val words = reportCardContent.split(" ")
    val tW = words.length
    val lW = words.map( w => w.length).max
    val hM = words.count(w => w.toLowerCase.equals("happy"))

    new MetricsCalculator(
      tW + totalWords,
      Math.max(longestWord, lW),
      hM + happyMentions,
      numberReportCards + 1)
  }

   def compOp(other : MetricsCalculator) : MetricsCalculator = {
     new MetricsCalculator(
       this.totalWords + other.totalWords,
       Math.max(this.longestWord, other.longestWord),
       this.happyMentions + other.happyMentions,
       this.numberReportCards + other.numberReportCards)
   }

   def toReportCardMetrics =
     ReportCardMetrics(
       longestWord,
       happyMentions,
       totalWords.toDouble/numberReportCards)
}

We could then use this object in the arguments to our aggregation function, as shown in Example 5-4, in a routine that maps the RDD of (instructor, report text) pairs to the case class, defined in Example 5-3, with the three metrics we care about.

Example 5-3. Case class for aggregations
case class ReportCardMetrics(
  longestWord : Int,
  happyMentions : Int,
  averageWords : Double)
Example 5-4. Aggregation example without object reuse
  /**
   * Given an RDD of (PandaInstructor, ReportCardText) aggregate by instructor
   * to an RDD of distinct keys of (PandaInstructor, ReportCardMetrics)
   * where ReportCardMetrics is a case class with
   *
   * longestWord -> The longest word in all of the reports written by this instructor
   * happyMentions -> The number of times this instructor mentioned the word happy
   * averageWords -> The average number of words per report card for this instructor
   */
  def calculateReportCardStatistics(rdd : RDD[(String, String)]
  ): RDD[(String, ReportCardMetrics)] ={

    rdd.aggregateByKey(new MetricsCalculator(totalWords = 0,
      longestWord = 0, happyMentions = 0, numberReportCards = 0))(
      seqOp = ((reportCardMetrics, reportCardText) =>
        reportCardMetrics.sequenceOp(reportCardText)),
      combOp = (x, y) => x.compOp(y))
    .mapValues(_.toReportCardMetrics)
  }

This method is superior to using a combination of maps and a reduceByKey because the aggregateByKey function combines the records locally on each partition and then does a shuffle to perform the cross-partition reduction. However, it has the disadvantage of creating a new instance of our custom object for each record in the dataset and for each combine step. A very simple way to reduce the cost of object creation would be to modify our MetricsCalculator to use Scala’s this.type design paradigm, so that the sequence operation modifies the original accumulator and the combine operation modifies the first accumulator rather than returning a new one, as shown in Example 5-5.

Example 5-5. Aggregation example with object reuse
class MetricsCalculatorReuseObjects(
  var totalWords : Int,
  var longestWord: Int,
  var happyMentions : Int,
  var numberReportCards: Int) extends Serializable {

  def sequenceOp(reportCardContent : String) : this.type = {
    val words = reportCardContent.split(" ")
    totalWords += words.length
    longestWord = Math.max(longestWord, words.map( w => w.length).max)
    happyMentions += words.count(w => w.toLowerCase.equals("happy"))
    numberReportCards +=1
    this
  }

  def compOp(other : MetricsCalculatorReuseObjects) : this.type = {
    totalWords += other.totalWords
    longestWord = Math.max(this.longestWord, other.longestWord)
    happyMentions += other.happyMentions
    numberReportCards += other.numberReportCards
    this
  }

  def toReportCardMetrics =
    ReportCardMetrics(
      longestWord,
      happyMentions,
      totalWords.toDouble/numberReportCards)
}

Our aggregation routine will remain the same.

Note

It should be obvious that the Scala code within the sequence operator is slower than it needs to be. Rather than performing three separate functional calls on the words array, we ought to go through the words in a single pass, counting the words, keeping track of the longest word, and counting the occurrences of the word “happy” (or at least use a while loop over the words array rather than three separate traversals). We have left this solution as is since we think it is easier to read, and the primary intention of the example is to show how to optimize the aggregateByKey Spark routine.
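For reference, a single-pass version of the sequence operation might look something like the following sketch, written as a drop-in replacement for sequenceOp inside the MetricsCalculatorReuseObjects class from Example 5-5.

  def sequenceOpSinglePass(reportCardContent: String): this.type = {
    val words = reportCardContent.split(" ")
    var i = 0
    var localLongest = 0
    var localHappy = 0
    // one traversal of the words array instead of three
    while (i < words.length) {
      val word = words(i)
      if (word.length > localLongest) localLongest = word.length
      if (word.toLowerCase == "happy") localHappy += 1
      i += 1
    }
    totalWords += words.length
    longestWord = Math.max(longestWord, localLongest)
    happyMentions += localHappy
    numberReportCards += 1
    this
  }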

The reduce and fold operations (fold, foldLeft, foldRight) can also benefit from object reuse. However, these aggregation functions are the exception rather than the rule: in general it is best to avoid mutable data structures in Spark code (and Scala code in general) because they can lead to serialization errors and inaccurate results. For many other RDD functions, particularly narrow transformations, modifying the first value of the argument is not safe, because the transformations may be chained together with lazy evaluation and may be evaluated multiple times. For example, if you have an RDD of mutable objects, modifying those objects with a map function may lead to inaccurate results, since the objects may be reused more times than you expect, especially if the RDD is recomputed.

Using Smaller Data Structures

Spark can be a memory hog. An important way to optimize Spark jobs for both time and space is to stick to primitive types rather than custom classes. Although it may make code less readable, using arrays rather than case classes or tuples can reduce GC overhead. Scala arrays, which are exactly Java arrays under the hood, are the most memory-efficient of the Scala collection types. Scala tuples are objects, so in some instances it might be better to use a two- or three-element array rather than a tuple for expensive operations. The Scala collection types in general incur a higher GC overhead than arrays.

Notice that our ReportCardMetrics object is just a wrapper for a few numeric values. Although it is less readable and less object-oriented, it is more space-efficient to use a four-element array of integers. We can maintain the same readable code paradigm by using a Scala object instead of a class and defining the sequence and combine operations as functions on strings and arrays, as shown in Example 5-6.

Example 5-6. Using an array as the aggregation object
object MetricsCalculator_Arrays extends Serializable {
  val totalWordIndex = 0
  val longestWordIndex = 1
  val happyMentionsIndex = 2
  val numberReportCardsIndex = 3

  def sequenceOp(reportCardMetrics : Array[Int],
    reportCardContent : String) : Array[Int] = {

    val words = reportCardContent.split(" ")
    //modify each of the elements in the array
    reportCardMetrics(totalWordIndex) += words.length
    reportCardMetrics(longestWordIndex) = Math.max(
      reportCardMetrics(longestWordIndex),
      words.map(w => w.length).max)
    reportCardMetrics(happyMentionsIndex) += words.count(
      w => w.toLowerCase.equals("happy"))
    reportCardMetrics(numberReportCardsIndex) +=1
    reportCardMetrics
  }

  def compOp(x : Array[Int], y : Array[Int]) : Array[Int] = {
    //combine the first and second arrays by modifying the elements
    // in the first array
    x(totalWordIndex)  += y(totalWordIndex)
    x(longestWordIndex) = Math.max(x(longestWordIndex), y(longestWordIndex))
    x(happyMentionsIndex) += y(happyMentionsIndex)
    x(numberReportCardsIndex) += y(numberReportCardsIndex)
    x
  }

  def toReportCardMetrics(ar : Array[Int]) : ReportCardMetrics =
    ReportCardMetrics(
      ar(longestWordIndex),
      ar(happyMentionsIndex),
      ar(totalWordIndex).toDouble/ar(numberReportCardsIndex)
    )
}

We would then need to modify our aggregation code slightly. We are not using the same custom aggregation object, and the zero value has changed. This is shown in Example 5-7.

Example 5-7. Aggregation with arrays to minimize expensive object creation
  def calculateReportCardStatisticsWithArrays(rdd : RDD[(String, String)]
  ): RDD[(String, ReportCardMetrics)] = {

    rdd.aggregateByKey(
      //the zero value is a four element array of zeros
      Array.fill[Int](4)(0)
    )(
    //seqOp adds the relevant values to the array
      seqOp = (reportCardMetrics, reportCardText) =>
        MetricsCalculator_Arrays.sequenceOp(reportCardMetrics, reportCardText),
    //combOp defines how the arrays should be combined
      combOp = (x, y) => MetricsCalculator_Arrays.compOp(x, y))
    .mapValues(MetricsCalculator_Arrays.toReportCardMetrics)
  }

Within a function, it is often beneficial to avoid intermediate object creation. It is important to remember that converting between types (such as between different flavors of Scala collections) creates intermediate objects. This is yet another place in which implicit conversions may have unfortunate performance implications.

For example, suppose that, having read our note in the previous section, you wanted to speed up the sequence operation of the array-based MetricsCalculator_Arrays object. Then you realized that your coworker had written a general-purpose utility that finds the instances of the word “happy” and the longest word in a collection of strings (shown in Example 5-8).

Example 5-8. Function with implicit sequence conversions
  def findWordMetrics[T <: Traversable[String]](collection : T): (Int, Int) = {
    val iterator = collection.toIterator
    var mentionsOfHappy = 0
    var longestWordSoFar = 0
    while(iterator.hasNext){
      val n = iterator.next()
      if(n.toLowerCase == "happy"){
        mentionsOfHappy +=1
      }
      val length = n.length
      if(length> longestWordSoFar) {
        longestWordSoFar = length
      }

    }
    (longestWordSoFar, mentionsOfHappy)
  }

Your coworker helpfully defined her function on any type that extends Scala’s Traversable trait. Thus, you won’t need to convert the array of words at all and can happily write the code shown in Example 5-9.

Example 5-9. Aggregation with bad implicit conversions
  val totalWordIndex = 0
  val longestWordIndex = 1
  val happyMentionsIndex = 2
  val numberReportCardsIndex = 3
  def fasterSeqOp(reportCardMetrics : Array[Int], content  : String): Array[Int] = {
    val words: Seq[String] = content.split(" ")
    val (longestWord, happyMentions) = CollectionRoutines.findWordMetrics(words)
    reportCardMetrics(totalWordIndex) += words.length
    reportCardMetrics(longestWordIndex) =
      Math.max(reportCardMetrics(longestWordIndex), longestWord)
    reportCardMetrics(happyMentionsIndex) += happyMentions
    reportCardMetrics(numberReportCardsIndex) +=1
    reportCardMetrics
  }

Unfortunately, in terms of object creation, this new implementation is actually worse than the previous one. It creates two extra objects containing the collection of words each time the sequence operation is called: first when you call the findWordMetrics routine, since the input array has to be implicitly converted to a Traversable (creating a new object of the same size), and again when your coworker’s code converts that Traversable to an Iterator.

Warning

Modifying a value passed into your transformation is not always safe, so double-check the documentation for the function you are using.

Tip

Beyond reducing the objects that are directly allocated, Scala’s implicit conversions can sometimes cause additional allocations in the process of converting.

Iterator-to-Iterator Transformations with mapPartitions

The RDD mapPartitions function takes as its argument a function from an iterator of records (representing the records on one partition) to another iterator of records (representing the output partition).

The mapPartitions transformation is one of the most powerful in Spark since it lets the user define an arbitrary routine on one partition of data. The mapPartitions transformation can be used for very simple data transformations like string parsing, but it can also be used for complex, expensive data-processing work to solve problems such as secondary sort or highly custom aggregations. Many of Spark’s other transformations, like filter, map, and flatMap, can be built using mapPartitions. Optimizing mapPartitions routines is an important part of writing complicated and performant Spark code, as we will see in Chapter 6. To allow Spark the flexibility to spill some records to disk, it is important to write your functions inside of mapPartitions in such a way that they do not force loading the entire partition into memory (e.g., by implicitly converting the iterator to a list). Iterators have many methods we can use to write functional-style transformations, and you may also construct your own custom iterator by extending the Iterator interface. When a transformation directly takes and returns an iterator without forcing it through another collection, we call it an iterator-to-iterator transformation.
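As a small illustration of the idea, filter itself can be written as an iterator-to-iterator transformation with mapPartitions. This sketch is not how Spark implements filter internally; it just shows the shape of such a transformation.

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // The function passed to mapPartitions maps the partition's iterator to a
    // new iterator without ever materializing the whole partition.
    def filterWithMapPartitions[T: ClassTag](rdd: RDD[T], pred: T => Boolean): RDD[T] =
      rdd.mapPartitions(iter => iter.filter(pred))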

What Is an Iterator-to-Iterator Transformation?

A Scala iterator object is not actually a collection, but a function that defines a process of accessing the elements in a collection one-by-one. Not only are iterators immutable, but the same element in an iterator can only be accessed one time. In other words, iterators can only be traversed once, and they extend the Scala interface TraversableOnce. Iterators have some of the same methods defined on them as other immutable Scala collections, such as mappings (map and flatMap), additions (++), folds (foldLeft, reduceRight, reduce), element conditions (forall and exists), and traversals (next and foreach). In some instances, these methods behave differently than other Scala collections. Since the iterator can only be traversed once, any of the iterator methods that require looking at all the elements in the iterator will leave the original iterator empty.

Note

Java has its own implementation of iterators, java.util.Iterator, which offers the same benefits as Scala iterators for Spark’s evaluation.

Warning

Beware of your function calls. It is easy to accidentally consume an iterator by calling a method that traverses it, such as size, or by triggering an implicit conversion. Iterators can be converted to any other Scala collection type; however, converting them requires accessing each of the elements, so after the conversion the iterator will be at its last element (empty).

In some ways it can be helpful to conceptualize iterator methods as we would RDD methods—as either transformations or actions—because, like an RDD, an iterator is really a set of evaluation instructions rather than a stored state. Some iterator methods, like next, size, and foreach, traverse the iterator and evaluate it (more like an action). Others, like map and flatMap, return a new iterator, which is really a set of evaluation instructions, much like RDD transformations return a new RDD. However, in contrast to Spark transformations, iterator transformations are executed linearly, one element at a time, rather than in parallel. This makes iterators slower but much easier to use than if they could be executed in parallel. For example, if we needed to store some information about the records we have seen, we could do that in a filter or a map function on the iterator, since the map/filter routine will be applied to each element sequentially. (See Example 5-12 at the end of this section.) One-to-one functions are also not chained together in iterator operations, so using three map calls still requires looking at each element in the iterator three times.
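A tiny local example of both behaviors, with no Spark involved:

    val it = Iterator(1, 2, 3)
    it.size     // 3 -- but computing the size traverses the iterator...
    it.hasNext  // ...so the iterator is now empty (returns false)

    // map is lazy, like a transformation: nothing is traversed until an
    // "action"-like method (here, sum) consumes the new iterator.
    val doubled = Iterator(1, 2, 3).map(_ * 2)
    doubled.sum // 12 -- this traversal consumes doubled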

By “iterator-to-iterator transformation” we mean using one of these iterator “transformations” to return a new iterator rather than a) converting the iterator to a different collection or b) evaluating the iterator with one of the iterator “actions” and building a new collection. To reiterate: using a while loop to traverse the elements of an iterator and build a new collection (even a new iterator) does not qualify as an iterator-to-iterator transformation. Converting an iterator to a more intuitive collection type, manipulating it, and converting back to an iterator is not an iterator-to-iterator transformation. Indeed, converting the iterator argument in mapPartitions to a collection object eliminates all the benefits of iterator-to-iterator transformations.

Space and Time Advantages

The primary advantage of using iterator-to-iterator transformations in Spark routines is that they allow Spark to selectively spill data to disk. Conceptually, an iterator-to-iterator transformation means defining a process for evaluating elements one at a time. Thus, Spark can apply that procedure to batches of records rather than reading an entire partition into memory or creating a collection with all of the output records in-memory and then returning it. Consequently, iterator-to-iterator transformations allow Spark to manipulate partitions that are too large to fit in memory on a single executor without out-of-memory errors.

Furthermore, keeping the partition as an iterator allows Spark to use disk space more selectively. Rather than spilling an entire partition when it doesn’t fit in memory, the iterator-to-iterator transformation allows Spark to spill only those records that do not fit in memory, thereby saving disk I/O and the cost of recomputation. Lastly, using methods defined on iterators avoids defining intermediary data structures. Reducing the number of large intermediate data structures is a way to avoid unnecessary object creation, which can slow down garbage collection as we talked about in “Minimizing Object Creation”.

Note

Unfortunately, the Spark Streaming mapPartitions API is one of relatively few places where the Scala API decisively outperforms its Java counterpart. Prior to Spark 1.6, mapPartitions in the Spark Streaming Java API was defined on objects of type Iterable rather than Iterator, and thus automatically read the entire collection into memory. In Spark Core, the Java API still uses Iterable rather than an iterator as the grouped result of groupByKey, thus eliminating the possibility of using an iterator-to-iterator transformation after a groupByKey call.

An Example

For all their advantages, iterators can be a much harder abstraction to conceptualize and use than collection types such as arrays and hash maps, with which users may be more familiar from other languages. Here we provide an example of a complicated mapPartitions routine, which, given a sorted RDD of ((value, columnIndex), count) tuples and a list of the rank statistics located on each partition, returns the (columnIndex, value) pairs that represent those rank statistics, as shown in Example 5-10. This method is part of the optimal solution to the “Goldilocks problem,” which is presented in full in “Goldilocks Version 4: Reduce to Distinct on Each Partition” and introduced in “The Goldilocks Example”.

Example 5-10. Example mapPartitions
  private def findTargetRanksIteratively(
          sortedAggregatedValueColumnPairs : RDD[((Double, Int), Long)],
          ranksLocations : Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] = {

    sortedAggregatedValueColumnPairs.mapPartitionsWithIndex((partitionIndex : Int,
      aggregatedValueColumnPairs : Iterator[((Double, Int), Long)]) => {

      val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
     if (targetsInThisPart.nonEmpty) {
       FindTargetsSubRoutine.asIteratorToIteratorTransformation(
         aggregatedValueColumnPairs,
         targetsInThisPart)
     } else {
       Iterator.empty
     }
    })
  }

This routine is a good example of a place where we are likely to see performance gains from an iterator-to-iterator transformation, since it is a complicated routine performed on partitions that we anticipate will be too large to fit in memory. However, it is an instance where using iterators is, from a design perspective, a non-obvious choice, because we have to keep a map of running totals of the number of elements seen so far for each column. A more straightforward way to design this routine would be as follows: loop through the iterator, store the running totals in a hash map, and build a new collection of the elements we want to keep using an array buffer, then convert the array buffer to an iterator, as shown in Example 5-11.

Example 5-11. MapPartitions example without an iterator-to-iterator transformation
  def withArrayBuffer(valueColumnPairsIter : Iterator[((Double, Int), Long)],
    targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {

      val columnsRelativeIndex: Predef.Map[Int, List[Long]] =
        targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))

    // The column indices of the pairs that are desired rank statistics that live in
    // this partition.
      val columnsInThisPart: List[Int] = targetsInThisPart.map(_._1).distinct

    // A HashMap with the running totals of each column index. As we loop through
    // the iterator, we will update the hashmap as we see elements of each
    // column index.
      val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
      runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap

    //we use an array buffer to build the resulting iterator
      val result: ArrayBuffer[(Int, Double)] =
      new scala.collection.mutable.ArrayBuffer()

      valueColumnPairsIter.foreach {
        case ((value, colIndex), count) =>

          if (columnsInThisPart contains colIndex) {

            val total = runningTotals(colIndex)
            //the ranks that are contained by this element of the input iterator,
            //found by filtering the ranks for this column index
            val ranksPresent = columnsRelativeIndex(colIndex)
              .filter(index => (index <= count + total) && (index > total))
            ranksPresent.foreach(r => result += ((colIndex, value)))
            //update the running totals.
            runningTotals.update(colIndex, total + count)
        }
      }
    //convert the array buffer to an iterator
    result.toIterator
  }

At first this looks like an okay solution, since we estimate that the number of elements we are returning is small, and because array buffers are usually a relatively performant way to build up Scala collections. However, if the input data is very large relative to the cluster size, we still see out-of-memory errors and failures in this step. A more efficient solution would be to use an iterator-to-iterator transformation. We can convert this subroutine to an iterator-to-iterator transformation even though the routine is not purely functional (it requires keeping a map of running totals). We can do this because the work for each element of the iterator can be completed using only that element and the running totals, without looking ahead at the rest of the iterator. The final solution uses the filter function of iterators, to eliminate any elements that are not in the final data, and a flatMap to build the new iterator of elements in the resulting partitions, as shown in Example 5-12.

Example 5-12. MapPartitions with iterator-to-iterator transformations
  def asIteratorToIteratorTransformation(
    valueColumnPairsIter : Iterator[((Double, Int), Long)],
    targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {

    val columnsRelativeIndex = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
    val columnsInThisPart = targetsInThisPart.map(_._1).distinct

    val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
     runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap

    //filter out the pairs that don't have a column index that is in this part
    val pairsWithRanksInThisPart = valueColumnPairsIter.filter{
      case (((value, colIndex), count)) =>
        columnsInThisPart contains colIndex
     }

    // map the valueColumn pairs to a list of (colIndex, value) pairs that correspond
    // to one of the desired rank statistics on this partition.
    pairsWithRanksInThisPart.flatMap{

      case (((value, colIndex), count)) =>

          val total = runningTotals(colIndex)
          val ranksPresent: List[Long] = columnsRelativeIndex(colIndex)
                                         .filter(index => (index <= count + total)
                                           && (index > total))

          val nextElems: Iterator[(Int, Double)] =
            ranksPresent.map(r => (colIndex, value)).toIterator

          //update the running totals
          runningTotals.update(colIndex, total + count)
          nextElems
    }
  }

This approach allows the function to spill to disk selectively by working with each element in the iterator one at a time. This implementation saves space by incrementally building the result rather than storing the new collection type in memory as an array buffer. It saves a penny on garbage collection by not creating the array buffer as an intermediate step.

Tip

If you are using an ArrayBuffer to build a new collection for mapPartitions, it is always possible (and likely more performant) to use a map or flatMap on the iterator to incrementally add new elements.

Set Operations

Spark has a variety of set-like operations, some of which are expensive and some of which have different behavior than the mathematical definitions of the equivalent operations. In this section we hope to explain how to use these operations safely and effectively.

Since RDDs are not required to contain distinct elements, Spark’s set operations mainly differ from the mathematical ones in how they handle duplicates. For example, union merely concatenates its arguments, so the result of union will always have the combined size of both RDDs. intersection and subtract are defined similarly to their set-theoretic counterparts, but since the input RDDs (unlike mathematical sets) can have duplicates, the results may be unexpected. Subtracting will remove all of the elements in the first RDD that have a key present in the second RDD. Thus it is possible that by subtracting, the result will be smaller than the size of the first RDD minus the size of the second, breaking one of the laws of set theory.

For example, the simple unit test in Example 5-13 will pass.

Example 5-13. Subtract example
    val a = Array(1, 2, 3, 4, 4, 4, 4)
    val b = Array(3, 4)
    val rddA = sc.parallelize(a)
    val rddB = sc.parallelize(b)
    val rddC = rddA.subtract(rddB)
    assert(rddC.count() < rddA.count() - rddB.count())

In Spark, intersection co-groups the argument RDDs using their values as keys and filters out those elements that don’t appear in both. Consequently, the result of RDD intersection contains no duplicates. For the two RDDs in Example 5-13, the intersection is an RDD containing one 3 and one 4, and the subtraction is an RDD containing 1 and 2. Thus, as the unit test in Example 5-14 demonstrates, we cannot always “re-create” rddA as the union of the intersection and the subtraction.1 That union contains only a single 4, while rddA contains four of them.

Example 5-14. Intersection example
    val a = Array(1, 2, 3, 4, 4, 4, 4)
    val b = Array(3, 4)
    val rddA = sc.parallelize(a)
    val rddB = sc.parallelize(b)
    val intersection = rddA.intersection(rddB)
    val subtraction = rddA.subtract(rddB)
    val union = intersection.union(subtraction)
    assert(!rddA.collect().sorted.sameElements(union.collect().sorted))
Tip

To make an RDD more like a set, you can use distinct prior to computing any set operations. However, calling distinct may cause a shuffle if the data is not already appropriately partitioned, which can be expensive.
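For example, a minimal sketch of applying distinct first to the RDDs from Example 5-13:

    // Treat the RDDs as sets by removing duplicates first (each distinct call
    // may introduce a shuffle).
    val setA = rddA.distinct()
    val setB = rddB.distinct()
    val setUnion = setA.union(setB).distinct() // {1, 2, 3, 4}
    val setMinus = setA.subtract(setB)         // {1, 2}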

Reducing Setup Overhead

Some operations require setup work per-worker or per-partition, like creating a database connection or setting up a random number generator. For transformations you can use mapPartitions, do the setup work per partition in the map function, and then perform your desired transformation on the iterator for the partition. We will illustrate doing this with a pseudorandom number generator in Example 5-15.

Example 5-15. Create one random number generator per partition using mapPartitions
    rdd.mapPartitions{itr =>
      // Only create one RNG per partition
      val r = new Random()
      itr.filter(x => r.nextInt(10) == 0)
    }
Tip

It is important to remember to use an iterator-to-iterator transformation to allow spilling to disk selectively, as discussed in “Iterator-to-Iterator Transformations with mapPartitions”.

Beyond using this pattern to reduce setup overhead in transformations, another common pattern is to create a connection inside of an action in order to save the data. If your work involves writing out the data, you can use the same pattern as with mapPartitions, except with foreachPartition.
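A sketch of that pattern is shown below; the createConnection and saveRecord helpers are hypothetical stand-ins for whatever client API your data store provides.

    rdd.foreachPartition { recordIterator =>
      // Setup work happens once per partition rather than once per record.
      val connection = createConnection()   // hypothetical client setup
      try {
        recordIterator.foreach(record => saveRecord(connection, record)) // hypothetical write
      } finally {
        connection.close()
      }
    }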

If the setup work can be serialized, a broadcast variable, which we cover next, can be used to distribute the object. If the setup work can’t be serialized, a broadcast variable wrapping a transient lazy val can be used as well; see Example 5-17 in the next section.

Shared Variables

Spark has two types of shared variables—broadcast variables and accumulators—each of which can only be written in one context (driver or worker, respectively) and read in the other. Broadcast variables can be written in the driver program and read on the executors, whereas accumulators are written onto the executors and read on the driver.

Broadcast Variables

Broadcast variables give us a way to take a local value on the driver and distribute a read-only copy to each machine rather than shipping a new copy with each task. Broadcast variables might not seem especially useful, since we can just capture a local variable in our closure to transfer data from the driver to the workers; however, the savings of only sending one copy per machine versus sending one copy per task can make a huge difference, especially when the same broadcast variable is used in additional transformations. Two common examples of using broadcast variables are a) broadcasting a small table to join against and b) broadcasting a machine learning model to be able to run the predictions on our data.

Creating a broadcast variable is done by calling broadcast on the SparkContext. This distributes the value to the workers and gives us back a wrapper that allows us to access the value on the workers by calling value, as shown in Examples 5-16 and 5-17. If a broadcast variable is created with a variable input, the input should not be modified after the variable has been created since existing workers will not see the updates and new workers may see the new value.

Example 5-16. Sample broadcast of a hashset of invalid panda locations to filter out
    val invalid = HashSet() ++ invalidPandas
    val invalidBroadcast = sc.broadcast(invalid)
    input.filter{panda => !invalidBroadcast.value.contains(panda.id)}
Example 5-17. Create one random number generator per worker
  class LazyPrng {
    @transient lazy val r = new Random()
  }
  def customSampleBroadcast[T: ClassTag](sc: SparkContext, rdd: RDD[T]): RDD[T]= {
    val bcastprng = sc.broadcast(new LazyPrng())
    rdd.filter(x => bcastprng.value.r.nextInt(10) == 0)
  }
Warning

The value for a broadcast variable must be a local, serializable value: no RDDs or other distributed data structures.

Internally, Spark uses broadcast variables for the Hadoop job configuration objects and large blocks of Python code for UDFs. If a broadcast variable is no longer needed, you can explicitly remove it by calling unpersist() on the broadcast variable.
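To round out the small-table join use case mentioned at the start of this section, here is a hedged sketch of a map-side (broadcast) join. The zipToCity lookup map and the pandasByZip pair RDD are hypothetical.

    // Broadcast the small lookup table once per machine.
    val zipToCityBroadcast = sc.broadcast(zipToCity)  // e.g., a Map[String, String]

    // Join on the executors without a shuffle by looking up each key in the
    // broadcast map; records with no match are dropped.
    val joined = pandasByZip.flatMap { case (zip, panda) =>
      zipToCityBroadcast.value.get(zip).map(city => (zip, (panda, city)))
    }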

Accumulators

Accumulators are the second type of Spark’s shared variables, allowing us to collect by-product information from a transformation or action on the workers and then bring the result back to the driver. With Spark’s execution model, Spark adds to accumulators only once the computation has been triggered (e.g., by an action). If the computation happens multiple times, Spark will update the accumulator each time. This multiple counting can be desirable for process-level information, like computing the entire time spent parsing records. However, it can be disastrous for data-related information like counting the number of invalid records.

Note

Spark accumulators have had an API update for 2.0—these examples are updated for the 2.X API, although 1.X examples are still available in the examples repo.

Warning

Accumulators can be unpredictable. In their current state, they are best used where potential multiple counting is the desired behavior.

Spark has a number of built-in accumulator types that make it easy to create an accumulator for common use cases. Accumulators are not intended for collecting large amounts of information, so if you find yourself adding a large number of elements to a collection or appending to a string, you may wish to consider a separate action instead of an accumulator. The default operation for numeric accumulators is the + operation, so we could use this to sum the fuzzyness of all of the pandas, as shown in Example 5-18.

Example 5-18. Compute fuzzyness of pandas with accumulators
  def computeTotalFuzzyNess(sc: SparkContext, rdd: RDD[RawPanda]):
      (RDD[(String, Long)], Double) = {
    // Create a named accumulator for doubles
    val acc = sc.doubleAccumulator("fuzzyNess")
    val transformed = rdd.map{x => acc.add(x.attributes(0)); (x.zip, x.id)}
    // accumulator still has zero value
    // Note: This example is dangerous since the transformation may be
    // evaluated multiple times.
    transformed.count() // force evaluation
    (transformed, acc.value)
  }

Additionally, accumulators support a wide variety of data types, provided the operation is associative, but some are easier to get in trouble with than others. To use an accumulator of a different type, you need to implement the AccumulatorV2[InputType, ValueType] interface and provide reset, copy, isZero, value, merge, and add methods. You are responsible for the specifics of the class that keeps track of the accumulated values; in general, a simple var or two will do the trick. In addition to the required methods, you can override copyAndReset to improve performance in certain cases.

Generally the reset and copy methods are used together through the copyAndReset method, which can often be implemented more efficiently than a copy followed by a reset (as is done in both of the custom accumulator examples, Examples 5-19 and 5-20). The reset method resets the value of the current accumulator back to “zero” so that isZero, if called, will return true. The copy method needs to create a copy of the provided accumulator, with the new accumulator having the same value as the current accumulator. This is called when copying the accumulator to send to the workers, so that Spark can avoid the expense (and confusion) of shipping any of the previously accumulated work to the workers.

The type parameters of the AccumulatorV2 interface specify the type being accumulated over (add) and the final return type (value). Importantly, this does not constrain or specify the type used to hold the accumulation itself. A single variable is used to keep track of values in the following examples. However, you need not limit yourself to one variable. Inside of many of Spark’s numeric accumulators, two vars are used.

The merge method in the accumulator API takes an argument of the same base AccumulatorV2 type. Since the AccumulatorV2 trait doesn’t specify anything about how implementations should keep track of the values as they are evaluated, you will need to match on (or cast) the accumulator you receive to the expected type so that you can access your own internal accumulation field(s). A basic implementation of this is shown in Example 5-19.

Example 5-19. Compute maximum panda fuzzyness
  def computeMaxFuzzyNess(sc: SparkContext, rdd: RDD[RawPanda]):
      (RDD[(String, Long)], Option[Double]) = {
    class MaxDoubleAccumulator extends AccumulatorV2[Double, Option[Double]] {
      // Here is the var we will accumulate our value in to.
      var currentVal: Option[Double] = None
      override def isZero = currentVal.isEmpty

      // Reset the current accumulator to zero - used when sending over the wire
      // to the workers.
      override def reset() = {
        currentVal = None
      }

      // Copy the current accumulator - this is only really used in context of
      // copy and reset - but since it's part of the public API let's be safe.
      def copy() = {
        val newCopy = new MaxDoubleAccumulator()
        newCopy.currentVal = currentVal
        newCopy
      }

      // We override copy and reset for "speed" - no need to copy the value if
      // we are going to zero it right away. This doesn't make much difference
      // for Option[Double] but for something like Array[X] could be huge.

      override def copyAndReset() = {
        new MaxDoubleAccumulator()
      }

      // Add a new value (called on the worker side)
      override def add(value: Double) = {
        currentVal = Some(
          // If the value is present compare it to the new value - otherwise
          // just store the new value as the current max.
          currentVal.map(acc => Math.max(acc, value)).getOrElse(value))
      }

      override def merge(other: AccumulatorV2[Double, Option[Double]]) = {
        other match {
          case otherFuzzy: MaxDoubleAccumulator =>
            // If the other accumulator has the option set merge it in with
            // the standard add procedure. If the other accumulator isn't set
            // do nothing.
            otherFuzzy.currentVal.foreach(value => add(value))
          case _ =>
            // This should never happen, Spark will only call merge with
            // the correct type - but that won't stop someone else from calling
            // merge so throw an exception just in case.
            throw new Exception("Unexpected merge with unsupported type" + other)
        }
      }
      // Return the accumulated value.
      override def value = currentVal
    }
    // Create a new custom accumulator
    val acc = new MaxDoubleAccumulator()
    sc.register(acc)
    val transformed = rdd.map{x => acc.add(x.attributes(0)); (x.zip, x.id)}
    // accumulator still has None value.
    // Note: This example is dangerous since the transformation may be
    // evaluated multiple times.
    transformed.count() // force evaluation
    (transformed, acc.value)
  }

This still requires that the result is the same as the type we are accumulating. If we wanted to collect all of the distinct elements, we would likely want to collect a set and the types would be different. This is shown in Example 5-20.

Example 5-20. Compute unique panda ids
  def uniquePandas(sc: SparkContext, rdd: RDD[RawPanda]): HashSet[Long] = {
    class UniqParam extends AccumulatorV2[Long, HashSet[Long]] {
      var accValue: HashSet[Long] = new HashSet[Long]()

      override def value = accValue

      override def copy() = {
        val newCopy = new UniqParam()
        newCopy.accValue = accValue.clone
        newCopy
      }
      override def reset() = {
        this.accValue = new HashSet[Long]()
      }
      override def isZero = {
        accValue.isEmpty
      }

      // We override copy and reset for speed - no need to copy the value if
      // we are going to zero it right away.
      override def copyAndReset() = {
        new UniqParam()
      }
      // For adding new values
      override def add(value: Long) = {
        accValue += value
      }
      // For merging accumulators
      override def merge(other: AccumulatorV2[Long, HashSet[Long]]) = {
        other match {
          case otherUniq: UniqParam =>
            accValue = accValue ++ otherUniq.accValue
          case _ =>
            throw new Exception("only support merging with same type")
        }
      }
    }
    // Create an accumulator for keeping track of unique values
    val acc = new UniqParam()
    // Register with a name
    sc.register(acc, "Unique values")
    val transformed = rdd.map{x => acc.add(x.id); (x.zip, x.id)}
    // accumulator still holds an empty set; no action has forced evaluation yet.
    transformed.count() // force evaluation
    acc.value
  }
Tip

The value function can perform complex work and return a different type than the input type or internal accumulated value. For example, if you were computing the average, you might have a value function that divides two longs returning a double.
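
For instance, here is a minimal sketch of such an accumulator (the MeanAccumulator class is illustrative and not part of the book’s example code): it keeps two Longs internally, as Spark’s own numeric accumulators do, and its value function divides them to return a double.

  import org.apache.spark.util.AccumulatorV2

  class MeanAccumulator extends AccumulatorV2[Long, Double] {
    // Two internal vars, similar to Spark's built-in numeric accumulators.
    var sum: Long = 0L
    var count: Long = 0L

    override def isZero: Boolean = count == 0L
    override def reset(): Unit = { sum = 0L; count = 0L }
    override def copy(): MeanAccumulator = {
      val newAcc = new MeanAccumulator()
      newAcc.sum = sum
      newAcc.count = count
      newAcc
    }
    override def add(v: Long): Unit = { sum += v; count += 1 }
    override def merge(other: AccumulatorV2[Long, Double]): Unit = other match {
      case o: MeanAccumulator =>
        sum += o.sum
        count += o.count
      case _ =>
        throw new UnsupportedOperationException("Cannot merge with " + other)
    }
    // The value function does the "complex work": dividing two longs
    // and returning a double.
    override def value: Double = if (count == 0L) 0.0 else sum.toDouble / count
  }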

Note

You may provide a name for accumulators in Scala so they show up in the web UI. Simply add a name as the second param. This does involve calling toString on the accumulator, though—so if that is an expensive operation, leave your accumulator unnamed.

When working with cached data our accumulators can seem almost consistent, but as discussed in “Interaction with Accumulators” this is not the case.

Tip

There is a proposal to add data property (or “consistent”) accumulators in Spark 2.1.2 Property accumulators would avoid double counting—but this remains unmerged. You can follow its progress in this pull request.

Internally, beginning in Spark 2.0, Spark uses accumulators to keep track of task metrics.

Reusing RDDs

Spark offers several options for RDD reuse, including persisting, caching, and checkpointing. However, Spark does not perform any of these automatically3 because storing an RDD for reuse breaks some pipelining, which can be a waste if the RDD is only used once or if the transformation is inexpensive to recompute. All kinds of persistence (of which caching is one type) and checkpointing have some cost and are unlikely to improve performance for operations that are performed only once. Furthermore, on large datasets the cost of persisting or checkpointing can be so high that recomputing is more desirable. However, for some specific kinds of Spark programs, reusing an RDD can lead to huge performance gains, both in terms of speed and in reducing failures.

Cases for Reuse

In this section we cover some instances when persisting or checkpointing RDDs may foster performance gains. Broadly speaking, the most important cases for reuse are: using an RDD many times, performing multiple actions on the same RDD, and long chains of (or very expensive) transformations.

Iterative computations

For iterative computations that use the same parent RDD multiple times, persisting that RDD so it is evaluated only once can help avoid repeated computations. For example, if you were performing a loop of joins to the same dataset, persisting that dataset could lead to huge performance improvements since it ensures that the partitions of that RDD will be available in memory for each join.

In Example 5-21 we are computing the root mean squared error (RMSE) on a number of different RDDs representing predictions from different models. To do this we have to join each RDD of predictions to an RDD of the data in the validation set.

Note

In this example we use persist(), which persists the RDD in memory. As we will explain in “Types of Reuse: Cache, Persist, Checkpoint, Shuffle Files”, cache() is equivalent to persist(), which is equivalent to persist(StorageLevel.MEMORY_ONLY).

Example 5-21. A function with iterative computations
    val testSet: Array[RDD[(Double, Int)]] =
      Array(
        validationSet.mapValues(_ + 1),
        validationSet.mapValues(_ + 2),
        validationSet)
    validationSet.persist() //persist since we are using this RDD several times
    val errors = testSet.map( rdd => {
        rmse(rdd.join(validationSet).values)
    })

Without persisting, Spark would have to reload and repartition the validation set RDD to complete each join. However, with persistence, the validation RDD stays loaded in memory on the executors across each run of the algorithm. We discuss performance considerations with different kinds of joins in detail in “Core Spark Joins”.

Checkpointing, another form of RDD reuse, writes the RDD to external storage and would also prevent reloading it. However, checkpointing breaks the RDD’s lineage and, unlike persisting, does not keep the partitions loaded in memory on the executors.

Multiple actions on the same RDD

If you do not reuse an RDD, each action called on it will launch its own Spark job with the full lineage of RDD transformations. Persisting and checkpointing break the RDD’s lineage, so the same series of transformations preceding the persist or checkpoint call will be executed only once. Because a persisted or checkpointed RDD lasts for the duration of a Spark application (although it may be evicted by subsequent cached/persisted data), an RDD persisted during one Spark job will be available in a subsequent job executed with the same SparkContext. For example, suppose that we wanted to collect the first 10% of the records in an RDD. We could use the code in Example 5-22, which calls sortByKey, then count, then take.

Example 5-22. An example of two actions without a persist step
    val sorted = rddA.sortByKey()
    val count = sorted.count() // sorted Action 1
    val sample: Long = count / 10
    val sampled = sorted.take(sample.toInt) // sorted Action 2

The sortByKey (and presumably the read operation) needed to create the RDD, sorted, will occur twice if we do not store the RDD: once in the job launched by count and again in the job launched by take. We can’t test this element of the execution programmatically, but if you were to run this application and view the web UI you would see that this code launches two jobs and that each one includes a sort stage. However, if we add a persist or checkpoint call before the actions (as shown in Example 5-23), the transformation will only be executed once, since Spark builds a lineage graph from either an RDD’s creation or from a persisted/checkpointed RDD.

Example 5-23. Two actions with a persist step
    val sorted = rddA.sortByKey()
    sorted.persist()
    val count = sorted.count() // sorted Action 1
    val sample: Long = count / 10
    val sampled = sorted.take(sample.toInt) // sorted Action 2
Tip

Persisted RDDs only survive for the duration of a Spark application. To reuse data between Spark applications, use checkpointing with the same directory.

If the cost to compute each partition is very high

Even if a program does not use the same RDD multiple times, persisting and checkpointing can speed up a routine and reduce the cost of failures by storing intermediary results. Persisting or checkpointing can be particularly useful if the cost of computing one partition is very high because they ensure that the entire expensive operation will not need to be recomputed in the case of downstream failures.

For example, if your program requires a long series of one-to-one transformations, those transformations will all be combined into very computationally intensive tasks. While this is good so long as the tasks succeed and fit in memory, it does mean that if one of the downstream transformations fails, then the cost to recompute a single partition may be enormous. If all of the narrow transformations together create more GC overhead or memory strain than your cluster’s executors can handle, then checkpointing or persisting off_heap can be particularly useful. Both persisting off_heap and checkpointing allow the RDD to be stored outside of the Spark executor memory, leaving space to compute. These options are also the only way to prevent recomputation if the entire Spark worker fails. Sometimes breaking up a long lineage graph for its own sake can help a job succeed since it means each of the tasks will be smaller.

Narrow transformations are generally faster than wide ones. However, some individual narrow transformations, such as training a model per partition or working with very wide rows, can be expensive. In these cases, reusing an RDD after the expensive computation so it is not recomputed may improve performance.

Deciding if Recompute Is Inexpensive Enough

Although persisting in memory is a flagship feature of Spark, it is not free. It is space intensive to store data in memory and takes time to serialize and deserialize. As we will discuss in “Dividing the Space Within One Executor”, persisting in memory and in-memory computation are both done in the Spark executor JVM. Thus, persisting in memory may take space that could be used for downstream computations or increase the risk of memory failures. Caching with Java-based, on-heap memory structures (any of Spark’s storage options other than off_heap) will also incur a much higher garbage collection cost than recomputing.

Persisting to disk or checkpointing (writing the RDD to an external filesystem) has the disadvantages of MapReduce: expensive write and read operations. If the RDD is checkpointed or persisted to disk, we must factor in not only the disk space used on the cluster to store the RDD, but also the computational cost on the Spark executors of the additional disk I/O. In most cases, checkpointing a large RDD can be used to reduce failures on high-traffic clusters, but because checkpointing is so expensive it rarely leads to a performance improvement, even when it saves the RDD from being recomputed.

Tip

Our experience has been that it is easy to underestimate just how expensive storing and reading an RDD is relative to recomputing it. We have also found that for relatively simple operations the cost of the read operation needed to load the RDD far outweighs the cost of the other computations, so persisting is most useful when it prevents triggering another read operation or in the case of many iterative computations.

Furthermore, breaking an RDD’s lineage by forcing evaluation through persisting or checkpointing prevents transformations with narrow dependencies from being combined into a single task. Consequently, we lose some of the benefit of narrow transformations, since they can no longer be pipelined. For instance, persisting or checkpointing between a simple map and filter step breaks pipelining so that the intermediate data can be stored: because the RDD has to be materialized after the map, Spark must make two passes through the data rather than one. Breaking lineage between narrow transformations is only desirable in the most extreme cases.

The preceding guidelines are good heuristics for when reuse will provide significant benefits. In general, it is worth reusing an RDD rather than recomputing it if the computation is large relative to your cluster and the rest of your job. The best way to tell if you need to reuse your RDDs is to run a job. If your job runs very slowly, see if persisting the RDDs may help before attempting to rewrite the program, since persisting and checkpointing will reduce the cost of recomputing data in the case of a failure, or eliminate it altogether. If a job is failing with GC or out-of-memory errors, checkpointing or persisting off_heap may allow the job to complete, particularly if the cluster is noisy. On the other hand, if you were already persisting with an in-memory storage option, consider removing the persist call or switching to checkpointing or off_heap persistence.

Tip

If you are testing some code before putting it into production, consider creating the persistence level with a variable so that you can pass in a persistence level to try as a command-line argument. The function presented in Example 5-24 uses this paradigm; it contains a storageLevel argument (which could be NONE).
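
As a minimal sketch of that paradigm (the argument handling here is illustrative and not taken from Example 5-24), the string passed on the command line can be converted with StorageLevel.fromString, which also accepts "NONE":

    import org.apache.spark.storage.StorageLevel

    // e.g., pass "MEMORY_AND_DISK", "MEMORY_ONLY_SER", or "NONE" as the first argument.
    def storageLevelFromArgs(args: Array[String]): StorageLevel = {
      val levelName = args.headOption.getOrElse("NONE") // default to no persistence
      StorageLevel.fromString(levelName)
    }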

Types of Reuse: Cache, Persist, Checkpoint, Shuffle Files

If you decide that you need to reuse your RDD, Spark provides a multitude of options for how to store it, so it is important to understand when to use the various types of persistence. There are three primary operations that you can use to store your RDD: cache, persist, and checkpoint. In general, caching (equivalent to persisting with the default in-memory storage level) and persisting are most useful to avoid recomputation during one Spark job or to break RDDs with long lineages, since they keep an RDD on the executors during a Spark job. Checkpointing is most useful to prevent failures and avoid a high cost of recomputation by saving intermediate results to external storage. Like persisting, checkpointing avoids recomputation by breaking the lineage graph, thus minimizing the cost of failure.

Persist and cache

Persisting an RDD means materializing it (usually by storing it in-memory on the executors) for reuse during the current application. Spark remembers a persisted RDD’s lineage so that it can recompute lost partitions for the duration of the application; once the application ends, the persisted data is released. The persist function takes a StorageLevel argument that specifies how the RDD should be stored. Spark provides a number of different storage levels as constants, but each one is created based on five attributes of how to store the RDD: useDisk, useMemory, useOffHeap, deserialized, and replication. Calling toString on a storage level will reveal which options it contains. The Spark documentation about persistence includes a fairly comprehensive list of the out-of-the-box storage options that are exposed to you.

Still, we think it may be useful to provide some more information about each of the five properties that compose each storage option. This should give you a deeper understanding of which option to choose:

useDisk

If set, partitions that do not fit in memory will be written to disk.

The storage levels whose names contain DISK (such as MEMORY_AND_DISK) enable this option. By default, if partitions do not fit in memory, they will simply be evicted and will need to be recomputed when the persisted RDD is used (see “LRU Caching”). Therefore, persisting to disk can ensure that recomputation of those additional large partitions is avoided. However, reading from disk can be time-intensive, so persistence to disk is only important if the cost of recomputation is particularly high.

Tip

It may be beneficial to allow writing to disk if you expect that an RDD cannot fit in memory. However, if the cost of recomputing the partitions is not high (they are simple mappings and don’t reduce the size of the data) it may actually be faster to recompute some partitions rather than read from disk.

useMemory

If set, the RDD will be stored in-memory; if not set, partitions will be written only to disk.

The DISK_ONLY storage levels are the only options that mark this as false. Most of the speed benefits of caching come from keeping RDDs in memory, so if the motivation for reuse is fast access for repeated computations, it is probably a good idea to choose a storage option that stores partitions in memory. However, there are some cases where disk-only persistence makes sense, e.g., when the computation is more expensive than reading from local disk, or when the network filesystem is especially slow (such as with certain object stores).

useOffHeap

If set, the RDD will be stored outside of the Spark executor in an external system such as Tachyon.

The storage option off_heap enables this property. If memory is a serious issue, or a cluster is noisy and partitions are evicted, this option may be compelling. We will talk more about the benefits of Tachyon in “Alluxio (nee Tachyon)”.

deserialized

If set, the RDD will be stored as deserialized Java objects.

As we will discuss in “Kryo”, storing RDDs serialized (i.e., leaving this attribute unset) can make them more space efficient, especially when using a faster serializer, but incurs some performance overhead when the data is accessed. Storage options that include the "_SER" suffix, such as MEMORY_ONLY_SER, enable serialization.

Tip

If your RDD is too large to persist in-memory, first try to serialize it with the MEMORY_ONLY_SER option. This will keep the RDD fast to access, but will decrease the memory needed to store it.

replication

Replication is an integer that controls the number of copies of the persisted data to be stored in the cluster.

By default this is set to 1; however, storage options that end in _2 (such as DISK_ONLY_2) replicate each partition across two nodes. Use this option to ensure faster fault tolerance. However, be aware that persistence with replication incurs double the space and speed costs of persistence without replication. Replication is usually only necessary on a noisy cluster or over a bad connection, where failures are unusually likely. It might also be useful if you do not have time to recompute in case of failure, such as when serving a live web application.

Note

The RDD operation cache() is equivalent to the persist operation with no storage level argument, i.e., persist(). Both cache() and persist() persist the RDD with the default storage-level MEMORY_ONLY, which is equivalent to StorageLevel(false, true, false, true), which stores RDDs in-memory as deserialized Java objects, does not write to disk as partitions get evicted, and doesn’t replicate partitions.
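
As a rough sketch of how the constants map onto these five attributes (the exact toString output varies a little between Spark versions):

    import org.apache.spark.storage.StorageLevel

    // Attribute order: useDisk, useMemory, useOffHeap, deserialized, replication.
    val customLevel = StorageLevel(true, true, false, false, 2)

    // Inspect which attributes each constant enables.
    println(StorageLevel.MEMORY_ONLY.toString)         // in-memory, deserialized, 1 replica
    println(StorageLevel.MEMORY_AND_DISK_SER.toString) // spills to disk, serialized, 1 replica
    println(customLevel.toString)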

Checkpointing

Checkpointing writes the RDD to an external storage system such as HDFS or S3, and—in contrast to persisting—forgets the RDD’s lineage. Since checkpointing requires writing the RDD outside of Spark, checkpointed information survives beyond the duration of a single Spark application and forces evaluation of an RDD. Checkpointing takes up more space in external storage and may be slower than persisting since it requires potentially costly write operations. However, it does not use any Spark memory and will not incur recomputation if a Spark worker fails.

Figure 5-2 illustrates the difference between in-memory persistence and checkpointing an RDD. Persisting stores the RDD’s partitions in-memory or on disk in the caching layer of each executor. Checkpointing writes each partition to some external system.

Figure 5-2. Caching versus checkpointing

It is best to use checkpointing when the cost of failure and recomputation is of more concern than additional space in external storage. Broadly speaking, we advise persisting when jobs are slow and checkpointing when they are failing. If a Spark job is failing due to out-of-memory errors, checkpointing will reduce the cost and likelihood of failure without using up memory on the executors. If your jobs are failing due to network errors or preemption on a noisy cluster, checkpointing can reduce the likelihood of failure by breaking up a long-running job into smaller segments. To call checkpoint, call setCheckpointDir(directory: String) from the SparkContext object and pass in a path to a location on HDFS to write the intermediate results. Then, in the Spark job, call .checkpoint() from the RDD.
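
For instance, a minimal sketch (the checkpoint directory path and expensiveRdd are hypothetical):

    sc.setCheckpointDir("hdfs:///user/me/checkpointDir") // hypothetical HDFS path
    expensiveRdd.checkpoint() // mark the RDD for checkpointing before any action runs
    expensiveRdd.count()      // force evaluation so the checkpoint is actually written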

Checkpointing example

Example 5-24 makes use of custom storage level and checkpointing options; the function is used in the Goldilocks example, which we describe in detail in “Goldilocks Version 4: Reduce to Distinct on Each Partition”. In this case we are doing several very expensive transformations: first a sort and then two very substantial mapPartitions routines. When running on a noisy cluster, we found it advantageous to checkpoint this function after the sort. The directory parameter is the checkpoint directory, and the sorted value is a sorted RDD of key/value pairs.

Example 5-24. Checkpoint example
  def findQuantilesWithCustomStorage(valPairs: RDD[((Double, Int), Long)],
    colIndexList: List[Int],
    targetRanks: List[Long],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    checkPoint : Boolean, directory : String = ""): Map[Int, Iterable[Double]] = {

    val n = colIndexList.last + 1
    val sorted  = valPairs.sortByKey()
    if (storageLevel != StorageLevel.NONE) {
      sorted.persist(storageLevel)
    }

    if (checkPoint) {
      sorted.sparkContext.setCheckpointDir(directory)
      sorted.checkpoint()
    }

    val partitionColumnsFreq = getColumnsFreqPerPartition(sorted, n)
    val ranksLocations  = getRanksLocationsWithinEachPart(
      targetRanks, partitionColumnsFreq, n)
    val targetRanksValues = findTargetRanksIteratively(sorted, ranksLocations)
    targetRanksValues.groupByKey().collectAsMap()
  }
Warning

Spark includes a Local Checkpointing option that truncates the RDD’s lineage graph but doesn’t persist to stable storage. This is not suitable for clusters that may experience failures, preemption, or dynamic scale-downs during the time the RDD may be referenced.

Alluxio (nee Tachyon)

Tachyon is a distributed, in-memory storage system that is developed separately from Spark. It sits above a storage system, such as S3 or HDFS, and can be used on its own or with an external computational framework such as Spark or MapReduce. Like Spark, Tachyon can be used in a standalone cluster mode, or with Mesos or YARN. Read more about Tachyon’s architecture and how to integrate it with Spark in the Tachyon documentation.

Tachyon can be used as an input or output source for Spark applications (data stored with Tachyon can be used to create RDDs) or for off_heap persistence during a Spark application. Using Tachyon for persistence has several advantages. First, it reduces garbage collection overhead, since data is not stored as Java objects. Second, it allows multiple executors to share the same external memory pool in Tachyon. Third, since the data is stored in memory outside of Spark, it is not lost if individual executors crash. It can be particularly useful if you want to reuse an RDD but are running out of memory or seeing garbage collection errors. It is also the best way to reuse a very large RDD between multiple applications.
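
A minimal sketch of both uses follows (assuming an existing SparkContext sc and an RDD rdd; the tachyon:// URL and file path are illustrative, and the exact setup required depends on the Spark and Tachyon/Alluxio versions in use):

    import org.apache.spark.storage.StorageLevel

    // Off-heap persistence during a Spark application.
    rdd.persist(StorageLevel.OFF_HEAP)

    // Reading data stored in Tachyon as an input source (hypothetical master host and path).
    val pandas = sc.textFile("tachyon://tachyonmaster:19998/data/raw_pandas.csv")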

Note

Tachyon’s developer and user communities are very strong in China, so part of its documentation may be stronger in Mandarin than in English.

LRU Caching

RDDs that are stored in memory and/or on disk in Spark are not automatically unpersisted when they are no longer needed downstream. Instead, RDDs stay in memory for the duration of a Spark application, until the driver program calls unpersist or memory/storage pressure causes their eviction. Spark uses Least Recently Used (LRU) caching to determine which partitions to evict if the executors begin to run out of memory.

LRU caching dictates that the data structure that was least recently accessed will be evicted. However, because of lazy evaluation it may be a bit tricky to predict which partitions will be evicted first. Generally, Spark evicts the oldest partitions; those that were created or used in the earliest Spark job or in the earliest stage within a given job. (See “Dividing the Space Within One Executor” for a more detailed explanation of memory management and partitions.) LRU caching behaves differently for different persistence options. For memory-only persistence operations configured with LRU caching, Spark will recompute the evicted partition each time it is needed. For memory and disk options, LRU caching will write the evicted partition to disk. If you want to take a persisted RDD out of memory to free up space, use unpersist.
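
For example, a minimal sketch (assuming an existing SparkContext sc):

    import org.apache.spark.storage.StorageLevel

    val doubled = sc.parallelize(1 to 10000).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
    doubled.count()        // Action 1: materializes the persisted partitions
    doubled.reduce(_ + _)  // Action 2: read from memory, unless partitions were evicted
    doubled.unpersist()    // free the space now rather than waiting for LRU eviction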

Shuffle files

Regardless of a persist or checkpoint call, Spark does write some data to disk during a shuffle. These files are called “shuffle files” and they usually contain all of the records in each input partition sorted by mapper. Usually shuffle files remain in the local directory on the workers for the duration of an application. Thus if the driver program reuses an RDD that has already been shuffled, Spark may be able to avoid recomputing that RDD up to the point of shuffle by using the shuffle files on the mapper.

Unlike the other forms of reuse, we can’t determine programmatically whether a given RDD still has its shuffle files present; e.g., there is no equivalent of the isCheckpointed method, which returns true if an RDD has been checkpointed. In general, though, shuffle files aren’t cleaned up until an RDD goes out of scope. However, the web UI can be helpful in determining whether stages are being skipped this way, as shown in Figure 5-3.

Figure 5-3. Skipped stage from reading shuffle files

The performance of reusing shuffle files is similar to the performance of an RDD that is cached at the level of disk only.

Warning

Shuffle files can be large, and Spark has no explicit cache management for them. Keeping references to RDDs depending on shuffled output can lead to out-of-disk errors if the RDDs are not garbage collected on the driver.

Out-of-disk-space errors can be unexpected, but in clusters with small amounts of disk space they are surprisingly common. Disk space errors can be caused by long-running shell environments in which RDDs created at the top scope are never garbage collected. Spark writes the output of its shuffle operations to files on the disks of the workers in the Spark local dir. These files are only cleaned up when an RDD is garbage collected, which, if the amount of memory assigned to the driver program is large, can occur infrequently. One solution is to explicitly trigger garbage collection (assuming the RDDs have gone out of scope); if the DAG is getting too long, checkpointing can help make the RDDs available for garbage collection.
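
As a rough sketch of that workaround (assuming an existing SparkContext sc; the input and output paths are hypothetical):

    // Let the shuffled RDD go out of scope on the driver, then hint a GC so that
    // Spark's ContextCleaner can remove the shuffle files it no longer references.
    var counts = sc.textFile("hdfs:///data/words").map(w => (w, 1)).reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs:///data/word-counts")
    counts = null  // drop the reference so the RDD (and its shuffle) become collectable
    System.gc()    // request a garbage collection on the driver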

Noisy Cluster Considerations

Noisy clusters, or those with a high volume of unpredictable traffic, pose a fundamental challenge to Spark’s evaluation. By default, Spark doesn’t save most intermediate results (besides in a shuffle step). Thus, in the case of preemptions, Spark will have to recompute the calculation in the job up to the point of failure. In a noisy cluster, where long-running jobs are often interrupted, this poses a huge challenge. Checkpointing can be especially helpful to get jobs to run at all. Checkpointing breaks an RDD’s lineage, therefore reducing the cost to recompute downstream transformations. Checkpointing also persists to external storage, so that unexpected failures do not lead to data loss. If failures are common but not fatal, it may be worth configuring your job to persist to multiple machines using a storage option like MEMORY_AND_DISK_2, which replicates data on two machines. That way, failures on one node will not require a recompute. This can be especially important with wide transformations, which are very expensive.

By default, Spark uses a first in, first out (FIFO) paradigm to queue jobs within an application. This means that the first job submitted will run in its entirety, getting priority on all the available resources. However, if a job doesn’t need the whole cluster, the next job may start. FIFO scheduling can be useful to ensure that space-intensive jobs are able to use the resources that they need. However, if you launch a job a few seconds behind a many-hour process, the FIFO strategy can be frustrating. Spark offers a fair scheduler, modeled after the Hadoop fair scheduler, to allow high-traffic clusters to share resources more evenly. The fair scheduler allocates the tasks from different jobs to the executors in a round-robin fashion (i.e., parceling out a few tasks to the executors from each job). With the fair scheduler, a short, small job can be launched before an earlier long-running job completes.

The fair scheduler also supports putting jobs into pools and assigning different priorities (weights) to those pools. Jobs within a pool are allocated the same number of resources, and the pools are allocated resources according to their weights. Using pools can be a good way to ensure that high-priority or very expensive jobs are completed. The fair scheduler also ensures that users are allocated resources evenly regardless of how many jobs they submit. You can read more about using and configuring the fair scheduler in the Spark job scheduling documentation.
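
A minimal sketch of enabling the fair scheduler and assigning work to a pool (the application name, pool name, and allocation-file path are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("fair-scheduling-sketch")
      .set("spark.scheduler.mode", "FAIR") // the default is FIFO
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // optional pool config
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread will run in the named pool.
    sc.setLocalProperty("spark.scheduler.pool", "highPriority")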

Interaction with Accumulators

The interaction of caching and accumulators can make reasoning about accumulators more difficult. As we mentioned, if part of an RDD has to be recomputed, Spark may continue to add values to the accumulator as it recomputes, causing the values in the recomputed part to be double counted. Furthermore, not all computations will always compute the entirety of a partition. Surprisingly, caching does not prevent either double counting or problems that arise from partially evaluated partitions. Cached partitions may be evicted, so double counting may still arise if the machine with the cached data fails or if the partition is evicted to make space for a more recently cached partition. Unfortunately, caching with accumulators may cause a job that appears to compute the correct value on small data to later compute an incorrect value on large data.

Conclusion

Now that you have explored how to get the most out of your standard RDD transformations, as well as joins, it’s time to explore the concerns associated with the most important and complicated subset of RDD transformations, key/value pair operations. Not all of the techniques you will have learned need to be applied in every Spark program, and some of the takeaways from this chapter are more about when certain tools are not a good fit (see “Accumulators”). Many of the same techniques and considerations for standard RDD transformations apply when working with key/value data: if your transformation doesn’t depend on the key, the techniques from this chapter may even be more relevant.

1 If A and B are sets, (A - B) ∪ (B ∩ A) = A in all cases. This is not true in Spark. If rddA or rddB have duplicate keys or if rddA and rddB have overlapping keys, then (A - B) ∪ (B ∩ A) is a subset of A.

2 Originally planned for 2.0.

3 Some notable exceptions are inside of certain ML algorithms, which if passed in an unpersisted RDD will automatically persist and unpersist the RDD.
