7.1 Pluggable schedulers

In some cases, you must customize the way in which actors are executed, including when:

  • Maintaining thread-bound properties such as ThreadLocals
  • Interfacing with existing event dispatch threads
  • Using daemon-style actors
  • Executing message sends and receives deterministically for reproducible testing
  • Maintaining fine-grained control over resources consumed by the underlying thread pool

The part of the runtime system that executes an actor's behavior is called a scheduler. Each actor is associated with a scheduler object that executes the actor's actions; that is, its body as well as its reactions to received messages. By default, a global scheduler executes all actors on a single thread pool. However, in principle each actor may be executed by its own scheduler.

To customize an actor's scheduler, you override the scheduler method inherited from the Reactor trait. The method returns an instance of trait IScheduler, which is used to execute the actor's actions. By returning a custom IScheduler instance, the default execution mechanism can be overridden. In the following sections, we will show you how to do this for each case listed above.

Maintaining thread-bound properties

When an application is run on the JVM, certain properties are maintained on a by-thread basis. Examples for such properties include the context class loader, the access control context, and programmer-defined ThreadLocals. In applications that use actors instead of threads, these properties are still useful or maybe even necessary to interoperate with JVM-based libraries and frameworks.

With thread-based actors, you use ThreadLocals and other thread-bound properties the same way as with plain threads. For event-based actors, the situation is slightly more complicated, since the underlying thread that executes a single event-based actor may change over time. Remember that each time an actor suspends in a react, the underlying thread is released. When this actor resumes, it may be executed by a different thread. Thus, without some additional logic, the value an actor reads from a ThreadLocal could change unexpectedly during its execution. In the next section, we show you how to correctly maintain thread-bound properties, such as ThreadLocals, over the event-based actor's lifetime.

Example: thread-local variables

Listing 7.1 shows an attempt to use a ThreadLocal to track a name associated with the current actor. The name is stored in a ThreadLocal[String] called tname. In Java, ThreadLocals are typically declared as static class members, since they hold data that is not specific to a class instance but to an entire thread. In Scala, there are no static class members. Instead, data that is not specific to a class instance is held in singleton objects. Object members are translated to static class members in the JVM bytecode. Therefore, we declare the thread-local tname as a member of the application's object, as opposed to a member of a class or trait. We override the initialValue method to provide the initial value "john". The joeActor responds to the first 'YourName request by first setting its name to "joe jr.", and then sending it back in a reply to the sender. Upon the second 'YourName request, the thread-local name is sent back unchanged as a reply. The other actor simply sends two requests and prints their responses. We expect the program to produce the following output:

  your name: joe jr.
  your name: joe jr.

However, surprisingly, some executions produce the following output:

  your name: joe jr.
  your name: john

Apparently, in this execution the second 'YourName request returns the initial value of the ThreadLocal, even though the actor previously set it to a different value. As already mentioned, the underlying problem is that parts of an event-based actor are not always executed by the same underlying thread. When the actor resumes in the second react, it can be executed by a thread that is different from the thread that executed the reaction to the first request. In that case, the new thread's copy of the ThreadLocal was never updated, so it still holds the initial value.

    import scala.actors.Actor._

    object ActorWithThreadLocalWrong extends Application {
      val tname = new ThreadLocal[String] {
        override protected def initialValue() = "john"
      }
      val joeActor = actor {
        react { case 'YourName =>
          tname set "joe jr."
          sender ! tname.get
          react { case 'YourName =>
            sender ! tname.get
          }
        }
      }
      actor {
        println("your name: " + (joeActor !? 'YourName))
        println("your name: " + (joeActor !? 'YourName))
      }
    }
Listing 7.1 - Incorrect use of ThreadLocal.
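
The effect does not depend on actors. The following minimal sketch reproduces it with plain JDK threads: a value set in a ThreadLocal on one thread is not visible on another. The object name ThreadLocalAcrossThreads and the helper onNewThread are hypothetical and chosen only for this illustration; each call to onNewThread stands in for one reaction of an event-based actor that happens to land on a different worker thread.

```scala
import java.util.concurrent.{Callable, Executors}

object ThreadLocalAcrossThreads {
  // Same initial value as tname in Listing 7.1.
  val tname: ThreadLocal[String] = new ThreadLocal[String] {
    override protected def initialValue(): String = "john"
  }

  // Hypothetical helper: evaluates `body` on a freshly created thread.
  private def onNewThread[A](body: => A): A = {
    val exec = Executors.newSingleThreadExecutor()
    try exec.submit(new Callable[A] { def call(): A = body }).get()
    finally exec.shutdown()
  }

  def demo(): (String, String) = {
    // "First reaction": set the name on one thread and read it back.
    val first = onNewThread { tname.set("joe jr."); tname.get() }
    // "Second reaction": another thread has its own, untouched copy.
    val second = onNewThread { tname.get() }
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = demo()
    println("your name: " + first)  // your name: joe jr.
    println("your name: " + second) // your name: john
  }
}
```

Unlike Listing 7.1, this sketch always switches threads, so it shows the surprising output on every run rather than only on some.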

  abstract class ActorWithThreadLocal(private var name: String)
    extends Actor {
    override val scheduler = new SchedulerAdapter {
      def execute(codeBlock: => Unit): Unit =
        ActorWithThreadLocal.super.scheduler execute {
          tname set name
          codeBlock
          name = tname.get
        }
    }
  }
Listing 7.2 - Saving and restoring a ThreadLocal.

We can avoid the above problem as follows: First, we create a subclass of the Actor trait that stores a copy of the thread-local variable. The idea is to restore the actual ThreadLocal using this copy whenever the actor resumes. Conversely, we save the current value of the ThreadLocal to the actor's copy whenever the actor suspends. This way, we make sure that the ThreadLocal holds the correct value while the actor executes, namely the value associated with the current actor.

The solution that we just outlined requires us to run custom code upon an actor's suspension and resumption. We can achieve this by overriding the scheduler that is used to execute the actor. However, we only want to override a specific method of the scheduler, namely the method that receives the code to be executed after an actor resumes and before it suspends. Since this is the most common use case when overriding an actor's scheduler, the helper trait SchedulerAdapter allows us to override only this required method.

This implementation using SchedulerAdapter is shown in Listing 7.2. The abstract ActorWithThreadLocal class overrides the scheduler member with a new instance of a SchedulerAdapter subclass. This subclass implements the execute method that receives the code block, which is executed after this actor resumes and before it suspends. To insert the required code, we invoke the execute method of the inherited scheduler, passing a closure that surrounds the evaluation of the by-name codeBlock argument with additional code. Before the actor resumes, we restore the thread-local tname with the value of the actor's copy in its private name member. Normally, after running the code block, the actor would suspend. In our extended closure, we additionally save the current value of tname in the actor's name member before we suspend. By making joeActor in Listing 7.1 an instance of ActorWithThreadLocal, its thread-local state is managed correctly.
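
The save-and-restore idea can also be tried out in isolation. The sketch below is an analogy built on plain JDK executors, not the actor library's implementation: the class NamedTask is a hypothetical stand-in for an actor that keeps a private copy of the name, and its execute method wraps each code block the same way the closure in Listing 7.2 does. For simplicity, execute blocks until the step has finished, which a real scheduler would not do.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

object SaveRestoreThreadLocal {
  val tname: ThreadLocal[String] = new ThreadLocal[String] {
    override protected def initialValue(): String = "john"
  }

  // Hypothetical stand-in for an actor that owns a copy of the name.
  class NamedTask(private var name: String) {
    // Restore the thread-local, run the block, then save the thread-local.
    def execute[A](pool: ExecutorService)(codeBlock: => A): A =
      pool.submit(new Callable[A] {
        def call(): A = {
          tname.set(name)        // restore from the task's copy
          val result = codeBlock
          name = tname.get()     // save before the step ends
          result
        }
      }).get()                   // wait for the step to finish
  }

  def demo(): (String, String) = {
    // Two executors, hence two different worker threads.
    val threadA = Executors.newSingleThreadExecutor()
    val threadB = Executors.newSingleThreadExecutor()
    try {
      val joe = new NamedTask("john")
      val first = joe.execute(threadA) { tname.set("joe jr."); tname.get() }
      val second = joe.execute(threadB) { tname.get() }
      (first, second)
    } finally {
      threadA.shutdown()
      threadB.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = demo()
    println("your name: " + first)  // your name: joe jr.
    println("your name: " + second) // your name: joe jr.
  }
}
```

Even though the two steps run on different threads, the second step sees "joe jr." because the value travels with the task rather than with the thread.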

Interfacing with event dispatch threads

Some frameworks restrict certain actions to special threads that the framework manages. For example, a special event dispatch thread manages the event queue of Java's Swing class library. For thread safety, Swing UI components may only be accessed inside event handlers that this dispatch thread executes. Therefore, an actor that wants to interact with Swing components must run on the event dispatch thread. This is just one example where you need to "bind" actors to specific threads that are provided by some framework. Another example is a library that interacts with native code through JNI, where all accesses must be performed by a single JVM thread.

By overriding an actor's scheduler, we can ensure that its actions are executed on a specific thread, instead of an arbitrary worker thread of the actor runtime system. For this, we can again use the SchedulerAdapter trait, which we showed in the previous section. Listing 7.3 shows the implementation of an Actor subclass that executes its instances only on the Swing event dispatch thread. For this, we override the scheduler member with a new instance of a SchedulerAdapter that executes the actor's actions by submitting Runnables to the Swing event dispatch thread. We do this using the invokeLater method of java.awt.EventQueue.

    abstract class SwingActor extends Actor {
      override val scheduler = new SchedulerAdapter {
        def execute(codeBlock: => Unit): Unit =
          java.awt.EventQueue.invokeLater(
            new Runnable() {
              def run() = codeBlock
            }
          )
      }
    }
Listing 7.3 - Executing actors on the Swing event dispatch thread.
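
The same binding technique applies to any single thread that a framework or library requires, such as the JNI case mentioned above. The following sketch uses a single-threaded JDK executor as a hypothetical stand-in for such a framework-owned thread; the names DedicatedThreadSketch and BoundExecutor are assumptions made for this illustration. It checks that every submitted code block runs on one and the same thread. Note that, unlike invokeLater in Listing 7.3, this execute waits for the block to finish so that it can return a result.

```scala
import java.util.concurrent.{Callable, Executors}

object DedicatedThreadSketch {
  // Hypothetical wrapper that funnels every code block to one thread.
  final class BoundExecutor {
    private val dedicated = Executors.newSingleThreadExecutor()

    def execute[A](codeBlock: => A): A =
      dedicated.submit(new Callable[A] { def call(): A = codeBlock }).get()

    def shutdown(): Unit = dedicated.shutdown()
  }

  // Returns the set of thread names that executed the five code blocks.
  def demo(): Set[String] = {
    val bound = new BoundExecutor
    try (1 to 5).map(_ => bound.execute(Thread.currentThread().getName)).toSet
    finally bound.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val names = demo()
    println("distinct threads used: " + names.size) // distinct threads used: 1
  }
}
```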

Daemon-style actors

In many cases, we don't have to care about the termination of an actor-based program. When all actors have finished their execution, the program terminates. However, when actors are long-running or react to messages inside an infinite loop, orderly termination of actors and the underlying thread pool can become challenging.

Some applications use actors that are always ready to accept requests to process work in the background. To simplify termination in such cases, it can help to make those actors daemons: the existence of active daemon actors does not prevent the main program from terminating. This means that as soon as all non-daemon actors have terminated, the application terminates.

    import scala.actors.Actor
    import scala.actors.scheduler.DaemonScheduler

    object DaemonActors {
      class MyDaemon extends Actor {
        override def scheduler = DaemonScheduler
        def act() {
          loop {
            react { case num: Int => reply(num + 1) }
          }
        }
      }
      def main(args: Array[String]) {
        val d = (new MyDaemon).start()
        println(d !? 41)
      }
    }
Listing 7.4 - Creating daemon-style actors.

Listing 7.4 shows how to create actors with daemon-style semantics: simply override the scheduler method to return the DaemonScheduler object. DaemonScheduler uses the exact same configuration as the default Scheduler object, except that the actors that it manages do not prevent the application from terminating. In Listing 7.4 the d actor is again waiting for a message after the synchronous send has been served. However, since the main thread finishes, the DaemonScheduler also terminates, and with it the d actor.
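
Daemon semantics are a standard JVM thread property, so the behavior can be observed without actors. The sketch below is an illustration only and makes no claim about how DaemonScheduler is implemented: it builds a JDK thread pool whose workers are daemon threads (daemonFactory and DaemonPoolSketch are hypothetical names) and starts a task that never finishes. The program still exits once main returns, because only daemon threads remain.

```scala
import java.util.concurrent.{Executors, ThreadFactory}

object DaemonPoolSketch {
  // Creates worker threads that do not keep the JVM alive.
  val daemonFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2, daemonFactory)
    // A background task that is always "ready for work" and never returns.
    pool.execute(new Runnable {
      def run(): Unit = while (true) Thread.sleep(1000)
    })
    println("main is done")
    // No pool.shutdown() here: the JVM exits anyway, since the only
    // thread still running is a daemon thread.
  }
}
```

With the default thread factory, the same program would keep running until the pool is shut down explicitly.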

Deterministic actor execution

By default, the execution of concurrent actors is not deterministic. This means that two actors that are ready to react to a received message may be executed in any order, or, depending on the number of available processor cores, in parallel. Since actors do not share state,[1] you do not have to worry about data races even if you don't know the actual execution order in advance. In fact, for best performance and scalability we would like to have as many actors as possible executed in parallel!

However, in some cases it can be helpful to execute actors deterministically. The main reason is that a deterministic execution enables reproducing program executions that are not influenced by timing-dependent variations in thread scheduling, which make multi-threaded programs extremely hard to test.

  case class SyncGear(s: Int)
  case class SyncDone(g: Gear)

  class Gear(val id: Int, var speed: Int, val controller: Actor)
      extends Actor {
    def act() {
      loop {
        react {
          case SyncGear(targetSpeed: Int) =>
            println("[Gear " + id +
                    "] synchronize from current speed " + speed +
                    " to target speed " + targetSpeed)
            adjustSpeedTo(targetSpeed)
        }
      }
    }
    def adjustSpeedTo(targetSpeed: Int) {
      if (targetSpeed > speed) {
        speed += 1
        self ! SyncGear(targetSpeed)
      } else if (targetSpeed < speed) {
        speed -= 1
        self ! SyncGear(targetSpeed)
      } else if (targetSpeed == speed) {
        println("[Gear " + id + "] has target speed")
        controller ! SyncDone(this)
        exit()
      }
    }
  }
Listing 7.5 - Synchronizing the speed of Gear actors.

For example, consider a concurrent application simulating mechanical gears and motors. Assume that each gear's speed is adjusted using a controller. To model the real-world concurrency, we represent each gear and the controller as an actor. Listing 7.5 shows the implementation of a gear as an actor. The Gear actor responds to SyncGear messages that cause the gear to adjust its speed in a step-wise manner. For instance, a gear with current speed 7 units requires two steps to adjust its speed to 5 units. A SyncGear message initiates each step. To process each message, the gear moves its speed one unit toward the target speed and sends itself another SyncGear message if it has not reached its target speed. Otherwise, the gear reports to its controller that it has reached the target speed using a SyncDone message. Let's write a little driver to test the Gear actor:

  object NonDeterministicGears {
    def main(args: Array[String]) {
      actor {
        val g1 = (new Gear(1, 7, self)).start()
        val g2 = (new Gear(2, 1, self)).start()
        g1 ! SyncGear(5)
        g2 ! SyncGear(5)
        react { case SyncDone(_) =>
          react { case SyncDone(_) => }
        }
      }
    }
  }

The above driver creates two gears that, initially, are running at speeds 7 and 1, respectively. Afterward the controller actor instructs the gears to adjust their speed to 5 by sending asynchronous SyncGear messages. Finally, it waits until both gears have synchronized their speeds. Running the driver produces output such as the following:

  [Gear 2] synchronize from current speed 1 to target speed 5
  [Gear 2] synchronize from current speed 2 to target speed 5
  [Gear 1] synchronize from current speed 7 to target speed 5
  [Gear 1] synchronize from current speed 6 to target speed 5
  [Gear 1] synchronize from current speed 5 to target speed 5
  [Gear 2] synchronize from current speed 3 to target speed 5
  [Gear 1] has target speed
  [Gear 2] synchronize from current speed 4 to target speed 5
  [Gear 2] synchronize from current speed 5 to target speed 5
  [Gear 2] has target speed

As you can see, the speed-adjusting steps of the different gears are interleaved, since the gear actors are running concurrently. A subsequent program run may produce an entirely different interleaving of the steps. This means that you cannot use the above driver in a unit test that compares the actual output to some expected output.

Using the SingleThreadedScheduler class (which resides in package scala.actors.scheduler), you can make the execution of concurrent actors deterministic. As its name suggests, this scheduler runs the behavior of all actors on a single thread. Since that thread's execution is deterministic, the entire actor system executes deterministically. In particular, any side effects that actors have as part of their reactions to messages, such as I/O, are performed in the same order in all program runs.

Usually, running actors on a single thread means that the reaction of an actor receiving a message is immediately executed on the same thread that has been executing that message's sender. Since it is a valid pattern to have an actor sending messages to itself in a loop, sometimes the scheduler must delay the processing of a message to avoid a stack overflow. For such situations, the scheduler maintains a queue of reactions that are executed when there is nothing else left to do. However, some reactions may remain in the scheduler's queue just before the application should terminate. Therefore, to make sure that the scheduler processes all tasks, you have to invoke shutdown explicitly.
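
To make the role of the queue concrete, here is a small self-contained model of the idea. It is a deliberately simplified sketch and not the SingleThreadedScheduler implementation: the hypothetical QueueScheduler always enqueues a reaction instead of running it immediately, and its shutdown method drains the queue in FIFO order on the calling thread. The gear logic from Listing 7.5 is reduced to a recursive function, sync, that schedules its own next step.

```scala
import scala.collection.mutable

object DeterministicGearsSketch {
  // Hypothetical scheduler: reactions are queued and run on one thread.
  final class QueueScheduler {
    private val pending = mutable.Queue.empty[() => Unit]

    def execute(task: => Unit): Unit = pending.enqueue(() => task)

    // Runs all queued reactions, including those enqueued while draining.
    def shutdown(): Unit =
      while (pending.nonEmpty) pending.dequeue()()
  }

  def run(): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    val scheduler = new QueueScheduler

    // One reaction of a gear: log the step, then schedule the next one.
    def sync(id: Int, speed: Int, target: Int): Unit = scheduler.execute {
      log += s"[Gear $id] synchronize from current speed $speed to target speed $target"
      if (speed < target) sync(id, speed + 1, target)
      else if (speed > target) sync(id, speed - 1, target)
      else log += s"[Gear $id] has target speed"
    }

    sync(1, 7, 5)
    sync(2, 1, 5)
    scheduler.shutdown() // without this call, nothing would run at all
    log.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because there is only one thread and one FIFO queue, every call to run produces the same interleaving of the two gears, which is what makes the output usable as the expected value of a unit test.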
