The Scala standard library offers a rich set of tools, such as parallel collections and concurrent classes, to scale number-crunching applications. Although these tools are very effective in processing medium-sized datasets, developers quite often discard them in favor of more elaborate frameworks.
Although code optimization and memory management are beyond the scope of this chapter, it is worthwhile to remember that a few simple steps can improve the scalability of an application. One of the most frustrating challenges in using Scala to process large datasets is the creation of a large number of objects and the resulting load on the garbage collector.
A partial list of remedial actions is as follows:
- Using javap to decipher the generation of byte code by the JVM
Some problems require the preprocessing and training of very large datasets, resulting in significant memory consumption by the JVM. Streams are list-like collections in which elements are instantiated or computed lazily. Streams share with views the goal of postponing computation and memory allocation.
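The lazy evaluation of streams can be illustrated with a minimal sketch. The LazyStreamDemo object, its evaluated counter, and the squares stream are hypothetical names introduced for illustration only. The sketch uses the Stream class discussed in this section; note that Stream is deprecated in Scala 2.13 in favor of LazyList.

```scala
object LazyStreamDemo {
  // Hypothetical counter: tracks how many elements were actually computed
  var evaluated = 0

  // Conceptually infinite stream of squares; elements are computed on demand
  val squares: Stream[Int] = Stream.from(0).map { i =>
    evaluated += 1
    i * i
  }

  // Forces only the first three elements of the stream
  def firstThree: List[Int] = squares.take(3).toList
}
```

Although the stream has no upper bound, calling firstThree computes only the handful of elements that are requested; the remaining elements are never instantiated.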
Let's consider the computation of the loss function in machine learning. An observation of the DataPoint type is defined as a features vector, x, and a labeled or expected value, y:
case class DataPoint(x: DblVector, y: Double)
We can create a loss function, LossFunction, that processes a very large dataset on a platform with limited memory. The optimizer responsible for the minimization of the loss or error invokes the loss function at each iteration or recursion, as described in the following diagram:
The constructor of the LossFunction class has the following three arguments (line 2):
- The function f used in the computation of the loss for each data point
- The weights of the model
- The number of observations to process, dataSize
The code is as follows:
type StreamLike = WeakReference[Stream[DataPoint]] //1
class LossFunction(
    f: (DblVector, DblVector) => Double,
    weights: DblVector,
    dataSize: Int) { //2
  var nElements = 0
  def compute(stream: () => StreamLike): Double =
    compute(stream().get, 0.0) //3
  def _loss(xs: List[DataPoint]): Double =
    xs.map(dp => dp.y - f(weights, dp.x)).map(sqr(_)).sum //4
}
The loss function for the stream is implemented as the compute tail recursion (line 3). The recursive method updates the reference of the stream. The type of reference of the stream is WeakReference (line 1), so the garbage collector can reclaim the memory associated with the slice for which the loss has been computed. In this example, the loss function is computed as a sum of squared errors (line 4).
The compute method manages the allocation and release of slices of the stream:
@tailrec
def compute(stream: Stream[DataPoint], loss: Double): Double = {
  if (nElements >= dataSize) loss
  else {
    val step =
      if (nElements + STEP > dataSize) dataSize - nElements
      else STEP
    nElements += step
    val newLoss = _loss(stream.take(step).toList) //5
    compute(stream.drop(STEP), loss + newLoss) //6
  }
}
The dataset is processed in two steps:
- The method allocates (take) a slice of the stream of observations and then computes the cumulative loss for all the observations in the slice (line 5)
- The method then releases (drop) the slice that has been processed (line 6)
An alternative to weak references
There are alternatives to weak references that allow the garbage collector to reclaim the memory blocks associated with each slice of observations, which are as follows:
- Defining the stream reference as a def
- Using a List iterator
The average memory allocated during the execution of the loss function for the entire stream is the memory needed to allocate a single slice.
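The iterator-based alternative can be sketched as follows. This is a minimal illustration under stated assumptions, not the implementation used in this chapter: the SlicedLoss object, its sliceLoss, loss, and dot methods, and the local definitions of DblVector and DataPoint are hypothetical stand-ins. The iterator is consumed one slice at a time and no reference to an earlier slice is retained, so each slice becomes eligible for garbage collection once its loss has been accumulated.

```scala
object SlicedLoss {
  // Hypothetical stand-ins for the types used in this section
  type DblVector = Array[Double]
  case class DataPoint(x: DblVector, y: Double)

  // Sum of squared errors over a single slice of observations
  def sliceLoss(f: (DblVector, DblVector) => Double, weights: DblVector)
               (slice: Iterable[DataPoint]): Double =
    slice.map { dp =>
      val err = dp.y - f(weights, dp.x)
      err * err
    }.sum

  // Walks the iterator in slices of `step` observations and accumulates the loss
  def loss(data: Iterator[DataPoint], step: Int)
          (f: (DblVector, DblVector) => Double, weights: DblVector): Double =
    data.grouped(step).foldLeft(0.0) { (acc, slice) =>
      acc + sliceLoss(f, weights)(slice)
    }

  // Simple linear model used for illustration only
  def dot(w: DblVector, x: DblVector): Double =
    w.zip(x).map { case (a, b) => a * b }.sum
}
```

A caller would supply the observations as an iterator, for instance one that reads records from a file, together with the slice size and the model function.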
The Scala standard library includes parallelized collections, whose purpose is to shield developers from the intricacies of concurrent thread execution and race conditions. Parallel collections are a very convenient way to encapsulate concurrency constructs at a higher level of abstraction [12:1].
There are two ways to create parallel collections in Scala, which are as follows:
- Converting an existing collection into a parallel collection using the par method; for example, List[T].par: ParSeq[T], Array[T].par: ParArray[T], Map[K,V].par: ParMap[K,V], and so on
- Using the collection classes from the collection.parallel, parallel.immutable, or parallel.mutable packages; for example, ParArray, ParMap, ParSeq, ParVector, and so on
A parallel collection does not lend itself to concurrent processing until a pool of threads and a task scheduler are assigned to it. Fortunately, Scala parallel and concurrent packages provide developers with a powerful toolbox to map partitions or segments of a collection to tasks running on different CPU cores. The components are as follows:
- TaskSupport: This trait inherits the generic Tasks trait. It is responsible for scheduling the operation on the parallel collection. There are three concrete implementations of TaskSupport.
- ThreadPoolTaskSupport: This uses the threads pool in an older version of the JVM.
- ExecutionContextTaskSupport: This uses ExecutorService that delegates the management of tasks to either a thread pool or the ForkJoinTasks pool.
- ForkJoinTaskSupport: This uses the fork-join pools of the java.util.concurrent.ForkJoinPool type introduced in Java SE 7. In Java, a fork-join pool is an instance of ExecutorService that attempts to run not only the current task but also any of its subtasks. It executes the ForkJoinTask instances, which are lightweight, thread-like entities.
The following example implements the generation of a random exponential value using a parallel vector and ForkJoinTaskSupport:
val rand = ParVector.tabulate(MAX)(n => n*Random.nextFloat) //1
rand.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(16))
val randExp = rand.map(Math.exp(_)) //2
The rand parallel vector of random probabilities is created and initialized by the main task (line 1), but the conversion to the randExp vector of exponential values is executed by a pool of 16 concurrent tasks (line 2).
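The assignment of a fork-join pool to a parallel collection can be sketched as a self-contained example. The ParExpDemo object and its parExp method are hypothetical names introduced for illustration. The sketch assumes Scala 2.12 or later, where ForkJoinTaskSupport accepts a java.util.concurrent.ForkJoinPool; from Scala 2.13 onward, parallel collections are distributed as the separate scala-parallel-collections module, which has to be added as a dependency.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.mutable.ParArray

object ParExpDemo {
  // Applies exp to each element using a dedicated pool of nTasks worker threads
  def parExp(values: Array[Double], nTasks: Int): Array[Double] = {
    val pool = new ForkJoinPool(nTasks)
    try {
      // The parallel array is built by the calling thread
      val par = ParArray.tabulate(values.length)(values(_))
      // The transformation is scheduled on the dedicated pool
      par.tasksupport = new ForkJoinTaskSupport(pool)
      par.map(math.exp).toArray
    } finally pool.shutdown() // Releases the worker threads
  }
}
```

The map operation preserves the order of the elements, so the result is expected to match the output of the same transformation on a standard Array.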
The main purpose of parallel collections is to improve the performance of execution through concurrency. The first step is to either select an existing benchmark or create our own benchmark.
Let's create a ParBenchmark parameterized class to evaluate the performance of operations on parallel collections:
abstract class ParBenchmark[U](times: Int) {
  def map(f: U => U)(nTasks: Int): Unit //1
  def filter(f: U => Boolean)(nTasks: Int): Unit //2
  def timing(g: Int => Unit): Long
}
The user has to supply the data transformation f for the map (line 1) and filter (line 2) operations of parallel collections as well as the number of concurrent tasks nTasks. The timing method measures the total duration of times executions of a given operation g on a parallel collection:
def timing(g: Int => Unit): Long = {
  val startTime = System.currentTimeMillis
  Range(0, times).foreach(g)
  System.currentTimeMillis - startTime
}
Let's define the benchmark for the mapping and reducing operations on parallel arrays as follows:
class ParArrayBenchmark[U](u: Array[U], //3
    v: ParArray[U], //4
    times: Int) extends ParBenchmark[U](times)
The first argument of the benchmark constructor is the default array of the Scala standard library (line 3). The second argument is the parallel data structure (or class) associated with the array (line 4).
Let's compare the parallelized and default array on the map and reduce methods of ParArrayBenchmark as follows:
def map(f: U => U)(nTasks: Int): Unit = {
  val pool = new ForkJoinPool(nTasks)
  v.tasksupport = new ForkJoinTaskSupport(pool)
  val duration = timing(_ => u.map(f)).toDouble //5
  val ratio = timing(_ => v.map(f))/duration //6
  show(s"$nTasks, $ratio")
}
The user has to define the mapping function f and the number of concurrent tasks nTasks available to execute a map transformation on the array u (line 5) and its parallelized counterpart v (line 6). The reduce method follows the same design, as shown in the following code:
def reduce(f: (U,U) => U)(nTasks: Int): Unit = {
  val pool = new ForkJoinPool(nTasks)
  v.tasksupport = new ForkJoinTaskSupport(pool)
  val duration = timing(_ => u.reduceLeft(f)).toDouble //7
  val ratio = timing(_ => v.reduceLeft(f))/duration //8
  show(s"$nTasks, $ratio")
}
The user-defined function f is used to execute the reduce action on the array u (line 7) and its parallelized counterpart v (line 8).
The same template can be used for other higher-order Scala methods, such as filter.
The absolute timing of each operation is completely dependent on the environment. It is far more useful to record the ratio of the duration of the operation on the parallelized array to its duration on the single-threaded array.
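The relative measurement can be sketched independently of any particular collection. The RelativeTiming object and its timing and ratio methods are hypothetical helpers introduced for illustration; they are not part of the benchmark classes of this section. The sketch uses System.nanoTime and does not include any JVM warm-up phase, so the first measurements may be skewed by just-in-time compilation.

```scala
object RelativeTiming {
  // Wall-clock duration, in nanoseconds, of `times` executions of op
  def timing(times: Int)(op: => Any): Long = {
    val start = System.nanoTime
    (0 until times).foreach(_ => op)
    System.nanoTime - start
  }

  // Duration of candidate divided by the duration of baseline.
  // A value below 1.0 means the candidate ran faster than the baseline.
  def ratio(times: Int)(baseline: => Any)(candidate: => Any): Double = {
    // Guards against a zero duration for very short operations
    val base = math.max(timing(times)(baseline), 1L)
    timing(times)(candidate).toDouble / base
  }
}
```

For the benchmark described here, the baseline would be the operation on the single-threaded array, such as u.map(f), and the candidate would be the same operation on its parallelized counterpart, v.map(f).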
The benchmark class ParMapBenchmark used to evaluate ParHashMap is similar to the benchmark for ParArray, as shown in the following code:
class ParMapBenchmark[U](val u: Map[Int, U],
val v: ParMap[Int, U],
times: Int) extends ParBenchmark[U](times)
For example, the filter method of ParMapBenchmark evaluates the performance of the parallel map v relative to a single-threaded map u. It applies the filtering condition to the values of each map, as follows:
def filter(f: U => Boolean)(nTasks: Int): Unit = {
  val pool = new ForkJoinPool(nTasks)
  v.tasksupport = new ForkJoinTaskSupport(pool)
  val duration = timing(_ => u.filter(e => f(e._2))).toDouble
  val ratio = timing(_ => v.filter(e => f(e._2)))/duration
  show(s"$nTasks, $ratio")
}
The first performance test consists of creating a single-threaded and a parallel array of random values and executing the map and reduce evaluation methods using an increasing number of tasks, as follows:
val sz = 1000000; val NTASKS = 16
val data = Array.fill(sz)(Random.nextDouble)
val pData = ParArray.fill(sz)(Random.nextDouble)
val times: Int = 50
val bench = new ParArrayBenchmark[Double](data, pData, times)
val mapper = (x: Double) => Math.sin(x*0.01) + Math.exp(-x)
Range(1, NTASKS).foreach(bench.map(mapper)(_))
val reducer = (x: Double, y: Double) => x+y
Range(1, NTASKS).foreach(bench.reduce(reducer)(_))
The following graph shows the output of the performance test:
The test applies the mapper and reducer functions to arrays of 1 million elements, repeating each operation 50 times, on an 8-core CPU with 8 GB of available memory on the JVM.
The results are not surprising in the following respects:
- ParArray has a small overhead in the single-task scenario and then matches the performance of Array.
- The map function benefits from the parallelization of the array. The performance levels off when the number of tasks allocated equals or exceeds the number of CPU cores.
The second test consists of comparing the behavior of the ParArray and ParHashMap parallel collections on the map and filter methods, using a configuration identical to the first test, as follows:
val sz = 10000000
val mData = new HashMap[Int, Double]
Range(0, sz).foreach(mData.put(_, Random.nextDouble)) //9
val mParData = new ParHashMap[Int, Double]
Range(0, sz).foreach(mParData.put(_, Random.nextDouble))
val bench = new ParMapBenchmark[Double](mData, mParData, times)
Range(1, NTASKS).foreach(bench.map(mapper)(_)) //10
val filterer = (x: Double) => (x > 0.8)
Range(1, NTASKS).foreach(bench.filter(filterer)(_)) //11
The test initializes a HashMap instance and its ParHashMap parallel counterpart with 1 million random values (line 9). The benchmark bench processes all the elements of these hash maps with the mapper instance introduced in the first test (line 10) and a filtering function filterer (line 11) with NTASKS equal to 6. The output is shown in the following diagram:
The impact of the parallelization of collections is very similar across methods and collections. It's important to notice that the performance of the parallel collections levels off at around four times that of the single-threaded collections for five concurrent tasks and above. Core parking is partially responsible for this behavior. Core parking disables a few CPU cores in an effort to conserve power and, in the case of a single application, that application consumes almost all CPU cycles.
Clearly, a fourfold increase in performance is nothing to complain about. Having said that, parallel collections are limited to single-host deployments. If you cannot live with such a restriction and still need a scalable solution, the Actor model provides a blueprint for highly distributed applications.