Apache Spark is a fast and general-purpose cluster computing system, initially developed at the AMPLab at UC Berkeley as part of the Berkeley Data Analytics Stack (BDAS) (http://en.wikipedia.org/wiki/UC_Berkeley). It provides high-level APIs in the following programming languages, which make large, concurrent parallel jobs easy to write and deploy [12:11]:

- Scala
- Java
- Python
The core element of Spark is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of a cluster and/or the CPU cores of servers. An RDD can be created from a local data structure such as a list, array, or hash table, from the local filesystem, or from the Hadoop Distributed File System (HDFS).
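As a brief sketch of both creation paths, here is a minimal example; the local-mode master, application name, and the file path `data/quotes.csv` are hypothetical placeholders for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local-mode context for illustration; a cluster deployment would use a master URL
val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("RDDCreation"))

// From a local data structure: the list is partitioned across the available cores
val fromList = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))

// From a file on the local filesystem or HDFS (hypothetical path, read lazily)
val fromFile = sc.textFile("data/quotes.csv")

println(fromList.count)  // 4
sc.stop()
```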
The operations on an RDD in Spark are very similar to the Scala higher-order methods. These operations are performed concurrently over each partition. Operations on RDDs can be classified as follows:

- Transformations: Operations such as map or filter that create a new RDD from an existing one; they are evaluated lazily
- Actions: Operations such as reduce or collect that return a value to the driver and trigger the execution of the pending transformations
An RDD can be persisted, serialized, and cached for future computation.
Spark is written in Scala and built on top of Akka libraries. Spark relies on the following mechanisms to distribute and partition RDDs:
The Spark ecosystem can be represented as a stack of technologies and frameworks, as seen in the following diagram:
The Spark ecosystem has grown to support several frameworks out of the box: MLlib, a set of machine learning algorithms; Spark SQL, a SQL-like interface to manipulate datasets with relational operators; GraphX, a library for distributed graphs; and a streaming library [12:12].
The authors of Spark attempt to address the limitations of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In the case of iterative algorithms, the time per iteration can be reduced by a ratio of 1:10 or more.
Spark provides a large array of prebuilt transforms and actions that go well beyond the basic map-reduce paradigm. These methods on RDDs are a natural extension of the Scala collections, making code migration seamless for Scala developers.
Finally, Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the filesystem. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes, and the replication schemes of the HDFS.
The performance of Spark relies on the following four core design principles [12:13]:

- In-memory persistency
- Laziness
- Transforms and actions
- Shared variables
The developer can decide to persist and/or cache an RDD for future usage. An RDD may persist in memory only, on disk only, or in memory if available and on disk otherwise, as deserialized or serialized Java objects. For instance, an RDD, rdd
, can be persisted with in-memory serialization through a simple statement, as shown in the following code:
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
Note that cache is merely a shorthand for persist(StorageLevel.MEMORY_ONLY), and the storage level of an RDD cannot be changed once it has been assigned.
Scala supports lazy values natively. The initialization expression on the right-hand side of the assignment, which can be a value, an object reference, or a method call, is evaluated only once, that is, the first time the lazy value is accessed, as shown in the following code:
class Pipeline {
  lazy val x = { println("x"); 1.5 }
  lazy val m = { println("m"); 3 }
  val n = { println("n"); 6 }
  def f = (m << 1)
  def g(j: Int) = Math.pow(x, j)
}

val pipeline = new Pipeline  //1
pipeline.g(pipeline.f)  //2
The order of the variables printed is n, m, and then x. The instantiation of the Pipeline class initializes n but not m or x (line 1). At a later stage, the g method is called, which in turn invokes the f method. The f method initializes the m value it needs, and then g initializes x to raise it to the power of m << 1 (line 2).
Spark applies the same principle to RDDs by executing a transformation only when an action is performed. In other words, Spark postpones memory allocation, parallelization, and computation until the driver code gets the result through the execution of an action. The cascading effect of invoking all these transformations backward is performed by the directed acyclic graph (DAG) scheduler.
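The deferred execution can be sketched as follows, assuming an existing SparkContext, sc; the two transformations only record lineage in the DAG, and nothing is computed until the reduce action runs:

```scala
val rdd = sc.parallelize(1 to 1000)

// Transformations: lazily recorded in the DAG; no partition is processed yet
val doubled = rdd.map(_ * 2)
val multiplesOf3 = doubled.filter(_ % 3 == 0)

// Action: triggers scheduling and the actual parallel computation
val total = multiplesOf3.reduce(_ + _)  // 333666
```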
Spark is implemented in Scala, so you should not be too surprised to learn that the most relevant Scala higher-order methods on collections are supported in Spark. The first table describes the transformation methods in Spark, as well as their counterparts in the Scala standard library. We use the (K, V) notation for (key, value) pairs:
Spark | Scala | Description
---|---|---
`map(f)` | `map` | This transforms an RDD by executing the function `f` on each element
`filter(f)` | `filter` | This transforms an RDD by selecting the elements for which the function `f` returns true
`flatMap(f)` | `flatMap` | This transforms an RDD by mapping each element to a sequence of output items
`mapPartitions(f)` | | This executes the function `f` separately on each partition of the RDD
`sample` | | This samples a fraction of the data with or without a replacement using a random generator
`groupByKey` | `groupBy` | This is called on (K,V) pairs to generate a new (K, Seq(V)) RDD
`union` | `union` | This creates a new RDD as a union of this RDD and the argument
`distinct` | `distinct` | This eliminates duplicate elements from this RDD
`reduceByKey(f)` | `reduce` | This aggregates or reduces the values corresponding to each key using the function `f`
`sortByKey` | `sortWith` | This reorganizes the (K,V) pairs in an RDD in ascending, descending, or otherwise specified order of the keys, K
`join` | | This joins an RDD (K,V) with an RDD (K,W) to generate a new RDD (K, (V,W))
`cogroup` | | This implements a join operation but generates an RDD (K, Seq(V), Seq(W))
Action methods trigger the collection or the reduction of the datasets from all partitions back to the driver, as listed here:
Spark | Scala | Description
---|---|---
`reduce(f)` | `reduce` | This aggregates all the elements of the RDD across all the partitions using the function `f` and returns a Scala object to the driver
`collect` | | This collects and returns all the elements of the RDD across all the partitions as a list in the driver
`count` | `size` | This returns the number of elements in the RDD to the driver
`first` | `head` | This returns the first element of the RDD to the driver
`take(n)` | `take` | This returns the first n elements of the RDD to the driver
`takeSample` | | This returns an array of random elements from the RDD back to the driver
`saveAsTextFile` | | This writes the elements of the RDD as a text file in either the local filesystem or HDFS
`countByKey` | | This generates a (K, Int) RDD with the original keys, K, and the count of values for each key
`foreach(f)` | `foreach` | This executes the function `f` on each element of the RDD
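To tie the two tables together, here is a minimal sketch, assuming an existing SparkContext, sc, that chains transformations with actions to count word frequencies:

```scala
val words = sc.parallelize(
  Seq("spark", "scala", "spark", "rdd", "scala", "spark"))

// Transformations: build (K, V) pairs and reduce the values by key; evaluated lazily
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

// Actions: materialize the results on the driver
val asMap = counts.collect.toMap  // Map(spark -> 3, scala -> 2, rdd -> 1)
val numDistinct = counts.count    // 3
```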
Scala methods such as fold, find, drop, flatten, min, max, and sum are not currently implemented in Spark. Other Scala methods such as zip have to be used carefully, as there is no guarantee that the order of the two collections in zip is maintained between partitions.
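When the pairing of two RDDs must preserve element order, a safer (if more expensive) alternative to zip is to index both RDDs explicitly and join on the index. This is a sketch, assuming an existing SparkContext, sc, and illustrative data:

```scala
val symbols = sc.parallelize(Seq("AAPL", "CSCO", "IBM"))
val prices = sc.parallelize(Seq(118.3, 28.9, 160.2))

// zipWithIndex assigns a deterministic index to each element;
// joining on that index aligns the two RDDs regardless of partitioning
val aligned = symbols.zipWithIndex.map(_.swap)
  .join(prices.zipWithIndex.map(_.swap))
  .sortByKey()
  .values  // RDD[(String, Double)] in the original element order
```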
In a perfect world, variables are immutable and local to each partition to avoid race conditions. However, there are circumstances where variables have to be shared without breaking the immutability provided by Spark. To this extent, Spark duplicates shared variables and copies them to each partition of the dataset. Spark supports the following types of shared variables:

- Broadcast variables: Read-only values that are copied to and cached on each worker node
- Accumulators: Variables that tasks can only add to through an associative operation, and whose value only the driver program can read
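A minimal sketch of both kinds of shared variables, using the Spark 1.3 API and assuming an existing SparkContext, sc, with illustrative symbols:

```scala
// Broadcast variable: a read-only map copied once to each worker node
val lookup = sc.broadcast(Map("AAPL" -> 0, "CSCO" -> 1))

// Accumulator: tasks may only add to it; only the driver reads its value
val unknown = sc.accumulator(0, "unknownSymbols")

sc.parallelize(Seq("AAPL", "IBM", "CSCO")).foreach(s =>
  if (!lookup.value.contains(s)) unknown += 1
)
println(unknown.value)  // 1
```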
The four design principles can be summarized in the following diagram:
The preceding diagram illustrates the most common interaction between the Spark driver and its workers, as listed in the following steps:
If you take a close look, the management of datasets and RDDs by the Spark driver is not very different from the management of worker actors and futures by an Akka master actor.
Spark's in-memory computation for iterative computing makes it an excellent candidate to distribute the training of machine learning models implemented with dynamic programming or optimization algorithms. Spark runs on the Windows, Linux, and Mac OS operating systems. It can be deployed either in local mode for a single host or in master mode for a distributed environment. The version of the Spark framework used is 1.3.
The easiest way to learn Spark is to deploy it on a localhost in standalone mode. You can either deploy a precompiled version of Spark from the website, or build the JAR files using the simple build tool (sbt) or Maven [12:14] as follows:
MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" mvn [args] -DskipTests clean package
The following are some examples:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dscala-2.11 -DskipTests clean package
sbt/sbt [args] assembly
The following are some examples:
sbt -Pyarn -Phadoop-2.4 assembly
sbt -Pyarn -Phadoop-2.6 -Dscala-2.11 assembly
Apache Spark supports multiple deployment modes:

- Local mode on a single host
- Standalone mode on a cluster managed by Spark's own cluster manager
- Cluster mode on top of Apache Mesos
- Cluster mode on top of Hadoop YARN
The communication between a master node (or driver), cluster manager, and set of slave (or worker) nodes is illustrated in the following diagram:
Use any of the following methods to use the Spark shell:

- ./bin/spark-shell --master local[8] executes the shell on an 8-core localhost.
- ./bin/spark-submit --class myApp.class --master local[4] --executor-memory 12G --jars myApplication.jar launches the application, myApplication, with the myApp.main main method on a 4-core CPU localhost and 12 GB of memory.
- ./bin/spark-submit --class myApp.class --master spark://162.198.11.201:7077 --total-executor-cores 80 --executor-memory 12G --jars myApplication.jar launches the same application on a remote Spark cluster with a total of 80 executor cores.
The output will be as follows:
Potential pitfalls with the Spark shell
Depending on your environment, you might need to disable the logging of information into the console by reconfiguring conf/log4j.properties. The Spark shell might also conflict with the declaration of the classpath in the profile or the list of environment variables. In this case, the classpath has to be replaced by the ADD_JARS environment variable, such as ADD_JARS=path1/jar1,path2/jar2.
MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress.
The main components of the library are as follows:
The machine learning bytecode is conveniently included in the Spark assembly JAR file built with the simple build tool.
The transformations and actions are performed on RDDs. Therefore, the first step is to create a mechanism to facilitate the generation of RDDs from a time series. Let's create an RDDSource singleton with a convert method that transforms a time series, xt, into an RDD, as shown here:
def convert(
    xt: immutable.Vector[DblArray],
    rddConfig: RDDConfig)
    (implicit sc: SparkContext): RDD[Vector] = {

  val rdd: RDD[Vector] =
    sc.parallelize(xt.map(new DenseVector(_)))  //3
  rdd.persist(rddConfig.persist)  //4
  if (rddConfig.cache) rdd.cache  //5
  rdd
}
The last rddConfig argument of the convert method specifies the configuration for the RDD. In this example, the configuration of the RDD consists of enabling/disabling the cache and selecting the persistency model, as follows:

case class RDDConfig(cache: Boolean, persist: StorageLevel)

It is fair to assume that SparkContext has already been implicitly defined in a manner quite similar to ActorSystem in the Akka framework.
The generation of the RDD is performed in the following steps:

1. Invoke the parallelize method of the context, converting each observation of the time series into a vector (SparseVector or DenseVector) (line 3).
2. Apply the persistency model specified in rddConfig (line 4).
3. Cache the RDD in memory if enabled in rddConfig (line 5).

Once the RDD is created, it can be used as an input for any algorithm defined as a sequence of transformations and actions. Let's experiment with the implementation of the K-means algorithm in Spark/MLlib.
The first step is to create a SparkKMeansConfig class to define the configuration of the Apache Spark K-means algorithm, as follows:
class SparkKMeansConfig(K: Int, maxIters: Int, numRuns: Int = 1) {
  val kmeans: KMeans = (new KMeans)
    .setK(K)                    //6
    .setMaxIterations(maxIters) //7
    .setRuns(numRuns)           //8
}
The minimum set of initialization parameters for the MLlib K-means algorithm is as follows:

- The number of clusters, K (line 6)
- The maximum number of iterations, maxIters (line 7)
- The number of runs, numRuns (line 8)

The SparkKMeans class wraps the Spark KMeans into a data transformation of the ITransform type, as described in the Monadic data transformation section in Chapter 2, Hello World! The class follows the design template for a classifier, as explained in the Design template for immutable classifiers section in Appendix A, Basic Concepts:
class SparkKMeans(  //9
    kMeansConfig: SparkKMeansConfig,
    rddConfig: RDDConfig,
    xt: Vector[DblArray])
    (implicit sc: SparkContext)
  extends ITransform[DblArray](xt) {

  type V = Int  //10
  val model: Option[KMeansModel] = train  //11
  override def |> : PartialFunction[DblArray, Try[V]]  //12
  def train: Option[KMeansModel]
}
The constructor takes three arguments: the Apache Spark KMeans configuration, kMeansConfig; the RDD configuration, rddConfig; and the input time series for clustering, xt (line 9). The return type of the ITransform trait's partial function |> is defined as an Int (line 10).

The generation of model merely consists of converting the time series xt into an RDD using rddConfig and invoking MLlib's KMeans.run (line 11). Once it is created, the model of clusters (KMeansModel) is available to predict a new observation, x (line 12), as follows:
override def |> : PartialFunction[DblArray, Try[V]] = {
case x: DblArray if(x.length > 0 && model != None) =>
Try[V](model.get.predict(new DenseVector(x)))
}
The |> prediction method returns the index of the cluster for an observation.

Finally, let's write a simple client program to exercise the SparkKMeans model, using the volatility of the price of a stock and its daily trading volume as features. The objective is to extract the clusters (volatility, volume), each cluster representing a specific behavior of the stock:
val K = 8
val RUNS = 16
val MAXITERS = 200
val PATH = "resources/data/chap12/CSCO.csv"
val CACHE = true

val sparkConf = new SparkConf().setMaster("local[8]")
    .setAppName("SparkKMeans")
    .set("spark.executor.memory", "2048m")  //13
implicit val sc = new SparkContext(sparkConf)  //14

extract.map { case (vty, vol) =>  //15
  val vtyVol = zipToSeries(vty, vol)
  val config = new SparkKMeansConfig(K, MAXITERS, RUNS)  //16
  val rddConfig = RDDConfig(CACHE, StorageLevel.MEMORY_ONLY)  //17
  val pfnSparkKMeans = new SparkKMeans(config, rddConfig, vtyVol) |>  //18
  val obs = Array[Double](0.23, 0.67)
  val clusterId = pfnSparkKMeans(obs)
}
The first step is to define the minimum configuration for the sc context (line 13) and initialize it (line 14). The vty volatility and vol trading volume variables are used as features for K-means and are extracted from a CSV file (line 15):
def extract: Option[(DblVector, DblVector)] = {
val extractors = List[Array[String] => Double](
YahooFinancials.volatility, YahooFinancials.volume
)
val pfnSrc = DataSource(PATH, true) |>
pfnSrc( extractors ) match {
case Success(x) => Some((x(0).toVector, x(1).toVector))
case Failure(e) => { error(e.toString); None }
}
}
The execution creates a configuration, config, for the K-means algorithm (line 16) and another configuration, rddConfig, for the Spark RDD (line 17). The pfnSparkKMeans partial function, which implements the K-means algorithm, is created with the K-means configuration, the RDD configuration, and the vtyVol input data (line 18).
Let's execute the normalization of the cross-validation groups on an 8-core CPU machine with 32 GB of RAM. The data is partitioned with a ratio of two partitions per CPU core.
The performance of a Spark application depends greatly on the configuration parameters. Selecting the appropriate values for those configuration parameters in Spark can be overwhelming: there are 54 configuration parameters as of the last count. Fortunately, the majority of those parameters have relevant default values. However, a few parameters deserve your attention, including the following:
- The total number of cores available to the application across the cluster (spark.cores.max).
- The memory available to each executor (spark.executor.memory). Setting the value to 60 percent of the maximum JVM heap is generally a good compromise.
- The default number of parallel tasks for reducers such as reduceByKey (spark.default.parallelism). The recommended formula is parallelism = total number of cores x 2. The value of the parameter can be overridden with the spark.reduceby.partitions parameter for specific RDD reducers.
- The flag to compress serialized RDD partitions stored as MEMORY_ONLY_SER (spark.rdd.compress). The purpose is to reduce memory footprints at the cost of extra CPU cycles.
- The maximum size of the message (spark.akka.frameSize) containing the results of an action sent to the driver. This value needs to be increased if a collection may potentially generate a large array.
- The flag to compress large broadcast variables (spark.broadcast.compress). It is usually recommended.
The following configuration is used to perform the training of K-means: 10 clusters, 30 maximum iterations, and 3 runs. The test is run on a single host with 8 CPU cores and 32 GB of RAM. The test was conducted with the following values of the configuration parameters:
StorageLevel = MEMORY_ONLY
spark.executor.memory = 12G
spark.default.parallelism = 48
spark.akka.frameSize = 20
spark.broadcast.compress = true
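These parameter values can also be set programmatically through SparkConf before the context is created; this is a sketch in which the master URL and application name are placeholders, and the storage level is passed separately to each RDD's persist method:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setMaster("local[8]")          // placeholder master URL
  .setAppName("KMeansBenchmark")  // hypothetical application name
  .set("spark.executor.memory", "12g")
  .set("spark.default.parallelism", "48")
  .set("spark.akka.frameSize", "20")
  .set("spark.broadcast.compress", "true")
implicit val sc = new SparkContext(sparkConf)
```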
The first step after executing a test for a specific dataset is to log in to the Spark monitoring console at http://host_name:4040/stages:
Obviously, each environment produces somewhat different performance results, but they confirm that the time complexity of the Spark K-means is a linear function of the size of the training set.
Performance evaluation in a distributed environment
A Spark deployment on multiple hosts adds the latency of TCP communication to the overall execution time. This latency, related to the collection of the clustering results back to the Spark driver, is negligible and independent of the size of the training set.
This test barely scratches the surface of the capabilities of Apache Spark. The following are the lessons learned from personal experience in order to avoid the most common performance pitfalls when deploying Spark 1.3+:

- Shuffling data across partitions with key-based methods such as reduceByKey has its price. The ratio of the number of partitions to the number of cores has an impact on the performance of a reducer using keys.
- Use actions such as collect, count, or lookup judiciously. An action reduces the data residing in the RDD partitions, and then forwards it to the Spark driver. The Spark driver (or master) program runs on a single JVM with limited resources.

An increasing number of organizations are adopting Spark as their distributed data processing platform for real-time or pseudo real-time operations. There are several reasons for the fast adoption of Spark:
However, no platform is perfect and Spark is no exception. The most common complaints or concerns regarding Spark are as follows:
Sparkling Water is an initiative to integrate 0xdata's H2O with Spark and complement MLlib [12:16]. H2O from 0xdata is a very fast, open source, in-memory platform for machine learning on very large datasets (http://0xdata.com/product/). The framework is worth mentioning for the following reasons:
H2O has an extensive implementation of the generalized linear model and gradient boosted classification, among other goodies. Its data representation consists of hierarchical data frames. A data frame is a container of vectors potentially shared with other frames. Each vector is composed of data chunks, which themselves are containers of data elements [12:17]. At the time of writing, Sparkling Water is in beta version.