Most commonly, Spark programs are structured on RDDs: they involve reading data from stable storage into the RDD format, performing a number of computations and data transformations on the RDD, and writing the result RDD to stable storage or collecting to the driver. Thus, most of the power of Spark comes from its transformations: operations that are defined on RDDs and return RDDs.
At present, Spark contains specialized functionality for about a half-dozen types of RDDs, each with its own properties and scores of different transformation functions. In this section, we hope to give you the tools to think about how your RDD transformation, or series of transformations, will be evaluated. In particular: what kinds of RDDs these transformations return, whether persisting or checkpointing RDDs between transformations will make your computation more efficient, and how a given series of transformations could be executed in the most performant way possible.
The transformations in this section are those associated with the RDD object used in Spark Core (and MLlib). RDDs are also used inside of DStreams with Spark Streaming, but they have different functionality and performance properties. Likewise, most of the functions discussed in this chapter are not yet supported in DataFrames. Since Spark SQL has a different optimizer, not all of the conceptual lessons of this chapter will carry over to the Spark SQL world.
As Spark moves forward, more RDD transformations will become available on Datasets, which can be used in Spark SQL, and which are discussed in “Datasets”.
In Chapter 2, we introduced one important distinction between types of transformations: those with wide dependencies and those with narrow dependencies. This distinction is important because it has strong implications for how transformations are evaluated and, consequently, for their performance. In this subsection, we will more precisely define the wide and narrow transformations, demonstrate how to determine whether a transformation is wide or narrow, and explain why this distinction matters for evaluation and performance.
Recall that Spark is lazily evaluated, meaning that a transformation is not executed until an action that depends on that transformation is called. This, as we discussed in detail in “Lazy Evaluation”, has important consequences for fault tolerance, performance, and debugging. If the information in this tip is confusing, please refer back to Chapter 2, which will give you the basic understanding of the Spark execution engine needed for this chapter.
To summarize what we covered in Chapter 2: wide transformations are those that require a shuffle, while narrow transformations are those that do not. In “Wide Versus Narrow Dependencies” we explained that in narrow transformations, the child partitions (the partitions in the resulting RDD) depend on a known subset of the parent partitions. While this definition is correct, it is less precise than the formal definition of narrow transformations.
The 2012 paper that first presented the evaluation semantics for Spark defines transformations with narrow dependencies as those in which “each partition of the parent RDD is used by at most one partition of the child RDD.” The creators define transformations with wide dependencies as transformations in which “multiple child partitions may depend on [each partition in the parent].” This definition states the analogue of what we explained in Chapter 2, in which we defined narrow and wide dependencies in relation to the child RDD’s dependencies. In contrast, the creators’ definition defined narrow and wide dependencies in terms of the dependencies on the parent RDD, rather than those on the child RDD.
We think the definition presented in Chapter 2 is easier to conceptualize since one usually designs a program by thinking from the input data (parent RDD) to the output data (child RDD). However, the Spark evaluation engine (the DAG scheduler) builds an execution plan in reverse: from the output (the last action) to the input RDD. Thus, the Spark creators’ definition mirrors the way that Spark is evaluated and consequently it is more precise in two important ways. First, the creators’ definition rules out the case of one parent partition having multiple children in a narrow dependency. This explains why coalesce is only a narrow transformation when it is reducing rather than increasing the number of partitions. Second, the creators’ definition clarifies why the number of tasks used to complete a computation corresponds to the number of output partitions rather than the number of input partitions: when RDDs are evaluated, the tasks needed to compute a transformation are computed on the child partitions.
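The creators' definition can be checked mechanically. The following is a minimal plain-Scala sketch, not Spark code: it models a dependency as a hand-written list of parent partition indices per child partition and tests whether any parent is used by more than one child. The object name, `isNarrow`, and the four sample dependency layouts are all hypothetical illustrations.

```scala
object DependencyKinds {
  // childDeps(i) holds the indices of the parent partitions that
  // child partition i reads from (a hypothetical, hand-written model).
  def isNarrow(childDeps: Seq[Set[Int]]): Boolean = {
    // One entry per (child, parent) edge; a parent index that appears
    // more than once is used by more than one child partition.
    val parentUses = childDeps.flatMap(_.toSeq)
    parentUses.groupBy(identity).values.forall(_.size <= 1)
  }

  // map: child i reads only parent i.
  val mapDeps: Seq[Set[Int]] = Seq(Set(0), Set(1), Set(2), Set(3))
  // groupByKey: in the worst case every child reads every parent.
  val groupByKeyDeps: Seq[Set[Int]] = Seq.fill(4)(Set(0, 1, 2, 3))
  // coalesce from four partitions to two: each child is a union of parents.
  val coalesceFourToTwo: Seq[Set[Int]] = Seq(Set(0, 1), Set(2, 3))
  // going from two partitions to four: each parent feeds several children.
  val coalesceTwoToFour: Seq[Set[Int]] = Seq(Set(0), Set(0), Set(1), Set(1))
}
```

Under this model, `mapDeps` and `coalesceFourToTwo` are narrow, while `groupByKeyDeps` and `coalesceTwoToFour` are not, which matches the two consequences described above.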
Figure 5-1 shows dependencies between parent and child partitions for narrow and wide transformations for the Spark program in Example 5-1. Assume RDD1 is an RDD of integers.
// Narrow dependency. Map the rdd to tuples of (x, 1)
val rdd2 = rdd1.map(x => (x, 1))
// Wide dependency: groupByKey
val rdd3 = rdd2.groupByKey()
We use the same structure as the diagrams presented in Figures 2-2 and 2-3. Arrows represent partition dependencies. Each child partition has arrows pointing to the parent partitions upon which it depends; if an arrow points from partition y to partition x, that means that x depends on y. Blue arrows represent narrow dependencies and red arrows represent wide dependencies.
We assume the RDD has four partitions. Unlike the diagrams presented in Chapter 2, here we will show how actual records in a very small RDD might be distributed amongst the partitions. In this case we show how RDD1, RDD2, and RDD3 would be partitioned if RDD1 was an RDD of the integers 3, 3, 9, 2, 8, 5, 6, 7.
As you can see, to compute the map step, each child partition depends on just one parent, since the data doesn’t need to be moved between partitions for the operation to be computed. However, in the groupByKey step, Spark needs to move an arbitrary number of the records so that those with the same key are in the same partition in order to combine records corresponding to a single key into one iterator (recall that an iterator is a local, rather than a distributed, collection). Thus, the child partitions depend on many partitions in the parent RDD.
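To make the record movement concrete, here is a plain-Scala sketch (no Spark involved) that models the three RDDs as vectors of partitions. The initial split of the eight integers across four partitions is an assumption, and `targetPartition` is a simplified stand-in for hash partitioning of non-negative integer keys (the key modulo the partition count). All names are hypothetical.

```scala
object ShuffleSketch {
  val numPartitions = 4

  // RDD1 from the text, split across four partitions (the split is assumed).
  val rdd1: Vector[Vector[Int]] =
    Vector(Vector(3, 3), Vector(9, 2), Vector(8, 5), Vector(6, 7))

  // map: each child partition is computed from exactly one parent partition.
  val rdd2: Vector[Vector[(Int, Int)]] = rdd1.map(_.map(x => (x, 1)))

  // Simplified hash partitioning for a non-negative Int key.
  def targetPartition(key: Int): Int = key % numPartitions

  // groupByKey: child partition p needs every record whose key maps to p,
  // wherever that record currently lives.
  val rdd3: Vector[Map[Int, Vector[Int]]] =
    Vector.tabulate(numPartitions) { p =>
      rdd2.flatten
        .filter { case (k, _) => targetPartition(k) == p }
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2) }
    }

  // For each child partition of rdd3, the parent partitions of rdd2 it reads.
  val parentsOfChild: Vector[Set[Int]] =
    Vector.tabulate(numPartitions) { p =>
      rdd2.zipWithIndex.collect {
        case (records, parent)
            if records.exists(r => targetPartition(r._1) == p) => parent
      }.toSet
    }
}
```

In this model, parent partition 2 (holding 8 and 5) feeds two different child partitions, so the dependency is wide by the creators' definition, whereas every partition of `rdd2` is built from a single partition of `rdd1`.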
This diagram is intended to show how partitions, an abstract concept used in Spark evaluation, depend on each other, rather than any physical data movement across machines. Each line of squares in the diagram represents the same executors at different points in time. The arrows denote dependencies between partitions. In fact, repartitioning data does not necessarily require data movement across machines, since partitions may reside on the same executor. When changing the partition of a record does require data movement between executors, the records do not have to be passed through the driver; they are transferred directly between the executors.
In “Wide Versus Narrow Dependencies” we asserted that transformations with narrow dependencies are faster to execute partly because narrow transformations can be combined and executed in one pass of the data. In this section we hope to explain why this is from an evaluation perspective.
Narrow dependencies do not require data to be moved across partitions. Consequently narrow transformations don’t require communication with the driver node, and an arbitrary number of narrow transformations can be executed on any subset of the records (any partition) given one set of instructions from the driver. In Spark terminology, we say that each series of narrow transformations can be computed in the same “stage” of the query execution plan.
In contrast, as we stated in “The Anatomy of a Spark Job”, a shuffle associated with a wide dependency marks a new stage in the RDD’s evaluation. Because tasks must be computed on a single partition and the data needed to compute each partition of a wide dependency may be spread across machines, transformations with wide dependencies may require data to be moved across partitions. Thus, the downstream computations cannot be computed before the shuffle finishes.
For example, it should be intuitive that sorting cannot be accomplished with narrow transformations because sorting requires an order to be defined on all of the records, not just within each partition. Indeed, the sortByKey function has wide dependencies. It requires the data to be partitioned so that all the keys within a certain range live on the same partition. That way, sorting the data on each partition leads to a sorted result. Any narrow transformations following the sort cannot be done until after the shuffle completes because the records on each partition may change.
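The two-step idea behind a distributed sort (range-partition first, then sort each partition locally) can be sketched in plain Scala. This is an illustrative model, not Spark's implementation: the range bounds here are picked by hand, whereas a real range partitioner has to derive them from the data. All names are hypothetical.

```scala
object RangeSortSketch {
  val partitions: Vector[Vector[Int]] =
    Vector(Vector(3, 3), Vector(9, 2), Vector(8, 5), Vector(6, 7))

  // Inclusive upper bounds of the first three key ranges; the fourth
  // range holds everything larger. These bounds are chosen by hand.
  val upperBounds: Vector[Int] = Vector(3, 5, 7)

  def rangeIndex(key: Int): Int = {
    val i = upperBounds.indexWhere(key <= _)
    if (i == -1) upperBounds.length else i
  }

  // Wide step: every child partition may pull records from any parent.
  val rangePartitioned: Vector[Vector[Int]] =
    Vector.tabulate(upperBounds.length + 1) { p =>
      partitions.flatten.filter(rangeIndex(_) == p)
    }

  // Narrow step: sort each partition on its own.
  val sortedPartitions: Vector[Vector[Int]] = rangePartitioned.map(_.sorted)

  // Reading the partitions in order yields a globally sorted result.
  val result: Vector[Int] = sortedPartitions.flatten
}
```

The local sort can only start once a partition has received all of the records in its key range, which is why work after the sort has to wait for the shuffle to finish.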
Stage boundaries have important performance consequences. Except in the case of multiple RDD operations like join, the stages associated with one RDD must be executed in sequence (see Chapter 4). Thus, not only are shuffles expensive since they require data movement and potential disk I/O (for the shuffle files), they also limit parallelization.
The cost of failure for a partition with wide dependencies is much higher than for one with narrow dependencies, since it requires more partitions to be recomputed. If one partition in the parent of a mappedRDD (the resulting RDD type of a map operation) fails, for example, only one of its children must be recomputed, and the tasks needed to recompute that child partition can be distributed across the executors to make this recomputation faster. In contrast, if the parent of the sorted RDD loses a partition, it is possible (in the worst case) that all the child partitions will need to be recomputed.
Chaining together transformations with wide dependencies only increases the risk of a very expensive recomputation, particularly if any of the wide transformations have a high probability of causing memory errors. In some instances, the cost of recomputation may be high enough that it is worth checkpointing an RDD, so the intermediate results are saved. We will discuss checkpointing in detail in “Reusing RDDs”.
The coalesce operation is used to change the number of partitions in an RDD. As shown by the diagram in Figure 2-2 in Chapter 2, when coalesce reduces the number of output partitions, each parent partition is used in exactly one child partition since the child partitions are the union of several parents. Thus, according to our definition of narrow dependencies, coalesce is a narrow transformation even though it changes the number of partitions in the RDD. Since tasks are executed on the child partition, the number of tasks executed in a stage that includes a coalesce operation is equal to the number of partitions in the result RDD of the coalesce transformation.
Using coalesce, the number of partitions can decrease in one stage without causing a shuffle. However, coalesce causes the upstream partitions in the entire stage to execute with the level of parallelism assigned by coalesce, which may be undesirable in some cases. Avoid this behavior at the cost of a shuffle by setting the shuffle argument of coalesce to true or by using the repartition function instead.
However, when coalesce increases the number of partitions, each parent partition is necessarily used by several child partitions. Thus, according to the more precise definition of wide dependencies presented in “Narrow Versus Wide Transformations”, using coalesce to increase the number of partitions is a wide transformation. The coalesce function prioritizes evenly distributing the data across the child partitions. Consequently, the location of records in the output cannot be determined at design time because it depends on how many records are stored on each input partition, and the number of records on each partition of course cannot be determined without reading the data and evaluating the upstream transformations. Ergo, increasing the number of partitions with either a coalesce or repartition call requires a shuffle.
RDDs are an abstracted concept in two ways: they can be of almost any arbitrary type of record (e.g., String, Row, Tuple), and they can also be members of one of several implementations of the RDD interface with varying properties. Both distinctions are important for performance and evaluation. The first is important because some transformations can only be applied to RDDs with certain record types. The second is important because each transformation returns one of the several implementations of the RDD interface, and therefore the same transformation called on two different RDD implementations (such as a mappedRDD versus a CoGroupedRDD) may be evaluated differently. In particular, some RDD implementations retain information about the ordering or locality of the records in the RDD from previous transformations. Understanding the data locality and partitioning information associated with the resulting RDD of a transformation can help avoid unnecessary shuffles. We will save a more detailed discussion of this for “Preserving Partitioning Information Across Transformations” since it is most relevant to Pair RDDs. In this section we will discuss preserving record type information because it can be important for performance and surprisingly difficult as Spark programs get complicated.
The RDD is a collection type which, much like collection types in Scala, Java, and most other strongly typed languages, is instantiated with a type parameter indicating the type of the members in the collection.
In Scala, the syntax for a type parameter is brackets; e.g., List[String], which indicates a sequence of String objects.
It is equivalent to the Java syntax < > (e.g., List<String>).
RDDs are similarly typed. For example, if you use sc.textFile to read in your RDD, you will end up with an RDD of String type (denoted RDD[String] in Scala and RDD<String> in Java).
The RDD’s record type information is important because many transformations are only defined on RDDs that are of a particular type, so trying to use methods on an RDD of generic type will return compile-time or runtime errors. For example, if an RDD of tuples has lost its type information and is interpreted by the compiler to be of type RDD[Any] or even RDD[(Any, Int)], calling sortByKey will not compile. The compilation error occurs because sortByKey can only be called on RDDs of key/value pairs where the keys have some implicit ordering. Similarly, numeric functions such as max, min, and sum can only be called on RDDs of Long, Int, or Double.
Record type information is one of many places in the Spark API where implicit conversions are likely to cause difficulties. If you are writing subroutines to be used in RDD transformation, it is often best to specify the helper function’s input and return types concretely and avoid writing them on a generic type.
One situation that often leads to lost type information is working with DataFrames as RDDs. DataFrames can be implicitly converted to RDDs of Rows. However, since the Spark SQL Row object is not strongly typed (it can be created from sequences of any value type), the Scala compiler cannot “remember” the type of value used to create the row. Indexing a row will return a value of type Any, which must be cast to a more specific type, such as a String or an Int, to perform most calculations. The type information for the rows is stored in the schema. However, converting to an RDD throws away a DataFrame’s schema information, so it is important to bind the DataFrame schema to a variable. One of the advantages of the Dataset API is that it is strongly typed, so the values in each row will retain their type information even after conversion to an RDD.
“Garbage collection” is the process of freeing up the memory allocated for an object once that object is no longer needed. Since Spark runs in the JVM, which has automatic memory management, and since Spark jobs work with large data structures, garbage collection can quickly become an expensive part of our Spark job. Garbage collection or “GC” errors are a common cause of failure. Even if garbage collection overhead doesn’t prohibit a job from running, garbage collection creates additional serialization time, which can significantly slow it down. We can minimize the GC cost by reducing the number of objects and the size of those objects. We can reduce the size and number of our objects by reusing existing objects and by using data structures (such as primitive types) that take up less space in memory.
Some RDD transformations allow us to modify the parameters in the lambda expression rather than returning a new object. For example, in the sequence function of the aggregation function for aggregateByKey and aggregate, we can modify the original accumulator argument and define the combine function in such a way that the combination is created by modifying the first of the two accumulators. A common and effective paradigm for complicated aggregations is to define a Scala class with sequence and combine operations that return the existing object using the this.type annotations.
For example, suppose that we wanted to do some custom aggregation that is not already defined in Spark. Let’s say that we have an RDD of key/value pairs where the keys are the panda’s instructors and the values are the pupils’ panda report cards. For each instructor we want to know the length of the longest word used, the average number of words used per report card, and the number of instances of the word “happy.”
One valid, easy-to-read approach would be to use the aggregateByKey function, which takes three arguments: a zero value that represents an empty accumulator, a sequence function that takes the accumulator and a value and adds the value to the accumulator, and a combine operator that defines how the accumulators should be combined.
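The roles of the three arguments can be seen in a plain-Scala model of the aggregation. This sketch is not Spark's implementation: `aggregateByKeyModel` is a hypothetical helper that operates on partitions represented as ordinary sequences, and the zero value is passed as a function so that each key starts from a fresh accumulator (an assumption of this model). The sample data is made up.

```scala
object AggregateSketch {
  // Plain-Scala model of aggregating pre-partitioned (key, value) records.
  def aggregateByKeyModel[K, V, U](
      partitions: Seq[Seq[(K, V)]], zero: () => U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Step 1: within each partition, fold each key's values into an
    // accumulator, starting from the zero value.
    val perPartition: Seq[Map[K, U]] = partitions.map { records =>
      records.foldLeft(Map.empty[K, U]) { case (accs, (k, v)) =>
        accs.updated(k, seqOp(accs.getOrElse(k, zero()), v))
      }
    }
    // Step 2: merge the per-partition accumulators for each key.
    perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
      k -> kus.map(_._2).reduce(combOp)
    }
  }

  // Two partitions of (instructor, report card text); invented sample data.
  val reports: Seq[Seq[(String, String)]] = Seq(
    Seq("ana" -> "happy panda", "bo" -> "sleepy"),
    Seq("ana" -> "very happy panda indeed"))

  // Accumulator: (total words, number of report cards).
  val wordStats: Map[String, (Int, Int)] =
    aggregateByKeyModel[String, String, (Int, Int)](reports, () => (0, 0))(
      (acc, text) => (acc._1 + text.split(" ").length, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
}
```

Note that the sequence function is applied once per record while the combine function is applied once per pair of partial results, so the accumulator type can differ from the value type.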
In this instance we could define our accumulator to be an object with four fields: the total count of all the words, the total number of reports, the longest word seen so far, and the total number of mentions of the word “happy.”
For clarity we can define this as its own object with methods for sequence and combine. We have named this object MetricsCalculator, and it might be coded as shown in Example 5-2.
class MetricsCalculator(
  val totalWords: Int,
  val longestWord: Int,
  val happyMentions: Int,
  val numberReportCards: Int) extends Serializable {

  def sequenceOp(reportCardContent: String): MetricsCalculator = {
    val words = reportCardContent.split(" ")
    val tW = words.length
    val lW = words.map(w => w.length).max
    val hM = words.count(w => w.toLowerCase.equals("happy"))

    new MetricsCalculator(
      tW + totalWords,
      Math.max(longestWord, lW),
      hM + happyMentions,
      numberReportCards + 1)
  }

  def compOp(other: MetricsCalculator): MetricsCalculator = {
    new MetricsCalculator(
      this.totalWords + other.totalWords,
      Math.max(this.longestWord, other.longestWord),
      this.happyMentions + other.happyMentions,
      this.numberReportCards + other.numberReportCards)
  }

  def toReportCardMetrics =
    ReportCardMetrics(
      longestWord,
      happyMentions,
      totalWords.toDouble / numberReportCards)
}
We could then use this object in the arguments to our aggregation function, as shown in Example 5-4, in a routine that maps the RDD of instructors and report text to the case class with the three metrics we care about, which is defined in Example 5-3.
case class ReportCardMetrics(
  longestWord: Int,
  happyMentions: Int,
  averageWords: Double)
/**
 * Given an RDD of (PandaInstructor, ReportCardText) aggregate by instructor
 * to an RDD of distinct keys of (PandaInstructor, ReportCardMetrics)
 * where ReportCardMetrics is a case class with
 *
 * longestWord -> The length of the longest word in all of the reports written by this instructor
 * happyMentions -> The number of times this instructor mentioned the word happy
 * averageWords -> The average number of words per report card for this instructor
 */
def calculateReportCardStatistics(rdd: RDD[(String, String)]
  ): RDD[(String, ReportCardMetrics)] = {

  rdd.aggregateByKey(new MetricsCalculator(totalWords = 0,
    longestWord = 0, happyMentions = 0, numberReportCards = 0))(
    seqOp = ((reportCardMetrics, reportCardText) =>
      reportCardMetrics.sequenceOp(reportCardText)),
    combOp = (x, y) => x.compOp(y))
    .mapValues(_.toReportCardMetrics)
}
This approach is superior to using two map calls and one reduceByKey call. The aggregate function combines each partition locally, then does a shuffle to perform the cross-partition reduction. However, it has the disadvantage of creating a new instance of our custom object for each record in the dataset and for each combine step.
A very simple way to reduce the cost of object creation would be to modify our MetricsCalculator to use Scala’s this.type design paradigm so that the sequence operation modifies the original accumulator and the combine operation modifies the first accumulator rather than returning a new one, as shown in Example 5-5.
class MetricsCalculatorReuseObjects(
  var totalWords: Int,
  var longestWord: Int,
  var happyMentions: Int,
  var numberReportCards: Int) extends Serializable {

  def sequenceOp(reportCardContent: String): this.type = {
    val words = reportCardContent.split(" ")
    totalWords += words.length
    longestWord = Math.max(longestWord, words.map(w => w.length).max)
    happyMentions += words.count(w => w.toLowerCase.equals("happy"))
    numberReportCards += 1
    this
  }

  def compOp(other: MetricsCalculatorReuseObjects): this.type = {
    totalWords += other.totalWords
    longestWord = Math.max(this.longestWord, other.longestWord)
    happyMentions += other.happyMentions
    numberReportCards += other.numberReportCards
    this
  }

  def toReportCardMetrics =
    ReportCardMetrics(
      longestWord,
      happyMentions,
      totalWords.toDouble / numberReportCards)
}
Our aggregation routine will remain the same.
It should be obvious that the Scala code within the sequence operator is slower than it needs to be. Rather than performing three different functional calls on the words array, we ought to go through the string as a string buffer, counting the words, keeping track of the longest word, and counting the occurrences of the word “happy” (or at least use a while loop to parse the words array rather than making three separate passes over it). We have left this solution since we think it is easier to read and the primary intention of the example is to show how to optimize the aggregateByKey Spark routine.
Reduce (which calls aggregate) and the fold operations (foldLeft, fold, foldRight) can also benefit from object reuse. However, these aggregation functions are unique. It is best to avoid mutable data structures in Spark code (and Scala code in general) because they can lead to serialization errors and may have inaccurate results. For many other RDD functions, particularly narrow transformations, modifying the first value of the argument is not safe because the transformations may be chained together with lazy evaluation and may be evaluated multiple times. For example, if you have an RDD of mutable objects, modifying the objects with a map function may lead to inaccurate results since the objects may be reused more times than you expect, especially if the RDD is recomputed.
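The recomputation hazard can be reproduced without Spark. In this plain-Scala sketch, a method stands in for a lazily evaluated transformation that is recomputed from its source each time its result is requested; the names and the two-element "dataset" are hypothetical.

```scala
object MutationHazard {
  // Stand-in for source records that a lineage reads from.
  val source: Vector[Array[Int]] = Vector(Array(1), Array(2))

  // A "transformation" that mutates its input instead of building a new
  // value. It is a method so that, like a lazily evaluated lineage, it is
  // recomputed from the source every time a result is requested.
  def incremented: Vector[Int] =
    source.iterator.map { a => a(0) += 1; a(0) }.toVector

  val firstRun: Vector[Int] = incremented   // computed from the original records
  val secondRun: Vector[Int] = incremented  // recomputation sees mutated records
}
```

The second evaluation returns different values from the first even though nothing in the program asked for the data to change, which is the kind of inaccuracy the paragraph above warns about.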
Spark can be a memory hog. An important way to optimize Spark jobs for both time and space is to stick to primitive types rather than custom classes. Although it may make code less readable, using arrays rather than case classes or tuples can reduce GC overhead. Scala arrays, which are exactly Java arrays under the hood, are the most memory-efficient of the Scala collection types. Scala tuples are objects, so in some instances it might be better to use a two- or three-element array rather than a tuple for expensive operations. The Scala collection types in general incur a higher GC overhead than arrays.
Notice that our MetricsCalculator object is just a wrapper for a few numeric values. Although it is less readable and less object-oriented, it is more space-efficient to use a four-element array of integers. We can maintain the same readable code paradigm by using a Scala object instead of a class and defining the sequence and combine operations as functions on strings and arrays, as shown in Example 5-6.
object MetricsCalculator_Arrays extends Serializable {
  val totalWordIndex = 0
  val longestWordIndex = 1
  val happyMentionsIndex = 2
  val numberReportCardsIndex = 3

  def sequenceOp(reportCardMetrics: Array[Int],
    reportCardContent: String): Array[Int] = {

    val words = reportCardContent.split(" ")
    //modify each of the elements in the array
    reportCardMetrics(totalWordIndex) += words.length
    reportCardMetrics(longestWordIndex) = Math.max(
      reportCardMetrics(longestWordIndex),
      words.map(w => w.length).max)
    reportCardMetrics(happyMentionsIndex) += words.count(
      w => w.toLowerCase.equals("happy"))
    reportCardMetrics(numberReportCardsIndex) += 1
    reportCardMetrics
  }

  def compOp(x: Array[Int], y: Array[Int]): Array[Int] = {
    //combine the first and second arrays by modifying the elements
    // in the first array
    x(totalWordIndex) += y(totalWordIndex)
    x(longestWordIndex) = Math.max(x(longestWordIndex), y(longestWordIndex))
    x(happyMentionsIndex) += y(happyMentionsIndex)
    x(numberReportCardsIndex) += y(numberReportCardsIndex)
    x
  }

  def toReportCardMetrics(ar: Array[Int]): ReportCardMetrics =
    ReportCardMetrics(
      ar(longestWordIndex),
      ar(happyMentionsIndex),
      //convert to Double first so the average is not truncated
      ar(totalWordIndex).toDouble / ar(numberReportCardsIndex)
    )
}
We would then need to modify our aggregation code slightly. We are not using the same custom aggregation object, and the zero value has changed. This is shown in Example 5-7.
def calculateReportCardStatisticsWithArrays(rdd: RDD[(String, String)]
  ): RDD[(String, ReportCardMetrics)] = {

  rdd.aggregateByKey(
    //the zero value is a four element array of zeros
    Array.fill[Int](4)(0)
  )(
    //seqOp adds the relevant values to the array
    seqOp = (reportCardMetrics, reportCardText) =>
      MetricsCalculator_Arrays.sequenceOp(reportCardMetrics, reportCardText),
    //combOp defines how the arrays should be combined
    combOp = (x, y) => MetricsCalculator_Arrays.compOp(x, y))
    .mapValues(MetricsCalculator_Arrays.toReportCardMetrics)
}
Within a function, it is often beneficial to avoid intermediate object creation. It is important to remember that converting between types (such as between different flavors of Scala collections) creates intermediate objects. This is yet another place in which implicit conversions may have unfortunate performance implications.
For example, suppose that, observing our note in the previous section, you wanted to speed up the sequence function of the MetricsCalculator_ReuseObjects object.
Then, you realized that your coworker had written a general-purpose utility that finds the instances of the word “happy” and the longest word in a collection of strings (shown in Example 5-8).
def findWordMetrics[T <: Seq[String]](collection: T): (Int, Int) = {
  val iterator = collection.toIterator
  var mentionsOfHappy = 0
  var longestWordSoFar = 0
  while (iterator.hasNext) {
    val n = iterator.next()
    if (n.toLowerCase == "happy") {
      mentionsOfHappy += 1
    }
    val length = n.length
    if (length > longestWordSoFar) {
      longestWordSoFar = length
    }
  }
  (longestWordSoFar, mentionsOfHappy)
}
Your coworker helpfully defined her function on any type that extends a Scala Traversable index. Thus, you won’t need to convert the array of words at all and can happily write the code shown in Example 5-9.
val totalWordIndex = 0
val longestWordIndex = 1
val happyMentionsIndex = 2
val numberReportCardsIndex = 3

def fasterSeqOp(reportCardMetrics: Array[Int],
  content: String): Array[Int] = {

  val words: Seq[String] = content.split(" ")
  val (longestWord, happyMentions) =
    CollectionRoutines.findWordMetrics(words)
  reportCardMetrics(totalWordIndex) += words.length
  //keep the longest word seen so far, as in the earlier sequence operations
  reportCardMetrics(longestWordIndex) = Math.max(
    reportCardMetrics(longestWordIndex), longestWord)
  reportCardMetrics(happyMentionsIndex) += happyMentions
  reportCardMetrics(numberReportCardsIndex) += 1
  reportCardMetrics
}
Unfortunately, in terms of object creation, this new implementation is actually worse than the previous one. It creates two extra objects containing the collection with the words each time a sequence operation is called! The first is created when you call the findWordMetrics routine, since the input array has to be implicitly converted to a Traversable object (creating a new object of the same size), and the second is created when your coworker’s code casts the Traversable object to an Iterator.
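The allocation caused by the implicit conversion is easy to observe in plain Scala. In this small sketch (names hypothetical), the same array is assigned twice to a value typed as `scala.collection.Seq[String]`; on the Scala versions we are aware of, each assignment goes through an implicit conversion that allocates a new object around the array, so the two results are not the same instance even though they compare equal.

```scala
object ConversionCost {
  val words: Array[String] = "a happy panda".split(" ")

  // Assigning an Array where a Seq is expected triggers an implicit
  // conversion; each conversion allocates a new Seq object.
  val asSeq1: scala.collection.Seq[String] = words
  val asSeq2: scala.collection.Seq[String] = words

  // Reference comparison: two conversions, two distinct objects.
  val allocatedTwice: Boolean = !(asSeq1 eq asSeq2)
  // Structural comparison: both hold the same three words.
  val sameContents: Boolean = asSeq1 == asSeq2
}
```

Nothing in the source text at the call site hints at the allocation, which is why such conversions are easy to overlook inside a function that runs once per record.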
Modifying a value passed into your transformation is not always safe, so double-check the documentation for the function you are using.
Beyond reducing the objects that are directly allocated, Scala’s implicit conversions can sometimes cause additional allocations in the process of converting.
The RDD mapPartitions function takes as its argument a function from an iterator of records (representing the records on one partition) to another iterator of records (representing the output partition).
The mapPartitions transformation is one of the most powerful in Spark since it lets the user define an arbitrary routine on one partition of data.
The mapPartitions transformation can be used for very simple data transformations like string parsing, but it can also be used for complex, expensive data-processing work to solve problems such as secondary sort or highly custom aggregations.
Many of Spark’s other transformations, like filter, map, and flatMap, can be built using mapPartitions.
Optimizing the mapPartitions routines is an important part of writing complicated and performant Spark code, as we will see in Chapter 6. To allow Spark the flexibility to spill some records to disk, it is important to represent your functions inside of mapPartitions in such a way that your functions do not force loading the entire partition in-memory (e.g., implicitly converting to a list).
Iterators have many methods we can use to write functional-style transformations. You may also construct your own custom iterator extending the Iterator interface.
When a transformation directly takes and returns an iterator without forcing it through another collection, we call it an iterator-to-iterator transformation.
A Scala iterator object is not actually a collection, but a function that defines a process of accessing the elements in a collection one-by-one. Not only are iterators immutable, but the same element in an iterator can only be accessed one time. In other words, iterators can only be traversed once, and they extend the Scala interface TraversableOnce.
Iterators have some of the same methods defined on them as other immutable Scala collections, such as mappings (map and flatMap), additions (++), folds (foldLeft, reduceRight, reduce), element conditions (forall and exists), and traversals (next and foreach). In some instances, these methods behave differently than other Scala collections. Since the iterator can only be traversed once, any of the iterator methods that require looking at all the elements in the iterator will leave the original iterator empty.
Java has its own implementation of iterators, java.util.Iterator, which has the same benefits as Scala iterators for Spark’s evaluation.
Beware of your function calls. It is easy to accidentally consume an iterator by calling a method that traverses through the iterator, such as size, or to trigger an implicit conversion. Iterators can be converted to any other Scala collection type. However, converting them requires accessing each of the elements. Thus, after it has been converted to a new collection type, an iterator will be at its last element (empty).
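A short plain-Scala sketch of accidental consumption, using only standard `Iterator` methods (the object and value names are hypothetical):

```scala
object ConsumedIterator {
  val records: Iterator[String] = Iterator("a", "b", "c")
  val count: Int = records.size            // traverses every element
  val hasMoreAfterSize: Boolean = records.hasNext

  val other: Iterator[Int] = Iterator(1, 2, 3)
  val asList: List[Int] = other.toList     // conversion also traverses
  val hasMoreAfterToList: Boolean = other.hasNext
}
```

After either call the original iterator has nothing left to return, so any later code that expects to read the records from it will see an empty input.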
In some ways it can be helpful to conceptualize iterator methods as we would RDD methods, as either transformations or actions, because like an RDD, an iterator is actually a set of evaluation instructions rather than a stored state. Some iterator methods, like next, size, and foreach, traverse the iterator and evaluate it (more like an action). Others, like map and flatMap, return a new iterator (which is really a set of evaluation instructions), much like RDD transformations return a new RDD. However, in contrast to Spark transformations, iterator transformations are executed linearly, one element at a time, rather than in parallel. This makes iterators slower but much easier to use than if they could be executed in parallel. For example, if we needed to store some information about the records we have seen, we can do that in a filter or a map function on the iterator, since the map/filter routine will be applied to each element sequentially. (See Example 5-12 at the end of this section.) One-to-one functions are also not chained together in iterator operations so using three map calls still requires looking at each element in the iterator three times.
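The transformation/action analogy can be illustrated with a small sketch that uses only the Scala standard library. The object and value names are hypothetical; the counters exist only to make the evaluation order visible.

```scala
object LazyIteratorExample {
  var evaluated = 0

  // map only records the instructions; no element is touched yet.
  val doubled: Iterator[Int] = Iterator(1, 2, 3).map { x =>
    evaluated += 1
    x * 2
  }
  val evaluatedBeforeTraversal: Int = evaluated

  // Elements are visited sequentially, so a closure can carry running state.
  var seen = 0
  val indexed: List[(Int, Int)] = doubled.map { x =>
    seen += 1
    (seen, x)
  }.toList // toList behaves like an "action" and forces evaluation

  val evaluatedAfterTraversal: Int = evaluated
}
```

Here the first map is not applied to any element until toList traverses the iterator, and the second map can safely keep a running position because elements arrive one at a time.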
By “iterator-to-iterator transformation” we mean using one of these iterator “transformations” to return a new iterator rather than a) converting the iterator to a different collection or b) evaluating the iterator with one of the iterator “actions” and building a new collection. To reiterate: using a while loop to traverse the elements of an iterator and build a new collection (even a new iterator) does not qualify as an iterator-to-iterator transformation. Converting an iterator to a more intuitive collection type, manipulating it, and converting back to an iterator is not an iterator-to-iterator transformation. Indeed, converting the iterator argument in mapPartitions to a collection object eliminates all the benefits of iterator-to-iterator transformations.
The primary advantage of using iterator-to-iterator transformations in Spark routines is that their transformations allow Spark to selectively spill data to disk. Conceptually, an iterator-to-iterator transformation means defining a process for evaluating elements one at a time. Thus, Spark can apply that procedure to batches of records rather than reading an entire partition into memory or creating a collection with all of the output records in-memory and then returning it. Consequently, iterator-to-iterator transformations allow Spark to manipulate partitions that are too large to fit in memory on a single executor without out-of-memory errors.
Furthermore, keeping the partition as an iterator allows Spark to use disk space more selectively. Rather than spilling an entire partition when it doesn’t fit in memory, the iterator-to-iterator transformation allows Spark to spill only those records that do not fit in memory, thereby saving disk I/O and the cost of recomputation. Lastly, using methods defined on iterators avoids defining intermediary data structures. Reducing the number of large intermediate data structures is a way to avoid unnecessary object creation, which can slow down garbage collection as we talked about in “Minimizing Object Creation”.
Unfortunately the Spark Streaming mapPartitions API is one of relatively few places where the Scala API decisively outperforms its Java counterpart. Prior to Spark 1.6, mapPartitions in Spark Streaming was defined on objects of type Java Iterable rather than Java Iterator and thus automatically read the entire collection into memory. In the Spark Core, the Java API still uses Iterable rather than iterators as the grouped result of groupByKey, thus eliminating the possibility of using an iterator-to-iterator transformation after a groupByKey call.
For all their advantages, iterators can be a much harder abstraction to conceptualize and use than collection types such as arrays and hash maps, with which users may be more familiar from other languages. Here we provide an example of a complicated mapPartitions routine, which, given a sorted RDD of ((value, columnIndex), count) tuples and a list of rank statistics on this partition, returns the (columnIndex, value) pairs that represent rank statistics, shown in Example 5-10.
This method is part of the optimal solution to the “Goldilocks problem,” which is presented in full in “Goldilocks Version 4: Reduce to Distinct on Each Partition” and introduced in “The Goldilocks Example”.
private def findTargetRanksIteratively(
  sortedAggregatedValueColumnPairs: RDD[((Double, Int), Long)],
  ranksLocations: Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] = {

  sortedAggregatedValueColumnPairs.mapPartitionsWithIndex((partitionIndex: Int,
    aggregatedValueColumnPairs: Iterator[((Double, Int), Long)]) => {

    val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
    if (targetsInThisPart.nonEmpty) {
      FindTargetsSubRoutine.asIteratorToIteratorTransformation(
        aggregatedValueColumnPairs,
        targetsInThisPart)
    } else {
      Iterator.empty
    }
  })
}
This routine is a good example of a place where we are likely to see performance gains from an iterator-to-iterator transformation, since it is a complicated routine performed on partitions that we anticipate will be too large to fit in memory. However, it is an instance where using iterators is, from a design perspective, a non-obvious choice because we have to keep a map of running totals with the number of elements for each column we have seen so far.
A more straightforward way to design this routine would be as follows: loop through the iterator, store the running totals in a hashMap, and build a new collection of the elements we want to keep using an array buffer, then convert the array buffer to an iterator, shown in Example 5-11.
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

def withArrayBuffer(valueColumnPairsIter: Iterator[((Double, Int), Long)],
  targetsInThisPart: List[(Int, Long)]): Iterator[(Int, Double)] = {

  val columnsRelativeIndex: Predef.Map[Int, List[Long]] =
    targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))

  // The column indices of the pairs that are desired rank statistics that live in
  // this partition.
  val columnsInThisPart: List[Int] = targetsInThisPart.map(_._1).distinct

  // A HashMap with the running totals of each column index. As we loop through
  // the iterator, we will update the hashmap as we see elements of each
  // column index.
  val runningTotals: mutable.HashMap[Int, Long] = new mutable.HashMap()
  runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap

  // We use an array buffer to build the resulting iterator.
  val result: ArrayBuffer[(Int, Double)] =
    new scala.collection.mutable.ArrayBuffer()

  valueColumnPairsIter.foreach {
    case ((value, colIndex), count) =>
      if (columnsInThisPart contains colIndex) {
        val total = runningTotals(colIndex)
        // The ranks that are contained by this element of the input iterator,
        // found by filtering the target ranks for this column.
        val ranksPresent = columnsRelativeIndex(colIndex)
          .filter(index => (index <= count + total) && (index > total))
        ranksPresent.foreach(r => result += ((colIndex, value)))
        // Update the running totals.
        runningTotals.update(colIndex, total + count)
      }
  }
  // Convert the buffer back to an iterator.
  result.toIterator
}
At first this looks like an okay solution since we are estimating that the number of elements we are returning is small, and because array buffers are usually a relatively performant way to build up Scala collections. However, if the input data is very large relative to the cluster size, we still see out-of-memory errors and failures in this step. A more efficient solution would be to use an iterator-to-iterator transformation.
We can convert this subroutine to an iterator-to-iterator transformation although our routine is not parallelizable (it requires keeping a list of running totals). We can do this because the subroutine we need can be completed on one element of the iterator without any information about the other elements. The final solution uses the filter function of iterators, to eliminate any elements that are not in the final data, and a flatMap to build the new iterator of elements in the resulting partitions, as shown in Example 5-12.
import scala.collection.mutable

def asIteratorToIteratorTransformation(
  valueColumnPairsIter: Iterator[((Double, Int), Long)],
  targetsInThisPart: List[(Int, Long)]): Iterator[(Int, Double)] = {

  val columnsRelativeIndex = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
  val columnsInThisPart = targetsInThisPart.map(_._1).distinct

  val runningTotals: mutable.HashMap[Int, Long] = new mutable.HashMap()
  runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap

  // Filter out the pairs that don't have a column index that is in this part.
  val pairsWithRanksInThisPart = valueColumnPairsIter.filter {
    case (((value, colIndex), count)) =>
      columnsInThisPart contains colIndex
  }

  // Map the valueColumn pairs to a list of (colIndex, value) pairs that correspond
  // to one of the desired rank statistics on this partition.
  pairsWithRanksInThisPart.flatMap {
    case (((value, colIndex), count)) =>
      val total = runningTotals(colIndex)
      val ranksPresent: List[Long] = columnsRelativeIndex(colIndex)
        .filter(index => (index <= count + total) && (index > total))

      val nextElems: Iterator[(Int, Double)] =
        ranksPresent.map(r => (colIndex, value)).toIterator

      // Update the running totals.
      runningTotals.update(colIndex, total + count)
      nextElems
  }
}
This approach allows the function to spill to disk selectively by working with each element in the iterator one at a time. This implementation saves space by incrementally building the result rather than storing the new collection type in memory as an array buffer. It saves a penny on garbage collection by not creating the array buffer as an intermediate step.
Spark has a variety of set-like operations, some of which are expensive and some of which have different behavior than the mathematical definitions of the equivalent operations. In this section we hope to explain how to use these operations safely and effectively.
Since RDDs can contain duplicates, Spark’s set-like operations differ from mathematical set operations mainly in how they handle duplicates. For example, union merely combines its arguments, so the size of the result of union will always be the sum of the sizes of both RDDs. intersection and subtract are defined similarly to their set-theoretic counterparts, but since the input RDDs (unlike mathematical sets) can have duplicates, the results may be unexpected. Subtracting will remove all of the elements in the first RDD whose value is present in the second RDD. Thus it is possible that, after subtracting, the result will be smaller than the size of the first RDD minus the size of the second, breaking one of the laws of set theory.
For example, the simple unit test in Example 5-13 will pass.
val a = Array(1, 2, 3, 4, 4, 4, 4)
val b = Array(3, 4)
val rddA = sc.parallelize(a)
val rddB = sc.parallelize(b)
val rddC = rddA.subtract(rddB)
assert(rddC.count() < rddA.count() - rddB.count())
In Spark, intersection co-groups the argument RDDs using their values as keys and filters out those elements that don’t appear in both. Consequently the result of RDD intersection contains no duplicates.
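The co-grouping idea can be sketched as follows. This is an illustration of the technique under stated assumptions, not a copy of Spark's source; the names IntersectionSketch and intersectionViaCogroup are hypothetical, and the Spark core library is assumed to be on the classpath.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object IntersectionSketch {
  // A sketch of the co-group idea; not necessarily identical to Spark's source.
  def intersectionViaCogroup[T: ClassTag](left: RDD[T], right: RDD[T]): RDD[T] = {
    left.map(v => (v, ()))
      .cogroup(right.map(v => (v, ())))
      // Keep a value only if it was seen at least once on both sides.
      .filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }
      // Each distinct value appears as a key exactly once, so duplicates vanish.
      .keys
  }
}
```

Because every distinct value becomes a single key in the co-grouped RDD, any duplicates in either input collapse into one output record.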
Although this is the expected behavior for intersection, using several set operations on RDDs containing duplicates can lead to unexpected behavior. Subtracting rddB from rddA in Example 5-13 yields an RDD containing two elements, 1 and 2. Thus, as the unit test in Example 5-14 demonstrates, we cannot always “re-create” rddA as the union of the intersection and the subtraction.¹
val a = Array(1, 2, 3, 4, 4, 4, 4)
val b = Array(3, 4)
val rddA = sc.parallelize(a)
val rddB = sc.parallelize(b)
val intersection = rddA.intersection(rddB)
val subtraction = rddA.subtract(rddB)
val union = intersection.union(subtraction)
assert(!rddA.collect().sorted.sameElements(union.collect().sorted))
Some operations require setup work per-worker or per-partition, like creating a database connection or setting up a random number generator.
For transformations you can use mapPartitions, do the setup work per partition in the map function, and then perform your desired transformation on the iterator for the partition. We will illustrate doing this with a pseudorandom number generator in Example 5-15.
rdd.mapPartitions { itr =>
  // Create only one RNG per partition
  val r = new Random()
  itr.filter(x => r.nextInt(10) == 0)
}
It is important to remember to use an iterator-to-iterator transformation to allow spilling to disk selectively, as discussed in “Iterator-to-Iterator Transformations with mapPartitions”.
Beyond using this pattern to reduce setup overhead in transformations, another common pattern is to create a connection inside of an action to save the data. If your work is writing out the data you can use the same pattern as with mapPartitions except with foreachPartition.
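A minimal sketch of the foreachPartition variant of this pattern follows. RecordSink is a hypothetical stand-in for a real client such as a database connection, and SavePerPartition is an illustrative name; neither comes from Spark.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for a real client (e.g., a database connection).
class RecordSink {
  private var written = 0L
  def write(record: String): Unit = { written += 1 }
  def close(): Long = written
}

object SavePerPartition {
  def save(rdd: RDD[String]): Unit = {
    rdd.foreachPartition { itr =>
      // Setup happens once per partition, on the executor.
      val sink = new RecordSink()
      try {
        // Stream the records; the partition is never materialized.
        itr.foreach(sink.write)
      } finally {
        sink.close()
      }
    }
  }
}
```

Creating the sink inside the closure means it never has to be serialized and shipped from the driver.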
If the setup work can be serialized, a broadcast variable, which we cover next, can distribute the object.
If the setup work can’t be serialized, a broadcast variable with a transient lazy val can be used as well.
See Example 5-17 in the next section.
Spark has two types of shared variables—broadcast variables and accumulators—each of which can only be written in one context (driver or worker, respectively) and read in the other. Broadcast variables can be written in the driver program and read on the executors, whereas accumulators are written onto the executors and read on the driver.
Broadcast variables give us a way to take a local value on the driver and distribute a read-only copy to each machine rather than shipping a new copy with each task. Broadcast variables might not seem especially useful, since we can just capture a local variable in our closure to transfer data from the driver to the workers; however, the savings of only sending one copy per machine versus sending one copy per task can make a huge difference, especially when the same broadcast variable is used in additional transformations. Two common examples of using broadcast variables are a) broadcasting a small table to join against and b) broadcasting a machine learning model to be able to run the predictions on our data.
Creating a broadcast variable is done by calling broadcast on the SparkContext.
This distributes the value to the workers and gives us back a wrapper that allows us to access the value on the workers by calling value, as shown in Examples 5-16 and 5-17.
If a broadcast variable is created with a variable input, the input should not be modified after the variable has been created since existing workers will not see the updates and new workers may see the new value.
val invalid = HashSet() ++ invalidPandas
val invalidBroadcast = sc.broadcast(invalid)
input.filter { panda => !invalidBroadcast.value.contains(panda.id) }
class LazyPrng {
  @transient lazy val r = new Random()
}

def customSampleBroadcast[T: ClassTag](sc: SparkContext, rdd: RDD[T]): RDD[T] = {
  val bcastprng = sc.broadcast(new LazyPrng())
  rdd.filter(x => bcastprng.value.r.nextInt(10) == 0)
}
The value for a broadcast variable must be a local, serializable value: no RDDs or other distributed data structures.
Internally, Spark uses broadcast variables for the Hadoop job configuration objects and large blocks of Python code for UDFs.
If a broadcast variable is no longer needed, you can explicitly remove it by calling unpersist() on the broadcast variable.
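As a hedged sketch of the first common use mentioned earlier (joining against a small table), the following broadcasts a local Map, looks each key up on the executors, and then calls unpersist(). The names BroadcastJoinSketch and joinWithSmallTable and the column types are assumptions made for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BroadcastJoinSketch {
  // Joins a large RDD against a small, local lookup table without a shuffle.
  def joinWithSmallTable(
      sc: SparkContext,
      large: RDD[(Int, String)],
      small: Map[Int, Double]): Array[(Int, (String, Double))] = {
    val smallBroadcast = sc.broadcast(small)
    val joined = large.flatMap { case (key, value) =>
      // Inner-join semantics: keys missing from the small table are dropped.
      smallBroadcast.value.get(key).map(other => (key, (value, other)))
    }
    val result = joined.collect()
    // Release the executors' copies once no further job needs the value.
    smallBroadcast.unpersist()
    result
  }
}
```

The call to unpersist() is placed after the action because the broadcast value is only read when the job actually runs.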
Accumulators are the second type of Spark’s shared variables, allowing us to collect by-product information from a transformation or action on the workers and then bring the result back to the driver. With Spark’s execution model, Spark adds to accumulators only once the computation has been triggered (e.g., by an action). If the computation happens multiple times, Spark will update the accumulator each time. This multiple counting can be desirable for process-level information, like computing the entire time spent parsing records. However, it can be disastrous for data-related information like counting the number of invalid records.
Spark accumulators have had an API update for 2.0—these examples are updated for the 2.X API, although 1.X examples are still available in the examples repo.
Accumulators can be unpredictable. In their current state, they are best used where potential multiple counting is the desired behavior.
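The multiple counting described above can be sketched with one of the built-in numeric accumulators. The object and method names are hypothetical, and the counts noted in the comments assume no task retries or speculative execution.

```scala
import org.apache.spark.SparkContext

object DoubleCountingSketch {
  // Returns the accumulator's sum after the first and after the second action.
  def countTwice(sc: SparkContext): (Long, Long) = {
    val acc = sc.longAccumulator("recordsSeen")
    val mapped = sc.parallelize(1 to 100).map { x => acc.add(1L); x }
    mapped.count() // first evaluation of the map: 100 additions
    val afterFirst = acc.sum
    mapped.count() // not persisted, so the map runs again: 100 more
    (afterFirst, acc.sum)
  }
}
```

Although the RDD has 100 records, the accumulator ends at twice that number because each action re-evaluates the map.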
Accumulators have a number of built-in types that make it easy to create an accumulator for common use cases. Accumulators are not intended for collecting large amounts of information, so if you find yourself adding a large number of elements to a collection or appending to a string you may wish to consider a separate action instead of an accumulator. The default operation for numeric accumulators is the + operation, so we could use this to sum the fuzzyness of all of the pandas as shown in Example 5-18.
def computeTotalFuzzyNess(sc: SparkContext, rdd: RDD[RawPanda]):
    (RDD[(String, Long)], Double) = {
  // Create a named accumulator for doubles
  val acc = sc.doubleAccumulator("fuzzyNess")
  val transformed = rdd.map { x => acc.add(x.attributes(0)); (x.zip, x.id) }
  // accumulator still has zero value
  // Note: This example is dangerous since the transformation may be
  // evaluated multiple times.
  transformed.count() // force evaluation
  (transformed, acc.value)
}
Additionally, accumulators support a wide variety of data types provided the operation is associative, but some are easier to get in trouble with than others. To use an accumulator of a different type, you need to implement the AccumulatorV2[InputType, ValueType] interface and provide reset, copy, isZero, value, merge, and add methods.
You are responsible for the specifics of the class that keeps track of the accumulated values. In general, a simple var or two will do the trick.
In addition to the required methods, override copyAndReset to improve performance in certain cases.
Generally the reset and copy methods are used together through the copyAndReset method, which can often be more efficiently implemented to avoid the copy stage (as is done in both of the custom accumulator examples, Examples 5-19 and 5-20).
The reset method resets the value of the current accumulator back to “zero” so that isZero, if called, will return true.
The copy method needs to create a copy of the provided accumulator, with the new accumulator having the same value as the current accumulator.
This is called when copying the value to the workers so that Spark can avoid the expense (and confusion) of copying any of the previously accumulated work to the drivers.
The type parameters of the AccumulatorV2 interface specify the type being accumulated over (add) and the final return type (value).
Importantly, this does not constrain or specify the type used to hold the accumulation itself.
A single variable is used to keep track of values in the following examples. However, you need not limit yourself to one variable. Inside of many of Spark’s numeric accumulators, two vars are used.
The type signature of the accumulator API’s merge method takes the same base AccumulatorV2 type.
Since the AccumulatorV2 trait doesn’t specify anything about how workers should keep track of the values as they are evaluated, you will need to cast the accumulator you receive to the expected type so you can access your own internal accumulation field(s).
A basic implementation of this is shown in Example 5-19.
def computeMaxFuzzyNess(sc: SparkContext, rdd: RDD[RawPanda]):
    (RDD[(String, Long)], Option[Double]) = {
  class MaxDoubleAccumulator extends AccumulatorV2[Double, Option[Double]] {
    // Here is the var we will accumulate our value in to.
    var currentVal: Option[Double] = None
    override def isZero = currentVal.isEmpty

    // Reset the current accumulator to zero - used when sending over the wire
    // to the workers.
    override def reset() = {
      currentVal = None
    }

    // Copy the current accumulator - this is only really used in context of
    // copy and reset - but since it's part of the public API let's be safe.
    def copy() = {
      val newCopy = new MaxDoubleAccumulator()
      newCopy.currentVal = currentVal
      newCopy
    }

    // We override copy and reset for "speed" - no need to copy the value if
    // we are going to zero it right away. This doesn't make much difference
    // for Option[Double] but for something like Array[X] could be huge.
    override def copyAndReset() = {
      new MaxDoubleAccumulator()
    }

    // Add a new value (called on the worker side)
    override def add(value: Double) = {
      currentVal = Some(
        // If the value is present compare it to the new value - otherwise
        // just store the new value as the current max.
        currentVal.map(acc => Math.max(acc, value)).getOrElse(value))
    }

    override def merge(other: AccumulatorV2[Double, Option[Double]]) = {
      other match {
        case otherFuzzy: MaxDoubleAccumulator =>
          // If the other accumulator has the option set merge it in with
          // the standard add procedure. If the other accumulator isn't set
          // do nothing.
          otherFuzzy.currentVal.foreach(value => add(value))
        case _ =>
          // This should never happen, Spark will only call merge with
          // the correct type - but that won't stop someone else from calling
          // merge so throw an exception just in case.
          throw new Exception("Unexpected merge with unsupported type " + other)
      }
    }

    // Return the accumulated value.
    override def value = currentVal
  }

  // Create a new custom accumulator
  val acc = new MaxDoubleAccumulator()
  sc.register(acc)
  val transformed = rdd.map { x => acc.add(x.attributes(0)); (x.zip, x.id) }
  // accumulator still has None value.
  // Note: This example is dangerous since the transformation may be
  // evaluated multiple times.
  transformed.count() // force evaluation
  (transformed, acc.value)
}
This still requires that the result is the same as the type we are accumulating. If we wanted to collect all of the distinct elements, we would likely want to collect a set and the types would be different. This is shown in Example 5-20.
def uniquePandas(sc: SparkContext, rdd: RDD[RawPanda]): HashSet[Long] = {
  class UniqParam extends AccumulatorV2[Long, HashSet[Long]] {
    var accValue: HashSet[Long] = new HashSet[Long]()

    def value = accValue

    override def copy() = {
      val newCopy = new UniqParam()
      newCopy.accValue = accValue.clone
      newCopy
    }

    override def reset() = {
      this.accValue = new HashSet[Long]()
    }

    override def isZero() = {
      accValue.isEmpty
    }

    // We override copy and reset for speed - no need to copy the value if
    // we are going to zero it right away.
    override def copyAndReset() = {
      new UniqParam()
    }

    // For adding new values
    override def add(value: Long) = {
      accValue += value
    }

    // For merging accumulators
    override def merge(other: AccumulatorV2[Long, HashSet[Long]]) = {
      other match {
        case otherUniq: UniqParam =>
          accValue = accValue ++ otherUniq.accValue
        case _ =>
          throw new Exception("only support merging with same type")
      }
    }
  }

  // Create an accumulator for keeping track of unique values
  val acc = new UniqParam()
  // Register with a name
  sc.register(acc, "Unique values")
  val transformed = rdd.map { x => acc.add(x.id); (x.zip, x.id) }
  // accumulator still has an empty set
  transformed.count() // force evaluation
  acc.value
}
The value function can perform complex work and return a different type than the input type or internal accumulated value. For example, if you were computing the average, you might have a value function that divides two longs returning a double.
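A hedged sketch of such an averaging accumulator is shown here. The class name AverageAccumulator and the choice to return NaN for an empty accumulator are assumptions made for illustration; only the AccumulatorV2 methods named earlier are relied on.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator: takes Long inputs and reports their mean as a Double.
class AverageAccumulator extends AccumulatorV2[Long, Double] {
  // Two vars hold the accumulation; neither has the input or the output type's role.
  private var total = 0L
  private var count = 0L

  override def isZero: Boolean = total == 0L && count == 0L
  override def reset(): Unit = { total = 0L; count = 0L }
  override def copy(): AverageAccumulator = {
    val newCopy = new AverageAccumulator()
    newCopy.total = total
    newCopy.count = count
    newCopy
  }
  override def add(v: Long): Unit = { total += v; count += 1 }
  override def merge(other: AccumulatorV2[Long, Double]): Unit = other match {
    case o: AverageAccumulator =>
      total += o.total
      count += o.count
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }
  // value does the division; NaN signals that nothing was accumulated.
  override def value: Double = if (count == 0L) Double.NaN else total.toDouble / count
}
```

As with the other custom accumulators, an instance would still need to be registered with the SparkContext before being used inside a transformation.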
You may provide a name for accumulators in Scala so they show up in the web UI. Simply add a name as the second param. This does involve calling toString on the accumulator, though, so if that is an expensive operation, leave your accumulator unnamed.
When working with cached data our accumulators can seem almost consistent, but as discussed in “Interaction with Accumulators” this is not the case.
There is a proposal to add data property (or “consistent”) accumulators in Spark 2.1.² Property accumulators would avoid double counting, but this remains unmerged. You can follow its progress in this pull request.
Internally, beginning in Spark 2.0, Spark uses accumulators to keep track of task metrics.
Spark offers several options for RDD reuse, including persisting, caching, and checkpointing. However, Spark does not perform any of these automatically³ because storing an RDD for reuse breaks some pipelining, which can be a waste if the RDD is only used once or if the transformation is inexpensive to recompute. All kinds of persistence (of which caching is one type) and checkpointing have some cost and are unlikely to improve performance for operations that are performed only once. Furthermore, on large datasets the cost of persisting or checkpointing can be so high that recomputing is more desirable. However, for some specific kinds of Spark programs, reusing an RDD can lead to huge performance gains, both in terms of speed and reducing failures.
In this section we cover some instances when persisting or checkpointing RDDs may foster performance gains. Broadly speaking, the most important cases for reuse are using an RDD many times; performing multiple actions on the same RDD; and for long chains of (or very expensive) transformations.
For transformations that use the same parent RDD multiple times, reusing an RDD forces evaluation of that RDD and so can help avoid repeated computations. For example, if you were performing a loop of joins to the same dataset, persisting that dataset could lead to huge performance improvements since it ensures that the partitions of that RDD will be available in-memory to do each join.
In Example 5-21 we are computing the root mean squared error (RMSE) on a number of different RDDs representing predictions from different models. To do this we have to join each RDD of predictions to an RDD of the data in the validation set.
In this example we use persist(), which persists the RDD in memory. As we will explain in “Types of Reuse: Cache, Persist, Checkpoint, Shuffle Files”, cache() is equivalent to persist(), which is equivalent to persist(StorageLevel.MEMORY_ONLY).
val testSet: Array[RDD[(Double, Int)]] =
  Array(
    validationSet.mapValues(_ + 1),
    validationSet.mapValues(_ + 2),
    validationSet)
validationSet.persist() //persist since we are using this RDD several times
val errors = testSet.map(rdd => {
  rmse(rdd.join(validationSet).values)
})
Without persisting, Spark would have to reload and repartition the validation dataset RDD to complete each join. However, with persistence, the validation RDD will stay loaded in memory on the executors with each run of the algorithm. We discuss performance considerations with different kinds of joins in detail in “Core Spark Joins”.
Checkpointing, another form of RDD reuse that writes an RDD to external storage, will also break the RDD’s lineage. However, checkpointing will not keep the partitions loaded on the executors.
If you do not reuse an RDD, each action called on an RDD will launch its own Spark job with the full lineage of RDD transformations.
Persisting and checkpointing break the RDD’s lineage, so the same series of transformations preceding the persist or checkpoint call will be executed only once.
Because persisting or checkpointing an RDD lasts for the duration of a Spark application (although it may be evicted by subsequent cached/persisted data), an RDD persisted during one Spark job will be available in a subsequent job executed with the same SparkContext.
For example, suppose that we wanted to collect the first 10% of the records in an RDD.
We could use the code in Example 5-22, which calls sortByKey, then count, then take.
val sorted = rddA.sortByKey()
val count = sorted.count() // sorted Action 1
val sample: Long = count / 10
val sampled = sorted.take(sample.toInt) // sorted Action 2
The sortByKey (and presumably the read operation) needed to create the RDD, sorted, will occur twice if we do not store the RDD: once in the job called by count and again in the job called by take. We can’t test this element of the execution programmatically, but if you were to run this application and view the web UI you would see that this code launches two jobs and each one includes a sort stage.
However, if we add a persist or checkpoint call before the actions (as shown in Example 5-23), the transformation will only be executed once, since Spark builds a lineage graph from either an RDD’s creation or a persisted/checkpointed RDD.
val sorted = rddA.sortByKey()
sorted.persist()
val count = sorted.count() // sorted Action 1
val sample: Long = count / 10
val sampled = sorted.take(sample.toInt) // sorted Action 2
Persisted RDDs only survive for the duration of a Spark application. To reuse data between Spark applications, use checkpointing with the same directory.
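For comparison with the persist call, a minimal sketch of checkpointing an RDD within a single application might look like the following. The object and method names and the checkpointDir parameter are hypothetical; on a real cluster the directory would normally be on reliable storage such as HDFS.

```scala
import org.apache.spark.SparkContext

object CheckpointSketch {
  // Returns whether the sorted RDD reports itself as checkpointed after an action.
  def sortedWithCheckpoint(sc: SparkContext, checkpointDir: String): Boolean = {
    // Tell Spark where checkpoint files should be written.
    sc.setCheckpointDir(checkpointDir)
    val sorted = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b")).sortByKey()
    // checkpoint() only marks the RDD; it must be called before the first action on it.
    sorted.checkpoint()
    // The action triggers the job, after which the checkpoint data is written.
    sorted.count()
    sorted.isCheckpointed
  }
}
```

Unlike persist, the checkpoint call requires a checkpoint directory to be set on the SparkContext first.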
Even if a program does not use the same RDD multiple times, persisting and checkpointing can speed up a routine and reduce the cost of failures by storing intermediary results. Persisting or checkpointing can be particularly useful if the cost of computing one partition is very high because they ensure that the entire expensive operation will not need to be recomputed in the case of downstream failures.
For example, if your program requires a long series of one-to-one transformations, those transformations will all be combined into very computationally intensive tasks. While this is good so long as the tasks succeed and fit in memory, it does mean that if one of the downstream transformations fails, then the cost to recompute a single partition may be enormous. If all of the narrow transformations together create more GC overhead or memory strain than your cluster’s executors can handle, then checkpointing or persisting off_heap can be particularly useful. Both persisting off_heap and checkpointing allow the RDD to be stored outside of the Spark executor memory, leaving space to compute. These options are also the only way to prevent recomputation if the entire Spark worker fails. Sometimes breaking up a long lineage graph for its own sake can help a job succeed since it means each of the tasks will be smaller.
Narrow transformations are generally faster than wide ones. However, some individual narrow transformations, such as training a model per partition or working with very wide rows, can be expensive. In these cases, reusing an RDD after the expensive computation so it is not recomputed may improve performance.
Although persisting in memory is a flagship feature of Spark, it is not free. It is space intensive to store data in memory and will take time to serialize and deserialize. As we will discuss in “Dividing the Space Within One Executor”, persisting in memory and in-memory computations are both done in the Spark executor JVM. Thus, persisting in memory may take space that could be used for downstream computations or increase the risk of memory failures. Caching with Java-based memory structures (any of Spark’s options besides using off_heap storage options) will incur a much higher garbage collecting cost than will recomputing.
Persisting to disk or checkpointing (writing the RDD to an external filesystem) has the disadvantages of MapReduce, causing expensive write and read operations. If the RDD is checkpointed or persisted to disk we must factor in not only the disk space used on the cluster to write the RDD, but also the computational cost on the Spark executors of the additional disk I/O. In most cases, checkpointing a large RDD can be used to reduce failures in high-traffic clusters, but because checkpointing is so expensive, it rarely leads to performance improvements, even when the alternative is recomputing the RDD.
Our experience has been that it is easy to underestimate just how expensive storing and reading an RDD is relative to recomputing. We have also found that for relatively simple operations the cost of the read operation needed to load the RDD far outweighs the others, so persisting is most useful when it prevents triggering another read operation or in the case of many iterative computations.
Furthermore, breaking an RDD’s lineage by forcing evaluation through persisting or checkpointing prevents transformations with narrow dependencies from being combined into a single task. Consequently, we lose some of the benefit of pipelining: the narrow transformations can no longer be combined and executed in one task.
For instance, persisting or checkpointing between a simple map and filter step will break pipelining so that the previously intermediate data can be persisted, causing Spark to do two passes through the data rather than just one, since the transformation has to be evaluated in order to materialize the RDD after the map. Breaking lineage between narrow transformations is only desirable in the most extreme cases.
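As a rough illustration of the map and filter case, the sketch below contrasts the two shapes. The object name, method names, and the arithmetic inside the lambdas are invented for this illustration. Both methods return the same records; the only difference is that the second one materializes and stores the mapped partitions before the filter reads them.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, used only for this illustration.
object PipeliningSketch {

  // map and filter are both narrow, so each record flows through
  // both functions in a single pass over the partition.
  def pipelined(input: RDD[Int]): RDD[Int] =
    input.map(_ * 2).filter(_ % 3 == 0)

  // Marking the mapped RDD for persistence means its partitions are
  // materialized and stored the first time they are computed, and the
  // filter then reads the stored data.
  def persistedBetween(input: RDD[Int]): RDD[Int] = {
    val mapped = input.map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
    mapped.filter(_ % 3 == 0)
  }
}
```

Unless the mapped RDD is reused elsewhere, the first form is usually the one to prefer.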
The preceding guidelines are good heuristics for when reuse will provide significant benefits. In general, it is worth reusing an RDD rather than recomputing it if the computation is large relative to your cluster and the rest of your job. The best way to tell if you need to reuse your RDDs is to run a job. If your job runs very slowly, see if persisting the RDDs may help before attempting to rewrite the program, since persisting and checkpointing will help reduce the cost of recomputing data in the case of a failure or eliminate it altogether. If a job is failing with GC or out-of-memory errors, checkpointing or persisting off_heap may allow the job to complete, particularly if the cluster is noisy. On the other hand, if you were already persisting with the options that use in-memory persistence, consider removing the persist call or switching to checkpointing or off_heap persistence.
If you are testing some code before putting it into production, consider creating the persistence level with a variable so that you can pass in a persistence level to try as a command-line argument. The function presented in Example 5-24 uses this paradigm; it contains a storageLevel argument (which could be NONE).
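One way to wire a storage level through from the command line is sketched below. The object name and the choice to read the first argument are assumptions made for this illustration; `StorageLevel.fromString` is the Spark helper that maps a name such as `MEMORY_AND_DISK` to the corresponding constant and throws an `IllegalArgumentException` for unknown names.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, used only for this illustration.
object StorageLevelArg {

  // Reads the storage level name from the first command-line argument.
  // Falls back to NONE (no persistence) when no argument is given.
  def parse(args: Array[String]): StorageLevel =
    args.headOption
      .map(name => StorageLevel.fromString(name.trim.toUpperCase))
      .getOrElse(StorageLevel.NONE)
}
```

The parsed value can then be passed as the storageLevel argument of a function such as the one in Example 5-24.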
If you decide that you need to reuse your RDD, Spark provides a multitude of options for how to store the RDD. Thus it is important to understand when to use the various types of persistence. There are three primary operations that you can use to store your RDD: cache, persist, and checkpoint. In general, caching (equivalent to persisting with the in-memory storage) and persisting are most useful to avoid recomputation during one Spark job or to break RDDs with long lineages, since they keep an RDD on the executors during a Spark job. Checkpointing is most useful to prevent failures and a high cost of recomputation by saving intermediate results. Like persisting, checkpointing helps avoid computation, thus minimizing the cost of failure, and avoids recomputation by breaking the lineage graph.
Persisting an RDD means materializing an RDD (usually by storing it in-memory on the executors), for reuse during the current job. Spark remembers a persisted RDD’s lineage so that it can recompute it for the duration of a Spark job if one of the persisted partitions is lost. After the job ends, the persisted data is no longer available. The persist function takes a StorageLevel argument that specifies how the RDD should be stored. Spark provides a number of different storage levels as constants, but each one is created based on five attributes of how to store the RDD: useDisk, useMemory, useOffHeap, deserialized, and replication. Calling toString on a storage level will reveal what options it contains.
The Spark documentation about persistence includes a fairly comprehensive list of the out-of-the-box storage options that are exposed to you.
Still, we think it may be useful to provide some more information about each of the five properties that compose each storage option. This should give you a deeper understanding of which option to choose:
useDisk
If set, partitions that do not fit in memory will be written to disk.
The storage-level flags containing DISK (such as MEMORY_AND_DISK) enable this. By default, if partitions do not fit in memory, they will simply be evicted and will need to be recomputed when the persisted RDD is used (see “LRU Caching”). Therefore, persisting to disk can ensure that recomputation of those additional large partitions is avoided. However, reading from disk can be time-intensive, so persistence to disk is only important if the cost of recomputation is particularly high.
It may be beneficial to allow writing to disk if you expect that an RDD cannot fit in memory. However, if the cost of recomputing the partitions is not high (they are simple mappings and don’t reduce the size of the data) it may actually be faster to recompute some partitions rather than read from disk.
useMemory
If set, the RDD will be stored in memory; if not set, it will be written directly to disk.
The DISK_ONLY storage levels are the only options that mark this as false. Most of the speed benefits of caching come from keeping RDDs in memory, so if the motivation for reuse is fast access for repeated computations, it is probably a good idea to choose a storage option that stores partitions in memory. However, there are some cases where disk-only persistence makes sense, e.g., when the computation is more expensive than reading from local disk or the network filesystem is especially slow (such as with certain object stores).
useOffHeap
If set, the RDD will be stored outside of the Spark executor in an external system such as Tachyon.
The storage option off_heap enables this property. If memory is a serious issue, or a cluster is noisy and partitions are evicted, this option may be compelling. We will talk more about the benefits of Tachyon in “Alluxio (nee Tachyon)”.
deserialized
If set, the RDD will be stored as deserialized Java objects.
As we will discuss in “Kryo”, storing the RDD in serialized form instead (that is, leaving this property unset) can make storing RDDs more space efficient, especially when using a faster serializer, but incurs some performance overhead. Storage options that include the "_SER" suffix, such as MEMORY_ONLY_SER, enable serialization.
If your RDD is too large to persist in-memory, first try to serialize it with the MEMORY_ONLY_SER option. This will keep the RDD fast to access, but will decrease the memory needed to store it.
replication
Replication is an integer that controls the number of copies of the persisted data to be stored in the cluster.
By default this is set to 1; however, storage options that end in _2, such as DISK_ONLY_2, replicate each partition across two nodes. Use this option to ensure faster recovery from failures. However, be aware that persistence with replication incurs double the space and speed costs of persistence without replication. Replication is usually only necessary in an instance of a noisy cluster or bad connection where failures are unusually likely. It might also be useful if you do not have time to recompute in case of failure, such as when serving a live web application.
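The five properties described above can be read directly off any storage level. The sketch below prints them for a given level; the object and method names are invented for this illustration, while the fields themselves are the accessors on Spark's `StorageLevel` class.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, used only for this illustration.
object StorageLevelFlags {

  // Renders the five attributes that make up a storage level.
  def describe(level: StorageLevel): String =
    s"useDisk=${level.useDisk}, " +
      s"useMemory=${level.useMemory}, " +
      s"useOffHeap=${level.useOffHeap}, " +
      s"deserialized=${level.deserialized}, " +
      s"replication=${level.replication}"
}
```

For example, calling `StorageLevelFlags.describe(StorageLevel.MEMORY_AND_DISK_SER_2)` shows a level that uses both disk and memory, stores serialized data, and keeps two replicas.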
The RDD operation cache() is equivalent to the persist operation with no storage level argument, i.e., persist(). Both cache() and persist() persist the RDD with the default storage level MEMORY_ONLY, which is equivalent to StorageLevel(false, true, false, true). This level stores RDDs in-memory as deserialized Java objects, does not write to disk as partitions get evicted, and doesn’t replicate partitions.
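The equivalence of cache() and the no-argument persist() can be checked with `getStorageLevel`, as in the following sketch. The object and method names are assumptions for illustration, and the tiny parallelized range stands in for real data.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, used only for this illustration.
object CacheSketch {

  // Returns the storage levels assigned by cache() and by persist()
  // with no argument; both are expected to be MEMORY_ONLY.
  def defaultLevels(sc: SparkContext): (StorageLevel, StorageLevel) = {
    val cached = sc.parallelize(1 to 10).cache()
    val persisted = sc.parallelize(1 to 10).persist()
    (cached.getStorageLevel, persisted.getStorageLevel)
  }
}
```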
Checkpointing writes the RDD to an external storage system such as HDFS or S3, and—in contrast to persisting—forgets the RDD’s lineage. Since checkpointing requires writing the RDD outside of Spark, checkpointed information survives beyond the duration of a single Spark application and forces evaluation of an RDD. Checkpointing takes up more space in external storage and may be slower than persisting since it requires potentially costly write operations. However, it does not use any Spark memory and will not incur recomputation if a Spark worker fails.
Figure 5-2 illustrates the difference between in-memory persistence and checkpointing an RDD. Persisting stores the RDD’s partitions in-memory or on disk in the caching layer of each executor. Checkpointing writes each partition to some external system.
It is best to use checkpointing when the cost of failure and recomputation is of more concern than additional space in external storage. Broadly speaking, we advise persisting when jobs are slow and checkpointing when they are failing. If a Spark job is failing due to out-of-memory errors, checkpointing will reduce the cost and likelihood of failure without using up memory on the executors. If your jobs are failing due to network errors or preemption on a noisy cluster, checkpointing can reduce the likelihood of failure by breaking up a long-running job into smaller segments. To call checkpoint, call setCheckpointDir(directory: String) from the SparkContext object and pass in a path to a location on HDFS to write the intermediate results. Then, in the Spark job, call .checkpoint() from the RDD.
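The two calls fit together roughly as in the sketch below. The object name, the method name, and the small parallelized range are placeholders for this illustration; on a real cluster the directory would typically be an HDFS path rather than a local one.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper object, used only for this illustration.
object CheckpointSketch {

  // Checkpoints a small RDD under `dir` and reports whether the
  // checkpoint has been written once an action has run.
  def checkpointAndCheck(sc: SparkContext, dir: String): Boolean = {
    sc.setCheckpointDir(dir)
    val doubled = sc.parallelize(1 to 100).map(_ * 2)
    // checkpoint() only marks the RDD; it should be called before
    // any action is run on that RDD.
    doubled.checkpoint()
    // The first action computes the RDD and writes the checkpoint.
    doubled.count()
    doubled.isCheckpointed
  }
}
```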
Example 5-24 makes use of custom storage level and checkpointing options. The function is used in the Goldilocks example, which we describe in detail in “Goldilocks Version 4: Reduce to Distinct on Each Partition”. In this case we are doing several very expensive transformations: first a sort and then two very substantial map partitions routines. When running on a noisy cluster, we found it advantageous to checkpoint this function after the sort. The value of the directory parameter is the checkpoint directory. The sorted value is a sorted RDD of key/value pairs.
def findQuantilesWithCustomStorage(valPairs: RDD[((Double, Int), Long)],
  colIndexList: List[Int],
  targetRanks: List[Long],
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
  checkPoint: Boolean,
  directory: String = ""): Map[Int, Iterable[Double]] = {

  val n = colIndexList.last + 1
  val sorted = valPairs.sortByKey()
  if (storageLevel != StorageLevel.NONE) {
    sorted.persist(storageLevel)
  }

  if (checkPoint) {
    sorted.sparkContext.setCheckpointDir(directory)
    sorted.checkpoint()
  }

  val partitionColumnsFreq = getColumnsFreqPerPartition(sorted, n)
  val ranksLocations = getRanksLocationsWithinEachPart(
    targetRanks, partitionColumnsFreq, n)
  val targetRanksValues = findTargetRanksIteratively(sorted, ranksLocations)
  targetRanksValues.groupByKey().collectAsMap()
}
Tachyon is a distributed, in-memory storage system that is developed separately from Spark. It sits above a storage system, such as S3 or HDFS, and can be used on its own or with an external computational framework such as Spark or MapReduce. Like Spark, Tachyon can be used in a standalone cluster mode, or with Mesos or YARN. Read more about Tachyon’s architecture and how to integrate it with Spark in the Tachyon documentation.
Tachyon can be used as an input or output source for Spark applications (data stored with Tachyon can be used to create RDDs) or for off_heap persistence during a Spark application. Using Tachyon for persistence has several advantages. First, it reduces garbage collection overhead, since data is not stored as Java objects. Second, it allows multiple executors to share the same external memory pool in Tachyon. Third, since the data is stored in memory outside of Spark, it is not lost if individual executors crash. It can be particularly useful if you want to reuse an RDD but are running out of memory or seeing garbage collection errors. It is also the best way to reuse a very large RDD between multiple applications.
Tachyon’s developer and user communities are very strong in China, so part of its documentation may be stronger in Mandarin than in English.
RDDs that are stored in memory and/or on disk in Spark are not automatically un-persisted when they are no longer going to be used downstream. Instead, RDDs stay in memory for the duration of a Spark application, until the driver program calls the function unpersist, or memory/storage pressure causes their eviction. Spark uses Least Recently Used (LRU) caching to determine which partitions to evict if the executors begin to run out of memory.
LRU caching dictates that the data structure that was least recently accessed will be evicted. However, because of lazy evaluation it may be a bit tricky to predict which partitions will be evicted first. Generally, Spark evicts the oldest partitions: those that were created or used in the earliest Spark job or in the earliest stage within a given job. (See “Dividing the Space Within One Executor” for a more detailed explanation of memory management and partitions.) LRU caching behaves differently for different persistence options. For memory-only persistence operations configured with LRU caching, Spark will recompute the evicted partition each time it is needed. For memory and disk options, LRU caching will write the evicted partition to disk. If you want to take a persisted RDD out of memory to free up space, use unpersist.
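To make the eviction order concrete, here is a toy LRU cache in plain Scala. It is not Spark's block manager, only a minimal model of the policy described above; the class name, the fixed entry-count capacity, and the method names are assumptions for this illustration (Spark evicts based on memory use, not a count of entries).

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Toy model of LRU eviction; hypothetical, not Spark's implementation.
class LruCache[K, V](capacity: Int) {

  // accessOrder = true keeps entries ordered from least recently
  // accessed to most recently accessed.
  private val entries = new JLinkedHashMap[K, V](16, 0.75f, true) {
    // Called after each insertion; drops the least recently
    // accessed entry once the cache is over capacity.
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > capacity
  }

  def put(key: K, value: V): Unit = { entries.put(key, value); () }

  // Reading an entry counts as an access and protects it from
  // being the next one evicted.
  def get(key: K): Option[V] = Option(entries.get(key))

  // Membership checks do not count as an access.
  def contains(key: K): Boolean = entries.containsKey(key)
}
```

With a capacity of two, storing partitions 0 and 1, reading partition 0, and then storing partition 2 evicts partition 1, because it is the entry that was accessed least recently.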
Regardless of a persist or checkpoint call, Spark does write some data to disk during a shuffle. These files are called “shuffle files” and they usually contain all of the records in each input partition sorted by mapper. Usually shuffle files remain in the local directory on the workers for the duration of an application. Thus if the driver program reuses an RDD that has already been shuffled, Spark may be able to avoid recomputing that RDD up to the point of shuffle by using the shuffle files on the mapper.
Unlike the other caches, we can’t determine if a given RDD still has its shuffle files present; e.g., there is no equivalent of the isCheckpointed command, which returns true if that RDD has been checkpointed. In general, though, shuffle files aren’t explicitly cleaned up until an RDD goes out of scope. However, the web UI can be helpful in determining if stages are being skipped this way, as shown in Figure 5-3.
The performance of reusing shuffle files is similar to the performance of an RDD that is cached at the level of disk only.
Shuffle files can be large, and Spark has no explicit cache management for them. Keeping references to RDDs depending on shuffled output can lead to out-of-disk errors if the RDDs are not garbage collected on the driver.
Out-of-disk-space errors can be unexpected, but in clusters with small amounts of disk space they are surprisingly common. Disk space errors can be caused by long-running shell environments in which RDDs created at the top scope are never garbage collected. Spark writes the output of its shuffle operations to files on the disk of the workers in the Spark local dir. These files are only cleaned up when an RDD is garbage collected, which if the amount of memory assigned to the driver program is large, can occur infrequently. One solution is to explicitly trigger garbage collection (assuming the RDDs have gone out of scope)—if the DAG is getting too long, checkpointing can help make the RDDs available for garbage collection.
Noisy clusters, or those with a high volume of unpredictable traffic, pose a fundamental challenge to Spark’s evaluation. By default, Spark doesn’t save most intermediate results (besides in a shuffle step). Thus, in the case of preemptions, Spark will have to recompute the calculation in the job up to the point of failure. In a noisy cluster, where long-running jobs are often interrupted, this poses a huge challenge. Checkpointing can be especially helpful to get jobs to run at all. Checkpointing breaks an RDD’s lineage, therefore reducing the cost to recompute downstream transformations. Checkpointing also persists to external storage, so that unexpected failures do not lead to data loss. If failures are common but not fatal, it may be worth configuring your job to persist to multiple machines using a storage option like MEMORY_AND_DISK_2, which replicates data on two machines. That way, failures on one node will not require a recompute. This can be especially important with wide transformations, which are very expensive.
By default, Spark uses a first in, first out (FIFO) paradigm to queue jobs within a system. This means that the first job submitted will run in its entirety, getting priority on all the available resources. However, if a job doesn’t need the whole cluster, the next job may start. FIFO scheduling can be useful to ensure that space-intensive jobs are able to use the resources that they need. However, if you launch a job a few seconds behind a many-hour process, the FIFO strategy can be frustrating. Spark offers a fair scheduler, modeled after the Hadoop fair scheduler, to allow high-traffic clusters to share resources more evenly. The fair scheduler allocates the tasks from different jobs to the executors in a “round-robin fashion” (i.e., parceling out a few tasks to the executors from each job). With the fair scheduler, a short, small job can be launched before an earlier long-running job is completed.
The fair scheduler also supports putting jobs into pools and allocating different priority (weight) to those pools. Jobs within a pool are allocated an equal share of resources, and the pools are allocated resources according to their weight. Using pools can be a good way to ensure that high-priority jobs or very expensive jobs are completed. The fair scheduler also ensures that users are allocated resources evenly regardless of how many jobs they submit. You can read more about using and configuring a fair scheduler in the Spark job scheduling documentation.
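As a minimal sketch of how this looks in application code, the snippet below turns on fair scheduling and runs a block of work in a named pool. The object name, the helper names, and any pool name passed in are hypothetical; `spark.scheduler.mode` and the `spark.scheduler.pool` local property are the settings described in the Spark job scheduling documentation. Pool weights themselves are configured separately and are not shown here.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for this illustration.
object FairSchedulingSketch {

  // Builds a configuration that asks for the fair scheduler
  // instead of the default FIFO scheduler.
  def fairConf(appName: String): SparkConf =
    new SparkConf().setAppName(appName).set("spark.scheduler.mode", "FAIR")

  // Runs `body` with jobs from the current thread assigned to `pool`,
  // then clears the property so later jobs use the default pool.
  def runInPool[T](sc: SparkContext, pool: String)(body: => T): T = {
    sc.setLocalProperty("spark.scheduler.pool", pool)
    try body
    finally sc.setLocalProperty("spark.scheduler.pool", null)
  }
}
```

Because the pool is a thread-local property, jobs submitted from other threads are unaffected by the call.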
The interaction of caching and accumulators can make reasoning about accumulators more difficult. As we mentioned, if part of an RDD has to be recomputed, Spark may continue to add values to the accumulator as it recomputes, causing the values in the recomputed part to be double counted. Furthermore, not all computations will always compute the entirety of a partition. Surprisingly, caching does not prevent either double counting or problems that arise from partially evaluated partitions. Cached partitions may be evicted, so double counting may still arise if the machine with the cached data fails or if the partition is evicted to make space for a more recently cached partition. Unfortunately, caching with accumulators may cause a job that appears to compute the correct value on small data to later compute the incorrect value on large data.
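The double counting is easy to reproduce in the simplest case, where nothing is cached and an RDD is computed by two separate actions. The sketch below assumes the Spark 2.x `longAccumulator` API; the object name, the accumulator name, and the input range are invented for this illustration.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper object, used only for this illustration.
object AccumulatorSketch {

  // Counts records inside a map, then runs two actions on the
  // un-persisted result and returns the accumulator's final value.
  def countTwice(sc: SparkContext): Long = {
    val seen = sc.longAccumulator("recordsSeen")
    val mapped = sc.parallelize(1 to 100, 4).map { x =>
      seen.add(1) // side effect runs every time the map is evaluated
      x
    }
    mapped.count() // first evaluation of the map
    mapped.count() // second evaluation, since nothing was persisted
    seen.value
  }
}
```

Run against a local context with no task failures, `countTwice` would be expected to return 200 rather than 100, since each count() re-runs the map. Adding a cache() call hides the problem only for as long as every cached partition stays in memory.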
Now that you have explored how to get the most out of your standard RDD transformations, as well as joins, it’s time to explore the concerns associated with the most important and complicated subset of RDD transformations, key/value pair operations. Not all of the techniques you will have learned need to be applied in every Spark program, and some of the takeaways from this chapter are more about when certain tools are not a good fit (see “Accumulators”). Many of the same techniques and considerations for standard RDD transformations apply when working with key/value data: if your transformation doesn’t depend on the key, the techniques from this chapter may even be more relevant.
1 If A and B are sets, (A - B) ∪ (B ∩ A) = A in all cases. This is not true in Spark. If rddA or rddB have duplicate keys or if rddA and rddB have overlapping keys, then (A - B) ∪ (B ∩ A) is a subset of A.
2 Originally planned for 2.0.
3 Some notable exceptions are inside of certain ML algorithms, which if passed in an unpersisted RDD will automatically persist and unpersist the RDD.