9.2 Reliable broadcast

When building a distributed application, it is often necessary that more than two actors operate in a coordinated manner. For example, you may want to ensure that all or none of your remote actors carry out some action while providing a way to handle the error case. If a set of remote actors should save their internal state to a database, for instance, then typically you want all or none of them to do it so that there is always a consistent view persisted to the database.

    import scala.actors.Actor

    abstract class BroadcastActor extends Actor {
      // can be set by external actor, therefore @volatile
      @volatile var isBroken = false
      private var canRun = true
      private var counter = 0L

      protected def broadcast(m: BSend) = if (!isBroken) {
        for (a <- m.recipients) a ! BDeliver(m.data)
      } else if (canRun) {
        canRun = false // simulate it being broken
        for (a <- m.recipients.take(2)) a ! BDeliver(m.data)
        println("error at " + this)
      }

      // to be overridden in subclasses
      protected def reaction: PartialFunction[Any, Unit] = {
        case BCast(msg, recipients) =>
          counter += 1
          broadcast(BSend(msg, recipients, counter))
        case 'stop =>
          exit()
      }

      def act = loopWhile (canRun) { react(reaction) }
    }
Listing 9.6 - Best-effort broadcasting.

Basic broadcasting

You can instruct a number of actors to carry out some action by broadcasting a message to these actors. Listing 9.6 shows a simple broadcast implementation. First, we define a BroadcastActor that implements act in a way that allows the actor to react to messages of type BCast and the special 'stop message, which causes the actor to terminate. The BCast case class is defined as follows:

  case class BCast(data: Any, recipients: Set[Actor])

A BCast message tells the actor to send some data to the set of actors specified in the message. Note how the message handlers are defined using the reaction member, which is a partial function that is passed to react inside the act method. This has the advantage that subclasses can override reaction to handle additional message patterns while inheriting the message handling logic of the superclass. The broadcast method implements the actual message sending. It is invoked with a BSend message that contains the data, the set of recipients, and a time stamp (which we will not use until later):

  case class BSend(data: Any, recipients: Set[Actor],
      timestamp: Long)

To make things more interesting, we allow an actor to be "broken," which is expressed using the volatile isBroken field (the field is volatile so that its value can safely be changed from a different actor). A broken broadcast actor sends the message to at most two of the recipients, prints an error, and then stops processing messages. The actual data is wrapped in a message of type BDeliver, which is defined as follows:

  case class BDeliver(data: Any)

A BDeliver message indicates to the recipient that the data was delivered using a broadcast.

    class MyActor extends BroadcastActor {
      override def reaction = super.reaction orElse {
        case BDeliver(data) =>
          println("Received broadcast message: " +
                  data + " at " + this)
      }
    }
Listing 9.7 - Using the broadcast implementation in user code.

To use the broadcast implementation in actual user code, we extend BroadcastActor and override its reaction member, as shown in Listing 9.7. The message handling logic of BroadcastActor must remain enabled alongside the new handler for BDeliver messages. To do this, we combine super.reaction with the new handler using orElse. As a result, MyActor instances respond to BCast and 'stop messages, as defined in BroadcastActor, in addition to BDeliver messages.
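To see the orElse composition in isolation, here is a small sketch that uses no actors at all: reaction is simply called directly on a few messages. The classes Base and Derived, the helper record, and the choice of String recipients are hypothetical stand-ins, and the string "stop" replaces the 'stop symbol. The point is only that the inherited cases are tried first and unmatched messages fall through to the subclass's extra case.

```scala
// Hypothetical, actor-free sketch of the `super.reaction orElse { ... }` idiom.
// Message shapes mirror the text; "stop" stands in for the 'stop symbol.
import scala.collection.mutable.ListBuffer

case class BCast(data: Any, recipients: Set[String])
case class BDeliver(data: Any)

class Base {
  val log = ListBuffer[String]()
  protected def record(s: String): Unit = log += s

  // Base handler: understands BCast and "stop"
  def reaction: PartialFunction[Any, Unit] = {
    case BCast(data, _) => record(s"base handled BCast($data)")
    case "stop"         => record("base handled stop")
  }
}

class Derived extends Base {
  private val onDeliver: PartialFunction[Any, Unit] = {
    case BDeliver(data) => record(s"derived handled BDeliver($data)")
  }
  // Inherited cases are tried first; BDeliver falls through to onDeliver
  override def reaction: PartialFunction[Any, Unit] =
    super.reaction orElse onDeliver
}

val actor = new Derived
for (m <- List(BCast("x", Set("a1")), BDeliver("y"), "stop", 42))
  if (actor.reaction.isDefinedAt(m)) actor.reaction(m) // 42 matches no case
println(actor.log.mkString("\n"))
```

Note that a message no case accepts (42 here) is simply not handled; inside a real actor, react would leave such a message in the mailbox.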

Let's try out this basic broadcast implementation:

  val a1 = new MyActor; a1.start()
  val a2 = new MyActor; a2.start()
  val a3 = new MyActor; a3.start()
  val a4 = new MyActor; a4.start()
  a1 ! BCast("Hello!", Set(a1, a2, a3, a4))

As expected, running the above code produces output like the following (the order of the lines may differ from run to run):

  Received broadcast message: Hello! at MyActor@3c3c9217
  Received broadcast message: Hello! at MyActor@15af33d6
  Received broadcast message: Hello! at MyActor@54520eb
  Received broadcast message: Hello! at MyActor@2c9b42e6

Now let's set actor a1's isBroken field to true and initiate another broadcast:

  a1.isBroken = true
  a1 ! BCast("Hello again!", Set(a1, a2, a3, a4))

This time, the output looks different:

  error at MyActor@15af33d6
  Received broadcast message: Hello again! at MyActor@2c9b42e6

In the above run, only one actor other than a1 received the "Hello again!" message, because a1 failed before sending out more messages. In an actual distributed application, the cause could be a machine failure or a network link that is down. To guarantee that either all or none of the recipients receive the broadcast message, we need to implement a reliable broadcast.
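The failure we just observed can be reproduced deterministically with a small single-threaded model. This is a sketch under simplifying assumptions, not the actor code above: processes are plain strings, a FIFO queue stands in for the actors' mailboxes, and the hypothetical crashAfter parameter says how many sends the sender completes before it fails. As with a1, the crashed sender never processes the copy addressed to itself.

```scala
// Hypothetical, single-threaded model of best-effort broadcast with a crash.
// Processes are plain strings; a FIFO queue stands in for actor mailboxes.
import scala.collection.mutable

object BestEffort {
  /** `sender` sends the data to `recipients` in order. If `crashAfter` is
    * Some(k), the sender stops after k sends and processes nothing more.
    * Returns the processes that actually deliver the data. */
  def broadcast(sender: String, recipients: List[String],
                crashAfter: Option[Int]): Set[String] = {
    val crashed = mutable.Set[String]()
    val queue   = mutable.Queue[String]()         // destination of each BDeliver
    queue ++= crashAfter.fold(recipients)(k => recipients.take(k))
    if (crashAfter.isDefined) crashed += sender   // sender fails mid-broadcast
    val delivered = mutable.Set[String]()
    while (queue.nonEmpty) {
      val p = queue.dequeue()
      if (!crashed(p)) delivered += p             // crashed processes never deliver
    }
    delivered.toSet
  }
}

val group = List("a1", "a2", "a3", "a4")
println(BestEffort.broadcast("a1", group, None))    // every member delivers
println(BestEffort.broadcast("a1", group, Some(2))) // only a2 delivers
```

With crashAfter = Some(2), only a2 ends up delivering, which matches the single "Received broadcast message" line in the run above.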

Reliable broadcasting

To make message broadcasting reliable, we will extend our code to implement an algorithm known as eager reliable broadcast. The idea of this algorithm is that every recipient of a broadcast message forwards that message to all other recipients before delivering it. The algorithm is called eager because each recipient forwards the message right away, whether or not the original sender has actually failed.

    class RbActor extends BroadcastActor {
      var delivered = Set[BSend]()
      override def reaction = super.reaction orElse {
        case m @ BSend(data, _, _) =>
          if (!delivered.contains(m)) {
            delivered += m
            broadcast(m)
            this ! BDeliver(data)
          }
      }
    }
Listing 9.8 - A reliable broadcast actor.

    protected def broadcast(m: BSend) = if (!isBroken) {
      for (a <- m.recipients) a ! m
    } else if (canRun) {
      canRun = false // simulate it being broken
      for (a <- m.recipients.take(2)) a ! m
      println("error at " + this)
    }
Listing 9.9 - Sending messages with time stamps.

We'll extend BroadcastActor as shown in Listing 9.8. An RbActor keeps track of the BSend messages that it has received using the delivered set. When it receives a BSend, the actor checks whether it has already processed that message by testing delivered.contains(m); every BSend message it processes is added to the delivered set. If the message is fresh, the actor adds it to delivered and invokes broadcast to forward it to all recipient actors. Moreover, it sends itself a BDeliver message, indicating that it received the data via a reliable broadcast.

It is crucial here that each BSend message contains a time stamp. The originating BroadcastActor sets the time stamp from its counter when it handles a BCast message. The time stamp lets us identify to which broadcast a particular BSend message belongs. The messages forwarded by each RbActor do not change that time stamp. This way, each actor can tell when it has already received a broadcast message, in which case it does not forward it further. However, for the RbActor to work properly, we need to slightly change the implementation of BroadcastActor: it is no longer sufficient to send a BDeliver message inside the broadcast method, because the RbActor needs to receive BSend messages, since only those contain time stamps. Therefore, we have to change the broadcast method accordingly; this is shown in Listing 9.9.

As you can see, the BroadcastActor now simply sends the BSend messages to the recipients (a ! m), instead of sending a BDeliver message that only contains the data (a ! BDeliver(m.data)).
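The duplicate check relies on nothing more than case-class equality. The following sketch, which assumes String recipients instead of actors and uses a hypothetical receive function in place of RbActor's handler, shows that a forwarded copy of a BSend compares equal to the original, whereas a later broadcast of the same data differs because its time stamp differs. The hypothetical Unstamped class shows what would go wrong without a time stamp.

```scala
// Hypothetical illustration; recipients are plain strings instead of actors.
case class BSend(data: Any, recipients: Set[String], timestamp: Long)
case class Unstamped(data: Any, recipients: Set[String]) // no time stamp

val group     = Set("a1", "a2", "a3", "a4")
val first     = BSend("Hello again!", group, 1L)
val forwarded = first.copy()                     // forwarding keeps the stamp
val second    = BSend("Hello again!", group, 2L) // a later broadcast, same data

var delivered = Set[BSend]()
// Mirrors the RbActor check: deliver only BSend values not seen before
def receive(m: BSend): Boolean =
  if (delivered.contains(m)) false
  else { delivered += m; true }

val results = List(first, forwarded, second).map(receive)
println(results) // List(true, false, true)

// Without a time stamp, two separate broadcasts of the same data are equal,
// so a deduplicating receiver would wrongly drop the second one.
println(Unstamped("Hello again!", group) == Unstamped("Hello again!", group)) // true
```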

Having made these changes, let's re-run our client code. Before we can do that, however, we first have to change MyActor to extend RbActor instead of BroadcastActor:

  class MyActor extends RbActor {
    override def reaction = super.reaction orElse {
      case BDeliver(data) =>
        println("Received broadcast message: " +
                data + " at " + this)
    }
  }

Running our short test code from above (setting a1.isBroken to true) should now produce output like the following:

  error at MyActor@28e70e30
  Received broadcast message: Hello again! at MyActor@5954864a
  Received broadcast message: Hello again! at MyActor@3c3c9217
  Received broadcast message: Hello again! at MyActor@1ff82982

As you can see, even though actor a1 failed after sending the broadcast message to itself and actor a2, actors a3 and a4 also received the message. The reason is that a2 forwarded the BSend message it received from a1 to all of its recipients, which include a3 and a4. In fact, one can prove that eager reliable broadcast delivers a message either to all correct (non-failed) recipients or to none of them, provided the communication between actors is based on a reliable transport protocol, such as TCP, which Scala's remote actors use by default.
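We can at least check the all-or-none property on a small example. The sketch below is a single-threaded model, not the RbActor code: one FIFO queue plays the role of a reliable transport, at most one process crashes, and the hypothetical crash parameter says which process fails and after how many sends of its own (re)broadcast. Each process forwards a fresh BSend before delivering it, as in Listing 9.8, and the script tries every crash point for a group of four.

```scala
// Hypothetical, single-threaded model of eager reliable broadcast; it is not
// the scala.actors code above. One FIFO queue stands in for a reliable
// transport: every message sent to a live process eventually arrives.
import scala.collection.mutable

object EagerRb {
  case class BSend(data: String, recipients: List[String], timestamp: Long)

  /** `crash` = Some((p, k)): process p completes only k sends of its
    * (re)broadcast and then crashes before delivering. Returns the set of
    * processes that delivered and the set that stayed correct. */
  def run(origin: String, group: List[String],
          crash: Option[(String, Int)]): (Set[String], Set[String]) = {
    val crashed   = mutable.Set[String]()
    val seen      = mutable.Set[(String, BSend)]()   // each process's delivered set
    val delivered = mutable.Set[String]()
    val queue     = mutable.Queue[(String, BSend)]() // (destination, message)

    def broadcast(from: String, m: BSend): Unit = {
      val targets = crash match {
        case Some((p, k)) if p == from => m.recipients.take(k) // fails midway
        case _                         => m.recipients
      }
      targets.foreach(to => queue.enqueue((to, m)))
      if (crash.exists(_._1 == from)) crashed += from
    }

    broadcast(origin, BSend("hello", group, 1L))      // originator handles BCast
    while (queue.nonEmpty) {
      val (to, m) = queue.dequeue()
      if (!crashed(to) && !seen((to, m))) {           // fresh BSend, live process
        seen += ((to, m))
        broadcast(to, m)                              // eager: forward first,
        if (!crashed(to)) delivered += to             // then deliver
      }
    }
    (delivered.toSet, group.toSet -- crashed)
  }
}

val group = List("a1", "a2", "a3", "a4")
val scenarios: List[Option[(String, Int)]] =
  None :: (for (p <- group; k <- 0 to group.size) yield Some((p, k)))

for (s <- scenarios) {
  val (delivered, correct) = EagerRb.run("a1", group, s)
  // all-or-none: either nobody delivers, or every correct process does
  println(s"crash=$s delivered=$delivered ok=${delivered.isEmpty || delivered == correct}")
}
```

The price of eagerness is message volume: in this model, when nobody fails, a broadcast to n recipients costs n + n² sends, because every recipient (including the originator, on receiving its own copy) forwards the message to all n recipients.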

