The Akka framework extends the original Actor model in Scala by adding abstraction capabilities such as support for typed actors, message dispatching, routing, load balancing, and partitioning, as well as supervision and configurability [12:7].
The Akka framework can be downloaded from the http://akka.io/ website or through the Typesafe Activator at http://www.typesafe.com/platform.
Akka simplifies the implementation of the Actor model by encapsulating some of the details of Scala Actor in the akka.actor.Actor and akka.actor.ActorSystem classes.
The three methods you want to override are as follows:
preStart: This is an optional method that is invoked to initialize all the necessary resources, such as a file or database connection, before the Actor is executed
receive: This method defines the Actor's behavior and returns a partial function of the PartialFunction[Any, Unit] type
postStop: This is an optional method to clean up resources, such as releasing memory and closing database connections, sockets, or file handles
Typed and untyped actors
Untyped actors can process messages of any type. If the type of the message is not matched by the receiving actor, it is discarded. Untyped actors can be regarded as contract-less actors. They are the default actors in Scala.
Typed actors are similar to Java remote interfaces. They respond to a method invocation. The invocation is declared publicly, but the execution is delegated asynchronously to the private instance of the target actor [12:8].
Akka offers a variety of functionalities to deploy concurrent applications. Let's create a generic template for a master Actor and worker Actors to transform a dataset using any preprocessing or classification algorithm inherited from an explicit or implicit monadic data transformation, as described in the Monadic data transformation section in Chapter 2, Hello World!
The master Actor manages the worker actors in one of the following ways:
The router is a very simple example of Actor supervision. Supervision strategies in Akka are an essential component to make the application fault-tolerant [12:9]. A supervisor Actor manages the operations, availability, and life cycle of its children, known as subordinates. The supervision among actors is organized as a hierarchy. Supervision strategies are categorized as follows:
The first model to evaluate is the traditional master-slaves or master-workers design for the computation workflow. In this design, the worker Actors are initialized and managed by the master Actor, which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks is performed through message passing.
The first step in implementing the master-worker design is to define the different classes of messages exchanged between the master and each worker in order to control the execution of the iterative procedure. The implementation of the master-worker design is as follows:
sealed abstract class Message(val id: Int)
case class Terminate(i: Int) extends Message(i)
case class Start(i: Int = 0) extends Message(i) //1
case class Activate(i: Int, xt: DblVector) extends Message(i) //2
case class Completed(i: Int, xt: DblVector) extends Message(i) //3
Let's define the messages that control the execution of the algorithm. We need at least the following message types or case classes:
Start: This is sent by the client code to the master to start the computation (line 1).
Activate: This is sent by the master to the workers to activate the computation. This message contains the time series xt to be processed by the worker Actors. It also contains the reference to sender (the master actor) (line 2).
Completed: This is sent by each worker back to sender. It contains the transformed data of the partition (line 3).
The master stops a worker using a PoisonPill message. The different approaches to terminate an actor are described in The master actor section.
The hierarchy of the Message class is sealed to prevent third-party developers from adding another message type. The worker responds to the Activate message by executing a data transformation of the ITransform type. The messages exchanged between master and worker actors are shown in the following diagram:
The worker actors are responsible for transforming each partitioned dataset created by the master Actor, as follows:
type PfnTransform = PartialFunction[DblVector, Try[DblVector]]
class Worker(id: Int, fct: PfnTransform) extends Actor { //1
  override def receive = {
    case msg: Activate => //2
      sender ! Completed(msg.id+id, fct(msg.xt).get)
  }
}
The Worker class constructor takes the fct partial function as an argument (line 1). The worker launches the processing or transformation of the msg.xt data on arrival of the Activate message (line 2). It returns the Completed message to the master once the fct data transformation is completed.
In the Scalability section in Chapter 1, Getting Started, we introduced the concepts of workflow and controller to manage the training and classification process as a sequence of transformations on a time series. Let's define an abstract class for all controller actors, Controller, with the following three key parameters:
xt: the time series to be processed
fct: the data transformation implemented as a partial function
nPartitions: the number of partitions used to break down a time series for concurrent processing
The Controller class can be defined as follows:
abstract class Controller (
    val xt: DblVector,
    val fct: PfnTransform,
    val nPartitions: Int) extends Actor with Monitor { //3
  def partition: Iterator[DblVector] = { //4
    val sz = (xt.size.toDouble/nPartitions).ceil.toInt
    xt.grouped(sz)
  }
}
The controller is responsible for splitting the time series into several partitions and assigning each partition to a dedicated worker (line 4).
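The partitioning arithmetic can be checked in isolation, outside of any actor. The following is a minimal sketch, assuming that DblVector is an alias for Vector[Double] (the alias is not defined in this section); PartitionSketch and the signature of its partition method are hypothetical names used only for illustration.

```scala
object PartitionSketch {
  // Assumption: DblVector is an alias for Vector[Double]
  type DblVector = Vector[Double]

  // Same arithmetic as Controller.partition: the partition size is rounded up,
  // so at most nPartitions groups are produced
  def partition(xt: DblVector, nPartitions: Int): Iterator[DblVector] = {
    val sz = (xt.size.toDouble / nPartitions).ceil.toInt
    xt.grouped(sz)
  }

  def main(args: Array[String]): Unit = {
    val xt: DblVector = Vector.tabulate(10)(_.toDouble)
    // 10 points over 4 partitions: sz = ceil(2.5) = 3
    partition(xt, 4).foreach(p => println(p.mkString(", ")))
  }
}
```

With this rounding, the last partition can be shorter than the others, and some combinations yield fewer groups than requested (10 points over 6 partitions gives 5 groups of 2). A design that waits for exactly nPartitions results may want to choose sizes that divide evenly.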
Let's define a master actor class Master. The three methods to override are as follows:
preStart: This is a method invoked to initialize all the necessary resources, such as a file or database connection, before the actor executes (line 9)
receive: This is a partial function that dequeues and processes the messages from the mailbox
postStop: This cleans up resources such as releasing memory and closing database connections, sockets, or file handles (line 10)
The Master class can be defined as follows:
abstract class Master( //5
    xt: DblVector,
    fct: PfnTransform,
    nPartitions: Int) extends Controller(xt, fct, nPartitions) {
  val aggregator = new Aggregator(nPartitions) //6
  val workers = List.tabulate(nPartitions)(n =>
    context.actorOf(Props(new Worker(n, fct)),
      name = s"worker_$n")) //7
  workers.foreach( context.watch ( _ ) ) //8
  override def preStart: Unit = /* ... */ //9
  override def postStop: Unit = /* ... */ //10
  override def receive = /* described below */
}
The Master class has the following parameters (line 5):
xt: This is the time series to transform
fct: This is the transformation function
nPartitions: This is the number of partitions
An instance of the Aggregator class, aggregator, collects and reduces the results from each worker (line 6):
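import scala.collection.mutable.ListBuffer
class Aggregator(partitions: Int) {
  val state = new ListBuffer[DblVector]
  // Stores a partial result; returns true once all partitions have reported
  def += (x: DblVector): Boolean = {
    state.append(x)
    state.size == partitions
  }
  def clear: Unit = state.clear
  def completed: Boolean = state.size == partitions
}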
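The behavior of the aggregator can be exercised without actors. The following is a minimal sketch that copies the Aggregator class into a hypothetical AggregatorSketch object so that it is self-contained; DblVector is assumed to be an alias for Vector[Double].

```scala
import scala.collection.mutable.ListBuffer

object AggregatorSketch {
  // Assumption: DblVector is an alias for Vector[Double]
  type DblVector = Vector[Double]

  // Copy of the Aggregator class, repeated here to keep the sketch self-contained
  class Aggregator(partitions: Int) {
    val state = new ListBuffer[DblVector]
    def +=(x: DblVector): Boolean = {
      state.append(x)
      state.size == partitions
    }
    def clear(): Unit = state.clear()
    def completed: Boolean = state.size == partitions
  }

  def main(args: Array[String]): Unit = {
    val aggregator = new Aggregator(2)
    println(aggregator += Vector(1.0, 2.0)) // first of two partitions
    println(aggregator += Vector(3.0, 4.0)) // second partition completes the set
    println(aggregator.completed)
  }
}
```

The Boolean returned by += is what lets the master test for completion in the same expression that records a result.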
The worker actors are created through the actorOf factory method of the master's actor context, context (line 7). The master watches each worker through its context, so it is notified when the workers terminate (line 8).
The receive message handler processes the Start message from the client code and the Completed message from the workers, as well as the Terminated notification for the watched workers, as shown in the following code:
override def receive = {
  case s: Start => start //11
  case msg: Completed => //12
    if( aggregator += msg.xt) //13
      workers.foreach( context.stop(_) ) //14
  case Terminated(sender) => //15
    if( aggregator.completed ) {
      context.stop(self) //16
      context.system.shutdown
    }
}
The Start message triggers the partitioning of the input time series into partitions (line 11):
def start: Unit = workers.zip(partition.toVector)
  .foreach {case (w, s) => w ! Activate(0,s)} //16
The partitions are then dispatched to each worker with the Activate message (line 16).
Each worker sends a Completed message back to the master on the completion of its task (line 12). The master aggregates the results from each worker (line 13). Once all the workers have completed their task, the master stops them (line 14). The master is notified of the termination of each worker through a Terminated message (line 15), and finally, terminates itself through a request to its context to stop it (line 16).
The previous code snippet uses two different approaches to terminate an actor. There are four different methods of shutting down an actor, as mentioned here:
actorSystem.shutdown: This method is used by the client to shut down the parent actor system
actor ! PoisonPill: This method is used by the client to send a poison pill message to the actor
context.stop(self): This method is used by the Actor to shut itself down within its context
context.stop(childActorRef): This method is used by the Actor to shut down one of its child actors through its reference
The previous design makes sense only if each worker has a unique characteristic that requires direct communication with the master. This is not the case in most applications. The communication and internal management of the worker can be delegated to a router. The implementation of the master routing capabilities is very similar to the previous design, as shown in the following code:
class MasterWithRouter(
    xt: DblVector,
    fct: PfnTransform,
    nPartitions: Int) extends Controller(xt, fct, nPartitions) {
  val aggregator = new Aggregator(nPartitions)
  val router = { //17
    val routerConfig = RoundRobinRouter(nPartitions, //18
      supervisorStrategy = this.supervisorStrategy)
    context.actorOf(
      Props(new Worker(0,fct)).withRouter(routerConfig) )
  }
  context.watch(router)
  override def receive = /* described below */
}
The only difference is that the context.actorOf factory creates an extra actor, router, along with the workers (line 17). This particular implementation relies on round-robin assignment of the message by the router to each worker (line 18). Akka supports several routing mechanisms that select a random actor, or the actor with the smallest mailbox, or the first to respond to a broadcast, and so on.
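The round-robin policy can be pictured without any Akka dependency. The following is a minimal sketch of the assignment order only; it is not Akka's router implementation, and RoundRobinSketch and assign are hypothetical names introduced for illustration.

```scala
object RoundRobinSketch {
  // Hypothetical helper: message i goes to routee i % nRoutees,
  // so routees are visited in the order 0, 1, ..., nRoutees-1, 0, 1, ...
  def assign[T](messages: Seq[T], nRoutees: Int): Map[Int, Seq[T]] =
    messages.zipWithIndex
      .groupBy { case (_, i) => i % nRoutees }
      .map { case (routee, pairs) => routee -> pairs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val partitions = Seq("p0", "p1", "p2", "p3", "p4")
    assign(partitions, 2).toSeq.sortBy(_._1).foreach {
      case (routee, msgs) => println(s"routee $routee <- ${msgs.mkString(", ")}")
    }
  }
}
```

With five messages and two routees, routee 0 receives the messages at even positions and routee 1 those at odd positions, so the load differs by at most one message.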
The implementation of the receive message handler is almost identical to the message handler in the master without routing capabilities, with the exception of the termination of the workers through the router (line 19):
override def receive = {
  case s: Start => start
  case msg: Completed =>
    if( aggregator += msg.xt)
      context.stop(router) //19
  ...
}
The start message handler has to be modified to send the Activate messages to the workers through the router:
def start: Unit =
  partition.toVector.foreach {router ! Activate(0, _)}
Let's select the discrete Fourier transform (DFT) on a time series xt as our data transformation. We discussed this in the Discrete Fourier transform section in Chapter 3, Data Preprocessing. The testing code is exactly the same, whether the master has routing capabilities or not.
First, let's define a master controller DFTMaster dedicated to the execution of the distributed discrete Fourier transform, as follows:
type Reducer = List[DblVector] => immutable.Seq[Double]
class DFTMaster(
    xt: DblVector,
    nPartitions: Int,
    reducer: Reducer) //20
  extends Master(xt, DFT[Double].|>, nPartitions)
The reducer method aggregates or reduces the results of the discrete Fourier transform (frequency distribution) from each worker (line 20). In the case of the discrete Fourier transform, the fReduce reducer method transposes the list of frequency distributions and then sums up the amplitude for each frequency (line 21):
def fReduce(buf: List[DblVector]): immutable.Seq[Double] = buf.transpose.map( _.sum).toSeq //21
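A small worked example makes the transpose-then-sum step concrete. This is a minimal sketch, assuming DblVector is an alias for Vector[Double]; ReduceSketch is a hypothetical wrapper, and the two input vectors are made-up values standing in for the output of two workers.

```scala
object ReduceSketch {
  // Assumption: DblVector is an alias for Vector[Double]
  type DblVector = Vector[Double]

  // Same logic as fReduce: position-wise sum across the workers' results
  def fReduce(buf: List[DblVector]): Seq[Double] =
    buf.transpose.map(_.sum)

  def main(args: Array[String]): Unit = {
    // Made-up amplitudes for three frequencies, as reported by two workers
    val fromWorkers = List(Vector(1.0, 2.0, 3.0), Vector(10.0, 20.0, 30.0))
    println(fReduce(fromWorkers).mkString(", "))
  }
}
```

Note that transpose expects every worker to return a vector of the same length, so partitions of unequal size need to be handled before the reduction.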
Let's take a look at the test code:
val NUM_WORKERS = 4
val NUM_DATA_POINTS = 1000000
val h = (x: Double) => 2.0*Math.cos(Math.PI*0.005*x) +
  Math.cos(Math.PI*0.05*x) + 0.5*Math.cos(Math.PI*0.2*x) +
  0.3*Random.nextDouble //22
val actorSystem = ActorSystem("System") //23
val xt = Vector.tabulate(NUM_DATA_POINTS)(h(_))
val controller = actorSystem.actorOf(
  Props(new DFTMasterWithRouter(xt, NUM_WORKERS, fReduce)),
  "MasterWithRouter") //24
controller ! Start(1) //25
The input time series is synthetically generated by the noisy sinusoidal function h (line 22). The function h has three distinct harmonics: 0.005, 0.05, and 0.2, so the results of the transformation can be easily validated. The Actor system, ActorSystem, is instantiated (line 23) and the master Actor is generated through the Akka ActorSystem.actorOf factory (line 24). The main program sends a Start message to the master to trigger the distributed computation of the discrete Fourier transform (line 25).
The actor instantiation
Although the scala.actors.Actor class can be instantiated using the constructor, akka.actor.Actor is instantiated using an ActorSystem context, an actorOf factory, and a Props configuration object. This second approach has several benefits, including decoupling the deployment of the actor from its functionality and enforcing a default supervisor or parent for the Actor; in this case, ActorSystem.
The following sequential diagram illustrates the message exchange between the main program, master, and worker Actors:
The purpose of the test is to evaluate the performance of the computation of the discrete Fourier transform using the Akka framework relative to the original implementation, without actors. As with Scala parallel collections, the absolute timing for the transformation depends on the host and the configuration, as shown in the following graph:
The single-threaded version of the discrete Fourier transform is significantly faster than the implementation using the Akka master-worker model with a single worker actor. The cost of partitioning and aggregating (or reducing) the results adds a significant overhead to the execution of the Fourier transform. However, the master-worker model is far more efficient with three or more worker actors.
The master-worker implementation has a few problems, which are as follows:
The culprit is the exclusive use of the fire-and-forget mechanism to exchange data between master and workers. The send-and-receive protocol and futures are remedies to these problems.
A future is an object, more specifically a monad, used to retrieve the results of concurrent operations, in a nonblocking fashion. The concept is very similar to a callback supplied to a worker, which invokes it when the task is completed. Futures hold a value that might or might not become available in the future when a task is completed, whether successful or not [12:10].
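The following minimal sketch illustrates these properties with the standard scala.concurrent API only: a future that eventually holds a value, the monadic composition of two futures, and a future that completes with a failure. FutureSketch, its method names, and the two-second timeout are hypothetical choices made for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureSketch {
  // A future that eventually holds the sum 1*1 + 2*2 + ... + n*n
  def sumOfSquares(n: Int): Future[Long] =
    Future { (1L to n.toLong).map(i => i * i).sum }

  // Monadic composition: the for-comprehension desugars to flatMap and map
  def combined(a: Int, b: Int): Future[Long] =
    for { x <- sumOfSquares(a); y <- sumOfSquares(b) } yield x + y

  // A future whose task fails; the exception is held by the future
  def failing: Future[Long] =
    Future { throw new IllegalStateException("task failed") }

  // Blocking helper, used here only to print the results
  def await[T](f: Future[T]): T = Await.result(f, 2.seconds)

  def main(args: Array[String]): Unit = {
    println(await(combined(3, 4))) // 14 + 30
    println(Try(await(failing)))   // the failure is rethrown by Await.result
  }
}
```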
There are two options to retrieve results from futures:
Blocking the execution using scala.concurrent.Await
The onComplete, onSuccess, and onFailure callback functions
Which future?
A Scala environment provides developers with two different Future classes: scala.actors.Future and scala.concurrent.Future.
The actors.Future class is used to write continuation-passing style workflows in which the current actor is blocked until the value of the future is available. Instances of the scala.concurrent.Future type used in this chapter are the equivalent of java.util.concurrent.Future in Scala.
Let's reimplement the normalization of cross-validation groups by their variance, which we introduced in the previous section, using futures to support concurrency. The first step is to import the appropriate classes for execution of the main actor and futures, as follows:
import akka.actor.{Actor, ActorSystem, ActorRef, Props} //26
import akka.util.Timeout //27
import scala.concurrent.{Await, Future} //28
The Actor classes are provided by the akka.actor package, instead of the scala.actors._ package, because of Akka's extended actor model (line 26). The future-related classes, Future and Await, are imported from the scala.concurrent package, which is similar to the java.util.concurrent package (line 28). The akka.util.Timeout class is used to specify the maximum duration the actor has to wait for the completion of the futures (line 27).
There are two options for a parent actor or the main program to manage the futures it creates, which are as follows:
The following design consists of blocking the actor that launches the futures until all the futures have been completed, either returning with a result or throwing an exception. Let's modify the master actor into a TransformFutures class that manages futures instead of workers or routing actors, as follows:
abstract class TransformFutures(
    xt: DblVector,
    fct: PfnTransform,
    nPartitions: Int)
    (implicit timeout: Timeout) //29
  extends Controller(xt, fct, nPartitions) {
  override def receive = {
    case s: Start => compute(transform) //30
  }
}
The TransformFutures class requires the same parameters as the Master actor: a time series, xt, a data transformation, fct, and the number of partitions, nPartitions. The timeout parameter supplies the maximum duration passed to the Await.result method, and is declared as an implicit argument of the class (line 29). The only message, Start, triggers the computation of the data transformation of each future, and then the aggregation of the results (line 30). The transform and compute methods have the same semantics as those in the master-workers design.
The generic message handler
You may have read or even written examples of actors that have generic case _ =>
handlers in the message loop for debugging purposes. The message loop takes a partial function as an argument. Therefore, no error or exception is thrown if the message type is not recognized. There is no need for such a handler apart from the one for debugging purposes. Message types should inherit from a sealed abstract class or a sealed trait in order to prevent a new message type from being added by mistake.
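The role of the partial function can be seen in isolation. The following is a minimal sketch, not Akka code: ReceiveSketch, its two message classes, and the deliver dispatcher are hypothetical, and the dispatcher simply assumes that a handler is applied only where it is defined, which mirrors the behavior described above.

```scala
object ReceiveSketch {
  // Sealed hierarchy: no message type can be added outside this file
  sealed trait Message
  case class Start(id: Int) extends Message
  case class Completed(id: Int) extends Message

  // Same shape as an actor's message handler: a partial function over Any,
  // without a generic case _ => clause
  val receive: PartialFunction[Any, Unit] = {
    case Start(id)     => println(s"started $id")
    case Completed(id) => println(s"completed $id")
  }

  // Hypothetical dispatcher: applies the handler only where it is defined
  // and reports whether the message was handled
  def deliver(msg: Any): Boolean =
    if (receive.isDefinedAt(msg)) { receive(msg); true } else false

  def main(args: Array[String]): Unit = {
    println(deliver(Start(1)))        // handled
    println(deliver("not a message")) // ignored, no exception thrown
  }
}
```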
Let's take a look at the transform method. Its main purpose is to instantiate, launch, and return an array of futures responsible for the transformation of the partitions, as shown in the following code:
def transform: Array[Future[DblVector]] = {
  val futures = new Array[Future[DblVector]](nPartitions) //31
  partition.zipWithIndex.foreach { case (x, n) => { //32
    futures(n) = Future[DblVector] { fct(x).get } //33
  }}
  futures
}
An array of futures (one future per partition) is created (line 31). The transform method invokes the partitioning method partition (line 32) and then initializes the future with the fct partial function (line 33):
def compute(futures: Array[Future[DblVector]]): Seq[Double] =
  reduce(futures.map(Await.result(_, timeout.duration))) //34
The compute method invokes a user-defined reduce function on the futures. The execution of the Actor is blocked until the scala.concurrent.Await.result method (line 34) returns the result of each future computation. In the case of the discrete Fourier transform, the list of frequencies is transposed before the amplitude of each frequency is summed (line 35), as follows:
def reduce(data: Array[DblVector]): Seq[Double] =
  data.view.map(_.toArray)
    .transpose.map(_.sum) //35
    .take(SPECTRUM_WIDTH).toSeq
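The transform, compute, and reduce steps can be tried end to end without Akka. The following is a minimal sketch under stated assumptions: DblVector is taken to be Vector[Double], the square function is a hypothetical stand-in for the discrete Fourier transform, and the methods take their inputs as arguments instead of reading them from an actor.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object BlockingFuturesSketch {
  // Assumption: DblVector is an alias for Vector[Double]
  type DblVector = Vector[Double]
  type PfnTransform = PartialFunction[DblVector, Try[DblVector]]

  // Hypothetical stand-in for the DFT: squares each value of a non-empty partition
  val square: PfnTransform = {
    case x if x.nonEmpty => Try(x.map(v => v * v))
  }

  def partition(xt: DblVector, nPartitions: Int): Iterator[DblVector] =
    xt.grouped((xt.size.toDouble / nPartitions).ceil.toInt)

  // Launches one future per partition; toVector forces the lazy iterator
  def transform(xt: DblVector, fct: PfnTransform, nPartitions: Int): Vector[Future[DblVector]] =
    partition(xt, nPartitions).map(x => Future { fct(x).get }).toVector

  // Sums the partial results position by position (equal-length partitions assumed)
  def reduce(data: Seq[DblVector]): Seq[Double] = data.transpose.map(_.sum)

  // Blocks on each future in turn, then reduces the results
  def compute(futures: Seq[Future[DblVector]],
              timeout: FiniteDuration = 2.seconds): Seq[Double] =
    reduce(futures.map(f => Await.result(f, timeout)))

  def main(args: Array[String]): Unit =
    println(compute(transform(Vector(1.0, 2.0, 3.0, 4.0), square, 2)))
}
```

All the futures are started before the first call to Await.result, so the partitions are processed concurrently even though the results are collected one at a time.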
The following sequential diagram illustrates the blocking design and the activities performed by the Actor and the futures:
Callbacks are an excellent alternative to having the actor block on futures, as the actor can execute other functions concurrently with the future execution.
There are two simple ways to implement the callback function, as follows:
Future.onComplete
Future.onSuccess and Future.onFailure
The onComplete callback function takes a function of the Try[T] => U type as an argument with an implicit reference to the execution context, as shown in the following code:
val f: Future[T] = future { execute task }
f onComplete {
  case Success(s) => { … }
  case Failure(e) => { … }
}
You can surely recognize the {Try, Success, Failure} monad.
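A runnable variant of the onComplete pattern is sketched below. It is a minimal example, not the chapter's implementation: OnCompleteSketch and describe are hypothetical names, and a Promise is used only to make the value observed by the callback visible to the caller.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteSketch {
  // Registers a callback on f and exposes what the callback observed
  def describe(f: Future[Int]): Future[String] = {
    val observed = Promise[String]()
    f.onComplete {
      case Success(s) => observed.success(s"succeeded with $s")
      case Failure(e) => observed.success(s"failed with ${e.getMessage}")
    }
    observed.future
  }

  // Blocking helper, used here only to print the outcome
  def await[T](f: Future[T]): T = Await.result(f, 2.seconds)

  def main(args: Array[String]): Unit = {
    println(await(describe(Future { 21 * 2 })))
    println(await(describe(Future[Int] { throw new IllegalArgumentException("bad input") })))
  }
}
```

The callback runs on a thread of the implicit execution context, so the caller is free to continue with other work after onComplete has been registered.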
An alternative implementation is to invoke the onSuccess and onFailure methods that use partial functions as arguments to implement the callbacks, as follows:
f onFailure { case e: Exception => { … } }
f onSuccess { case t => { … } }
The only difference between blocking on the futures of the data transformation and handling callbacks is the implementation of the compute method or reducer. The class definition, message handler, and initialization of futures are identical, as shown in the following code:
def compute(futures: Array[Future[DblVector]]): Seq[Double] = {
  val buffer = new ArrayBuffer[DblVector]
  futures.foreach( f => {
    f onSuccess { //36
      case data: DblVector => buffer.append(data)
    }
    f onFailure { case e: Exception => /* .. */ } //37
  })
  // Reduce only if every future has delivered a non-empty result
  if( buffer.size == futures.length && buffer.forall( _.nonEmpty ))
    reduce(buffer.toArray) //38
  else Seq.empty[Double]
}
Each future calls the master actor back with either the result of the data transformation, through the onSuccess callback (line 36), or an exception, through the onFailure callback (line 37). If every future succeeds, the values of all frequencies for all the partitions are summed (line 38). The following sequential diagram illustrates the handling of the callback in the master actor:
The execution context
The application of futures requires that the execution context is implicitly provided by the developer. There are three different ways to define the execution context:
import ExecutionContext.Implicits.global
implicit val ec = ExecutionContext.fromExecutorService( … )
val f = Future[T] { } (ec)
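The three options can be compared side by side in a small runnable sketch. ExecutionContextSketch and its method names are hypothetical, and the fixed thread pool in the second option is only one possible choice of executor service.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ExecutionContextSketch {
  // Option 1: the default global context, brought into implicit scope by an import
  def withGlobal(x: Int): Future[Int] = {
    import ExecutionContext.Implicits.global
    Future { x + 1 }
  }

  // Option 2: an implicit context backed by a dedicated thread pool,
  // shut down once the result has been retrieved
  def withPool(x: Int, threads: Int): Int = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try Await.result(Future { x + 1 }, 2.seconds)
    finally ec.shutdown()
  }

  // Option 3: the context passed explicitly in the second parameter list
  def withExplicit(x: Int, ec: ExecutionContext): Future[Int] =
    Future { x + 1 }(ec)

  // Blocking helper, used here only to print the results
  def await[T](f: Future[T]): T = Await.result(f, 2.seconds)

  def main(args: Array[String]): Unit = {
    println(await(withGlobal(1)))
    println(withPool(1, 2))
    println(await(withExplicit(1, ExecutionContext.global)))
  }
}
```

A dedicated pool keeps long-running transformations from competing with other tasks on the global context, at the cost of having to shut the pool down explicitly.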
Let's reuse the discrete Fourier transform. The client code uses the same synthetically created time series as in the master-worker test model. The first step is to create a transform future for the discrete Fourier transform, DFTTransformFutures, as follows:
class DFTTransformFutures(
    xt: DblVector,
    partitions: Int)(implicit timeout: Timeout)
  extends TransformFutures(xt, DFT[Double].|> , partitions) {
  override def reduce(data: Array[DblVector]): Seq[Double] =
    data.map(_.toArray).transpose
      .map(_.sum).take(SPECTRUM_WIDTH).toSeq
}
The only purpose of the DFTTransformFutures class is to define the reduce aggregation method for the discrete Fourier transform. Let's reuse the same test case as in the Distributed discrete Fourier transform section under Master-workers:
import akka.pattern.ask
import scala.concurrent.duration.Duration
val duration = Duration(8000, "millis")
implicit val timeout = new Timeout(duration)
val master = actorSystem.actorOf( //39
  Props(new DFTTransformFutures(xt, NUM_WORKERS)),
  "DFTTransform")
val future = master ? Start(0) //40
Await.result(future, timeout.duration) //41
actorSystem.shutdown //42
The master actor is initialized as an instance of the DFTTransformFutures type with the input time series xt, the discrete Fourier transform DFT, and the number of workers or partitions nPartitions as arguments (line 39). The program creates a future instance by sending (ask) the Start message to the master (line 40). The program blocks until the completion of the future (line 41), and then shuts down the Akka actor system (line 42).