Handling persistence in Spark

In this section, we will discuss how persistence and caching are handled in Spark. We will cover the various persistence and caching mechanisms provided by Spark, along with their significance.

Persistence/caching is one of the important features of Spark. Earlier, we discussed that transformations in Spark are lazy and the actual computation does not take place until an action is invoked on the RDD. Although this default behavior provides fault tolerance, it can also hurt the overall performance of a job, especially when a common dataset is used across several computations, because every action recomputes that dataset from its lineage.

Persistence/caching helps us solve this problem through the persist() and cache() operations exposed by the RDD. These operations store the computed partitions of the invoking RDD in memory across the nodes of the cluster and reuse them in subsequent actions on that dataset (or on datasets derived from it). This makes future transformations/actions much faster, sometimes by more than 10x. Caching/persistence is also a key tool for machine learning and other iterative algorithms.
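
As a quick illustration, the following minimal Scala sketch (the input path and the word-splitting logic are hypothetical placeholders) caches an RDD that two different actions reuse, so the second action works on the cached partitions instead of re-reading and re-parsing the input:

    import org.apache.spark.{SparkConf, SparkContext}

    object CachingExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("caching-example"))

        // Hypothetical input path; replace with a real dataset.
        val words = sc.textFile("hdfs:///data/input.txt")
          .flatMap(_.split("\\s+"))

        // Mark the RDD for caching; cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
        words.cache()

        // The first action computes the partitions and caches them.
        println("Total words: " + words.count())

        // The second action reuses the cached partitions instead of re-reading the file.
        println("Distinct words: " + words.distinct().count())

        sc.stop()
      }
    }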

Spark provides different levels of persistence, which are known as storage levels. They allow us to persist the data only in memory, only on disk, in memory but in serialized form, or in an off-heap storage system such as Tachyon (http://tachyon-project.org/). All these storage levels are defined by org.apache.spark.storage.StorageLevel.

Let's discuss the various storage levels provided by Spark and their appropriate use cases (a short usage sketch follows this list):

  • StorageLevel.MEMORY_ONLY: This storage level stores the RDD in the Spark cluster's memory as deserialized Java objects. If memory is not sufficient to hold the complete dataset, some partitions are not cached and are recomputed each time they are required. This is the default level and offers the best performance for our Spark jobs; it should only be used when we have enough memory to hold the computed dataset in memory.
  • StorageLevel.MEMORY_ONLY_SER: This is similar to MEMORY_ONLY, with the difference that it stores the computed data as serialized Java objects, which helps save space. We need to be careful that serialization/deserialization does not itself become an overhead; in a nutshell, we should use a fast serialization library such as Kryo (https://github.com/EsotericSoftware/kryo).

    Note

    Refer to http://spark.apache.org/docs/latest/tuning.html#data-serialization for more information on tuning and optimizing the serialization process.

  • StorageLevel.MEMORY_AND_DISK: This is similar to MEMORY_ONLY; the only difference is that partitions that do not fit in memory are stored on disk. These partitions are then read back from disk when requested instead of being recomputed. We need to be cautious while using this level and ensure that reading from and writing to disk is really faster than recomputing the dataset.
  • StorageLevel.MEMORY_AND_DISK_SER: This is similar to MEMORY_AND_DISK, with the difference that it stores the in-memory partitions as serialized Java objects. As with MEMORY_ONLY_SER, we need to use a fast serialization library and ensure that the serialization process does not introduce significant latency.
  • StorageLevel.DISK_ONLY: This stores the computed partitions on disk only. Nothing is kept in memory, and the data is read from disk each time it is requested. This is not recommended for jobs where performance is the primary criterion for success.
  • StorageLevel.MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on: All storage levels that end with _2 provide the same functionality as the corresponding levels defined previously, with the added capability of replicating each computed partition on two nodes in the cluster. For example, MEMORY_ONLY_2 behaves like MEMORY_ONLY but also replicates each computed partition on two cluster nodes.
  • StorageLevel.OFF_HEAP: This storage level is marked as experimental and is still being evaluated for production use cases. With OFF_HEAP, the data is stored in serialized form in Tachyon (http://tachyon-project.org/). Tachyon is an off-heap storage solution that reduces garbage-collection overhead and allows executors to be smaller and to share a pool of memory, which makes it attractive in environments that need large heaps for data storage or run multiple concurrent applications. Persisting data in an off-heap store such as Tachyon has the added benefit that Spark does not have to recompute the data if an executor crashes: because the cached partitions live in off-heap memory, executors are used only for running Spark jobs, while data storage is handled off heap. In this mode, the memory in Tachyon is discardable, so Tachyon does not attempt to reconstruct a block that it evicts from memory.

    Note

    Spark provides compatibility with Tachyon. Refer to http://tachyon-project.org/master/Running-Spark-on-Tachyon.html for more info on integrating Spark with Tachyon.
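
To tie the storage levels together, here is a minimal sketch of requesting a non-default level; the dataset, the choice of MEMORY_AND_DISK_SER, and the Kryo setting are illustrative assumptions, not prescriptions:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object StorageLevelExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("storage-level-example")
          // Recommended with the *_SER levels: a fast serializer such as Kryo.
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)

        // Hypothetical dataset; replace with a real input.
        val pairs = sc.parallelize(1 to 1000000).map(n => (n % 100, n))

        // Keep partitions in memory as serialized bytes; spill to disk if they do not fit.
        pairs.persist(StorageLevel.MEMORY_AND_DISK_SER)

        // Both actions reuse the persisted partitions.
        println(pairs.reduceByKey(_ + _).count())
        println(pairs.countByKey().size)

        sc.stop()
      }
    }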

Apart from the user-specified storage levels, Spark also persists some intermediate data automatically in shuffle operations (for example, reduceByKey), even without the user calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. It is also important to note that the persistence level of an RDD can be defined only once; once it is set, it cannot be changed during the lifetime of the job execution.
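
The "defined only once" restriction is visible directly in the API: calling persist() again with a different level is rejected by Spark (in the versions we have worked with it raises an UnsupportedOperationException, but treat the exact exception type as an assumption to verify on your version). A minimal sketch, assuming an existing SparkContext sc:

    import org.apache.spark.storage.StorageLevel

    val data = sc.parallelize(1 to 1000)
    data.persist(StorageLevel.MEMORY_ONLY)

    // Attempting to assign a different storage level to an already-persisted RDD fails.
    try {
      data.persist(StorageLevel.MEMORY_AND_DISK)
    } catch {
      case e: UnsupportedOperationException =>
        println("Cannot change storage level: " + e.getMessage)
    }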

Spark uses a least-recently-used (LRU) policy to automatically evict cached partitions from memory (what happens to an evicted partition depends on the storage level). We can also explicitly invoke RDD.unpersist() to free up Spark memory.
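
When a cached dataset is no longer needed, it is usually better to release it explicitly than to rely on LRU eviction. A short sketch, again assuming an existing SparkContext sc and an illustrative dataset:

    import org.apache.spark.storage.StorageLevel

    val lookup = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
      .persist(StorageLevel.MEMORY_ONLY)

    // Actions that reuse the cached partitions.
    println(lookup.count())
    println(lookup.collect().mkString(", "))

    // Explicitly release the cached partitions instead of waiting for LRU eviction.
    lookup.unpersist()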
