10.5 Remote actors in Akka

Like the scala.actors package, Akka supports remote actors—actors that communicate over the network. In fact, Akka's remote actors may well be the main reason you are interested in Akka in the first place. They are feature-rich, powered by an efficient implementation based on Netty (for non-blocking network I/O), and support low-overhead message serialization, for example, using Google protocol buffers.[2]

To introduce Akka's remote actors step-by-step, we'll guide you through the creation of a remote "start" service similar to what you saw in Section ???. However, the following example is self-contained, so you don't have to read Section ??? if you haven't already done so.

Managing a cluster using actors

We will build a small cluster management service that allows a master node to start actors on other nodes in a cluster. Moreover, the cluster service should provide services to the remotely started actors, such as a list of references to cluster services on the neighboring nodes, graceful shutdown, etc.

The cluster service is centered around a master service actor that remotely communicates with a set of cluster service actors. The master actor manages all cluster service actors, broadcasts configuration information to them, and so forth. Each cluster service actor, in turn, manages the remote actors that are running on its local node. For example, to start a new remote actor on a particular node, the master actor sends a control message to the cluster service actor running on the target node; that cluster service actor then takes care of starting a new actor locally, and making it remotely accessible to the master actor.

To make things more concrete, let's look at some code. We'll begin with the master service, shown in Listing 10.6, which is the first thing to start when the cluster service boots. The master service starts Akka's remote support by invoking remote.start (remote is a member of the Actor object), passing the host name and port of the master node. Then, we create a new MasterService actor using actorOf[MasterService] (we'll discuss its implementation later), start it right away, and register it on the remote server of the current node using remote.register. By registering an actor in this way, we can obtain an ActorRef to that actor on any node running Akka's remote support. (We'll use this functionality later for communicating with the master actor from actors running on remote nodes.) After that, we initialize the MasterService actor with the number of cluster nodes to be registered by sending it a message of type ClusterSize. Note that message sends that use the ? operator are synchronous; they do not return until the current actor has received a response. Finally, we use a CountDownLatch to wait until the MasterService completes its initialization, which is the case when all cluster service actors have registered.

  import akka.actor.{Actor, ActorRef}
  import Actor._
  import java.util.concurrent.CountDownLatch

  object MasterService {
    val doneInit = new CountDownLatch(1)
    private var _master: ActorRef = _

    def master: ActorRef = _master

    def main(args: Array[String]) {
      val hostname = args(0)
      val port = args(1).toInt
      val numNodes = args(2).toInt

      remote.start(hostname, port)

      _master = actorOf[MasterService].start()
      remote.register(_master)

      _master ? ClusterSize(numNodes)
      doneInit.await()
    }
  }
Listing 10.6 - The MasterService object.

Listing 10.7 shows the code for the MasterService actor. The master service waits for ClusterService actors to register using Announce messages. Each message contains the host name and port number of the registering cluster service. Upon receiving an Announce message, we obtain a remote ActorRef to the registering cluster service, assigned to nodeRef, using one of the actorFor methods of the remote object. We can use the returned ActorRef like any other (local) ActorRef. Next, we add to the nodeRefs map a mapping from the address (host name and port) of the newly registered node to nodeRef.

  class MasterService extends Actor {
    var numNodes = 0
    var nodeRefs: Map[(String, Int), ActorRef] = Map()

    def receive = {
      case ClusterSize(numNodes) =>
        this.numNodes = numNodes
        println("[Master] waiting for " + numNodes +
                " nodes to register")
        self.reply()

      case Announce(newHost, newPort) =>
        println("[Master] new host " +
                newHost + ":" + newPort)
        val nodeRef = remote.actorFor(
          classOf[ClusterService].getCanonicalName,
          newHost,
          newPort)

        nodeRefs += ((newHost, newPort) -> nodeRef)

        if (nodeRefs.size == numNodes) {
          println("[Master] all nodes have registered")
          nodeRefs.values foreach { service =>
            service ? Nodes(nodeRefs.keys.toList)
          }
          MasterService.doneInit.countDown()
        }

      ...
    }
  }
Listing 10.7 - The MasterService actor.

Finally, after the MasterService has received Announce messages from all cluster service actors (in which case, nodeRefs.size == numNodes), it broadcasts the network addresses of all nodes to the cluster service actors, and signals the end of its initialization by counting down the doneInit latch. This will unblock the main thread that has been executing the body of the MasterService object until the invocation of doneInit.await(), shown in Listing 10.6.
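The latch handshake described above can be tried out without Akka. The following is a minimal sketch that uses only java.util.concurrent; plain threads stand in for the registering cluster services, and the names LatchDemo and awaitRegistrations are hypothetical, chosen for this illustration only.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

object LatchDemo {
  // Hypothetical stand-in for the registration protocol; not Akka API.
  // Each thread plays the role of one cluster service announcing itself.
  def awaitRegistrations(numNodes: Int): Int = {
    val doneInit = new CountDownLatch(1)
    val registered = new AtomicInteger(0)

    val nodes = (0 until numNodes).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          // The last node to register releases the waiting thread.
          if (registered.incrementAndGet() == numNodes)
            doneInit.countDown()
        }
      })
    }
    nodes.foreach(_.start())

    // Blocks until the latch reaches zero, like doneInit.await() in main.
    doneInit.await()
    registered.get
  }

  def main(args: Array[String]): Unit =
    println("registered nodes: " + awaitRegistrations(3))
}
```

As in Listing 10.6, the latch starts at 1 and is counted down exactly once, by whichever registration completes the set.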

Listing 10.8 shows the ClusterService object. A cluster service instance starts in much the same way as the master service: the main method of the ClusterService class's companion object sets up Akka's remote support and starts a ClusterService actor, passing it all the information needed to contact the master service.

  object ClusterService {
    val terminate = new CountDownLatch(1)

    def run(masterHostname: String,
            masterPort: Int,
            hostname: String,
            port: Int) {
      remote.start(hostname, port)

      val service = actorOf[ClusterService].start()
      remote.register(service)

      service ! Announce(masterHostname, masterPort)

      terminate.await()
      registry.shutdownAll() // also stops service actor
      remote.shutdown()
    }

    def main(args: Array[String]) {
      val masterHostname = args(0)
      val masterPort = args(1).toInt
      val hostname = args(2)
      val port = args(3).toInt
      run(masterHostname, masterPort, hostname, port)
    }
  }
Listing 10.8 - The ClusterService object.

Shutting down. The main thread of a cluster service node (the thread executing the run method of the ClusterService object) handles termination using a CountDownLatch, which has an initial value of 1. When the ClusterService actor (which handles the actual communication with the master) determines that it should exit, it counts down the terminate latch. In response, the main thread invokes registry.shutdownAll() to stop all actors that have been started on the local node, including the ClusterService actor.[3] Finally, it shuts down the remote service using remote.shutdown(). After that, the main thread of the cluster service node itself exits.

  class ClusterService extends Actor {
    var allAddresses: List[(String, Int)] = List()
    var master: ActorRef = null

    def receive = {
      case Announce(hostname, port) =>
        master = remote.actorFor(
          classOf[MasterService].getCanonicalName,
          hostname,
          port)
        val localhost = remote.address.getHostName()
        val localport = remote.address.getPort()
        master ! Announce(localhost, localport)

      case Nodes(addresses) =>
        println("[ClusterService] received node addresses: " +
                addresses)
        allAddresses = addresses
        self.reply()

      case StartActorAt(_, _, clazz) =>
        println("[ClusterService] starting instance of " + clazz)
        val newActor = actorOf(clazz).start()
        remote.register(newActor)
        newActor ? Nodes(allAddresses)
        self.reply()

      case StopServiceAt(_, _) =>
        println("[ClusterService] shutting down...")
        ClusterService.terminate.countDown()
    }
  }
Listing 10.9 - The ClusterService actor.

Listing 10.9 shows the ClusterService actor. It responds to several messages, but most interesting is what happens when a StartActorAt message is received. The MasterService uses this message to start an actor on a remote cluster service. This message contains an object of type Class[_ <: Actor], clazz, which is used to create a new Actor instance using actorOf. After registering the new actor on the remote service, the ClusterService actor sends it a message with the network addresses of all cluster nodes (remember that the master service broadcasts these addresses). Finally, it replies to the StartActorAt message to signal that the new actor is ready to receive remote messages.
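To see why passing a Class object is enough to start something on another node, here is a small sketch of the underlying idea: creating an instance from a bounded Class value by reflection. It does not show how actorOf is implemented; the Worker trait, the EchoWorker class, and the start method are hypothetical stand-ins, and the sketch assumes the class has a public no-argument constructor.

```scala
object ReflectiveStartDemo {
  // Hypothetical stand-in for the Actor trait.
  trait Worker { def name: String }

  // Hypothetical stand-in for an actor class such as EchoActor.
  class EchoWorker extends Worker { val name = "echo" }

  // The bound Class[_ <: Worker] guarantees that the created
  // instance can be used as a Worker without a cast.
  def start(clazz: Class[_ <: Worker]): Worker =
    clazz.getDeclaredConstructor().newInstance()

  def main(args: Array[String]): Unit =
    println("started: " + start(classOf[EchoWorker]).name)
}
```

The type bound plays the same role as Class[_ <: Actor] in the StartActorAt message: a sender cannot ask for an instance of a class that is not a Worker.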

To start an actor remotely via the StartActorAt message, we also need some functionality in the MasterService actor. One approach is to send StartActorAt messages to the master, in addition to the ClusterService, to instruct the master actor to forward a corresponding message to the target ClusterService. You could do this by adding a pattern-matching case to the MasterService actor's message handler:

  case startMsg @ StartActorAt(host, port, clazz) =>
    nodeRefs((host, port)) ? startMsg
    val startedActor = remote.actorFor(
      clazz.getCanonicalName,
      host,
      port)
    self.reply(startedActor)

The first thing that might look unfamiliar to you in the previous code is the use of the at symbol (@) to prefix the pattern with a chosen identifier (here, startMsg). It works by storing a reference to the message object that matches the pattern in the startMsg variable. We can then use this variable on the right-hand side of the => symbol. This simplifies forwarding the same message unchanged to another actor; in our case, the ClusterService actor returned by nodeRefs((host, port)). After forwarding the StartActorAt message, we use remote.actorFor to obtain an ActorRef to the newly started actor, and send it back as the response.
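The @ binder is a feature of Scala's pattern matching in general, so it can be explored outside of an actor. In the following sketch, the StartActorAt case class is a hypothetical definition that mirrors the shape of the message used in this section, and route is an illustrative helper, not part of the cluster service.

```scala
object BinderDemo {
  // Hypothetical message type with the same shape as StartActorAt.
  case class StartActorAt(host: String, port: Int, clazz: Class[_])

  // Returns the target address together with the unchanged message,
  // or None if the message is of some other kind.
  def route(msg: Any): Option[((String, Int), StartActorAt)] = msg match {
    case startMsg @ StartActorAt(host, port, _) =>
      // startMsg refers to the whole matched object;
      // host and port refer to its extracted parts.
      Some(((host, port), startMsg))
    case _ =>
      None
  }

  def main(args: Array[String]): Unit =
    println(route(StartActorAt("localhost", 9001, classOf[String])))
}
```

Because startMsg is bound to the original object, forwarding it does not require rebuilding the message from its parts.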

  class EchoActor extends Actor {
    var neighbors: List[ActorRef] = List()
    var allAddresses: List[(String, Int)] = List()
    var sum = 0

    def receive = {
      case Nodes(addresses) =>
        allAddresses = addresses
        neighbors = addresses map { case (hostname, port) =>
          remote.actorFor(classOf[ClusterService].getCanonicalName,
            hostname,
            port)
        }
        self.reply()

      case any: String =>
        println("[EchoActor] received " + any)
        // try converting to an Int
        sum += any.toInt
        println("[EchoActor] current sum: " + sum)
    }
  }
Listing 10.10 - A simple actor that you can start remotely.

Let's test this code. Suppose you'd like to remotely start an instance of the EchoActor class shown in Listing 10.10. This actor responds to two kinds of messages. The first kind of message, with type Nodes, is a message that the cluster service sends right after a new EchoActor instance has been started. As a second kind of message, the actor responds to strings that it converts to integers, which are added to an internal sum.

Starting an EchoActor remotely is pretty simple, given the functionality of the master service:

  // initialize MasterService
  MasterService.main(Array("localhost", "9000", "1"))
  // remotely start EchoActor
  val response =
    MasterService.master ? StartActorAt("localhost",
                                        9001,
                                        classOf[EchoActor])
  val echoActor = response.get.asInstanceOf[ActorRef]
  echoActor ! "17"

Note that before running the above code you have to start a master service on "localhost:9000" and a cluster service on "localhost:9001". You can terminate the entire "application" as follows:

  MasterService.master ? StopServiceAt("localhost", 9001)
  MasterService.shutdown()

In this code, the MasterService instructs a cluster service to shut down by sending it a StopServiceAt message. Upon receiving such a message, the cluster service counts down the terminate latch of its companion object, which causes all actors and the main thread of the cluster service to terminate, as shown in Listing 10.9.

Fault tolerance. So far, our cluster service is pretty bad at handling faults. For example, let's see what happens if we send an invalid message, such as "hello", to an EchoActor that is started remotely. In this case, the message processing code in EchoActor throws an exception because "hello" cannot be converted to an integer. This unhandled exception leads to output similar to the following:

  [ERROR]   [8/5/11 3:29 PM] [akka:event-driven:dispatcher:global-5]
    [LocalActorRef] hello
  java.lang.NumberFormatException: For input string: "hello"
          at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
          at java.lang.Integer.parseInt(Integer.java:449)
          at java.lang.Integer.parseInt(Integer.java:499)

Moreover, message processing in the EchoActor instance stops. Therefore, your client code can no longer interact with this actor, and may wait for responses indefinitely as a result.

To recover from such unhandled exceptions, Akka provides a powerful mechanism for actor supervision. Basically, some actors can play the role of supervisors that are notified whenever a supervised actor crashes. Additionally, supervisors may define fault-handling strategies for recovery by restarting crashed actors.

You can use actor supervision to recover from crashed remote actors by promoting the ClusterService actor to be a supervisor. You can do this in two steps. First, the cluster service actor must link itself to each remote actor that it starts:

  val newActor = actorOf(clazz)
  // start newActor and link to ClusterService
  self.startLink(newActor)
  remote.register(newActor)
  ...

By linking itself to newActor, the cluster service will be notified when newActor crashes.

Second, the cluster service has to define a fault-handling strategy. You can do this by setting the actor's faultHandler field, for example, in the constructor:

  self.faultHandler =
    OneForOneStrategy(List(classOf[NumberFormatException],
                           classOf[RuntimeException]), 5, 5000)

Akka supports different fault handlers that define different strategies for restarting crashed actors. The simplest strategy restarts the crashed actor (and leaves all other supervised actors alone). The most important parameter of a fault handler is the list of exception types that it handles. In our example, handling NumberFormatException allows you to recover from the case where the EchoActor tries to convert an invalid, non-numeric string to an integer. The other parameters determine the number of restarts that should be attempted, and a timeout within which a restart must be successful (here, 5000 milliseconds).
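The effect of restarting only the crashed actor, up to a bounded number of times, can be modeled in a few lines of plain Scala. The sketch below is a simplified illustration, not Akka's implementation: the Summer class and the run method are hypothetical, the time window is left out, and it assumes that a restart begins with fresh state.

```scala
object RestartSketch {
  // Hypothetical worker that mimics EchoActor's summing behavior.
  final class Summer {
    var sum = 0
    def handle(msg: String): Unit = sum += msg.toInt
  }

  // Processes messages one by one. A NumberFormatException "crashes"
  // the worker, which is replaced by a fresh instance as long as the
  // restart budget is not used up; otherwise the exception propagates.
  // Returns the final sum and the number of restarts performed.
  def run(messages: List[String], maxRestarts: Int): (Int, Int) = {
    var worker = new Summer
    var restarts = 0
    for (msg <- messages) {
      try worker.handle(msg)
      catch {
        case _: NumberFormatException if restarts < maxRestarts =>
          restarts += 1
          worker = new Summer // restart with fresh state (assumption)
      }
    }
    (worker.sum, restarts)
  }

  def main(args: Array[String]): Unit =
    println(run(List("1", "hello", "17"), 5))
}
```

In this model the message that caused the crash is dropped, and processing continues with the next message, which is the behavior the supervised EchoActor exhibits later in this section.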

After setting up the supervisor, you also have to configure the supervised actors—in our case, EchoActor—by defining a life cycle:

  self.lifeCycle = Permanent

The Permanent life cycle configures the supervised actor so that it is always restarted after crashing; the Temporary life cycle configures the supervised actor so that instead of being restarted after crashing, its supervisor terminates it. Moreover, you can clean up the state of the supervised actor by overriding callback methods that are invoked during the termination process.

With this fault-handling mechanism in place, you can recover from the unhandled NumberFormatException caused by a "hello" message:

  // initialize MasterService
  MasterService.main(Array("localhost", "9000", "1"))
  // remotely start EchoActor
  val response =
    MasterService.master ? StartActorAt("localhost",
                                        9001,
                                        classOf[EchoActor])
  val echoActor = response.get.asInstanceOf[ActorRef]
  // this will lead to an exception in echoActor
  echoActor ! "hello"
  // try again; echoActor is restarted automatically
  echoActor ! "17"

After its crash, the EchoActor resumes message processing, enabling it to handle the "17" message.


Footnotes for Chapter 10:

[1] More precisely, the isDefinedAt method of PartialFunction instances used as message handlers must return true for all messages the actor could receive.

[2] Google protocol buffers provide a language- and platform-neutral way to serialize data. See http://code.google.com/apis/protocolbuffers/.

[3] Akka registers all actors in a global actor registry, which you can access through the Actor object's registry member.
