7.2 Managed blocking

The actor runtime system uses a thread pool, which is initialized to use a relatively small number of worker threads. By default, the number of workers used is twice the number of processor cores available to the JVM. In many cases, this configuration allows executing actors with a maximum degree of parallelism while consuming only a few system resources for the thread pool. In particular, actor programs that use only event-based operations, such as react, can always be executed using a fixed number of worker threads.

However, in some cases, actors use a mix of event-based and thread-based code. For instance, methods like receive are implemented using thread-blocking operations. Moreover, actor-based code may have to interoperate with code using Java's java.util.concurrent utilities. In both cases, operations that may block the underlying thread have to be used with care, so as to avoid locking up the entire thread pool.

  import scala.actors.Actor._
  import java.util.concurrent.CountDownLatch

  object PoolLockup {
    def main(args: Array[String]) {
      val numCores = Runtime.getRuntime().availableProcessors()
      println("available cores: " + numCores)
      val latch = new CountDownLatch(1)
      for (i <- 1 to (numCores * 2)) actor {
        latch.await()
        println("actor " + i + " done")
      }
      actor { latch.countDown() }
    }
  }
Listing 7.6 - Blocked actors may lock up the thread pool.

For example, Listing 7.6 shows what happens if too many actors are blocked simultaneously. To simplify the demonstration, we use an instance of the CountDownLatch class in the java.util.concurrent package. Note that even though the actual code example may not be very useful in and of itself, there are probably places in your actor-based program where the Java concurrency utility classes come in handy. Therefore, the following discussion should be useful to anyone who wants to reuse blocking concurrency code in his or her actor code.

Basically, we use a CountDownLatch to notify a bunch of actors once the "main actor" reaches a certain point. To do this, we initialize the latch to one, and tell our actors to wait until the latch becomes zero. Once the main actor sets the latch to zero, the other actors can continue and print a message before terminating. Now, the problem is that if too many actors wait for the latch to become zero, all worker threads in the underlying thread pool may be blocked, so that there is no thread left to execute the main actor. As a result, the blocked actors wait indefinitely, the thread pool is locked up, and the program fails to terminate. Note that in the example we took care to start twice as many blocking actors as there are processor cores available to the JVM. This corresponds exactly to the number of pool threads created by default. Therefore, starting fewer blocking actors will not cause problems, since there will be a pool thread left to execute the main actor, which releases the blocked actors.
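The latch mechanics can be seen in isolation, without any actors involved; the following sketch uses plain java.util.concurrent threads (the class name LatchDemo is ours):

```java
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // Latch initialized to one: await() blocks until countDown() is called once.
        CountDownLatch latch = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            try {
                latch.await();  // blocks until the count reaches zero
                System.out.println("waiter released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        System.out.println("count before: " + latch.getCount());
        latch.countDown();  // count goes from 1 to 0, releasing the waiter
        waiter.join();
        System.out.println("count after: " + latch.getCount());
    }
}
```

In the actor version above, every blocked actor pins one pool thread inside exactly this kind of await call, which is what exhausts the pool.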

There are several ways to prevent the thread pool from locking up our actor-based program:

  • Configuring the thread pool to create more worker threads at start-up
  • Using managed blocking to dynamically resize the thread pool before invoking a blocking operation

The first alternative can be implemented either by using the JVM properties actors.corePoolSize and actors.maxPoolSize (see Chapter 5), or by using a customized scheduler (see Section ???). However, pre-configuring the thread pool size can be fragile if the number of blocking actors is hard to predict. Moreover, overprovisioning of thread pool resources is likely to negatively impact your application's performance.
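For the first alternative, the properties are passed to the JVM at start-up; for example (property names as above, class name from Listing 7.6, sizes purely illustrative):

```shell
# Pre-size the actor thread pool before the program starts
scala -Dactors.corePoolSize=16 -Dactors.maxPoolSize=64 PoolLockup
```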

The second alternative is a much more efficient way of dealing with blocking operations, since the thread pool grows only on demand, and (usually) only for a short period of time. It also avoids the problem of having to predict the maximum number of actors that may be blocked simultaneously. The basic idea of managed blocking is to invoke blocking operations indirectly through an interface that allows the thread pool to resize itself before blocking. Additionally, the interface allows the pool to query a wrapped blocking operation to check whether it no longer needs to block. This enables shrinking the pool back to the size it had before growing to accommodate the blocking operation.

  import scala.actors.Actor
  import scala.actors.Actor._
  import scala.concurrent.ManagedBlocker
  import java.util.concurrent.CountDownLatch

  object ManagedBlocking {
    class BlockingActor(i: Int, latch: CountDownLatch)
        extends Actor {
      val blocker = new ManagedBlocker {
        def block() = { latch.await(); true }
        def isReleasable = { latch.getCount == 0 }
      }
      def act() {
        scheduler.managedBlock(blocker)
        println("actor " + i + " done")
      }
    }
    def main(args: Array[String]) {
      val numCores = Runtime.getRuntime().availableProcessors()
      println("available cores: " + numCores)
      val latch = new CountDownLatch(1)
      for (i <- 1 to (numCores * 2))
        (new BlockingActor(i, latch)).start()
      actor { latch.countDown() }
    }
  }
Listing 7.7 - Using managed blocking to prevent thread-pool lock up.

Listing 7.7 shows how you can use the ManagedBlocker interface to avoid locking up the thread pool. Managed blocking requires the use of methods that are not accessible when defining actors inline using actor { ... }. Therefore, you have to create your blocking actors by subclassing the Actor trait. Note that inside the body of act we replaced the invocation of latch.await() with a call to managedBlock, a method declared in the IScheduler trait. It is invoked on the scheduler instance that is used to execute the current actor (this). managedBlock takes an instance of ManagedBlocker as an argument. You use the ManagedBlocker trait to wrap blocking operations in a way that allows the underlying thread pool to choose when and how to invoke that operation. The trait contains the following two methods:

  • def block(): Boolean
  • def isReleasable: Boolean

The two methods are supposed to be implemented in the following way: The block method invokes an operation that possibly blocks the current thread. The underlying thread pool makes sure to invoke block only in a context where blocking is safe; for instance, if there are no idle worker threads left, it first creates an additional thread that can process submitted tasks in case all other workers are blocked. The Boolean result indicates whether the current thread might still have to block even after the invocation of block has returned; in most cases, it is sufficient to just return true, which indicates that no additional blocking is necessary. The isReleasable method, like block, indicates whether additional blocking is necessary. Unlike block, it must not invoke possibly blocking operations itself. Moreover, it can (and should) return true whenever blocking is no longer necessary, even if a previous invocation of block returned false.
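The same two-method contract exists today in java.util.concurrent, as ForkJoinPool.ManagedBlocker, on which the Scala version was modeled. The following sketch (class name ManagedBlockDemo is ours) wraps a CountDownLatch exactly as Listing 7.7 does, and forces the situation from Listing 7.6: a pool with a single worker, where the blocking task would otherwise starve the releasing task. Managed blocking lets the pool add a compensating thread, so the program terminates:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ManagedBlockDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        ForkJoinPool.ManagedBlocker blocker = new ForkJoinPool.ManagedBlocker() {
            public boolean block() throws InterruptedException {
                latch.await();   // possibly blocks the current thread
                return true;     // no additional blocking necessary
            }
            public boolean isReleasable() {
                return latch.getCount() == 0;  // true once await would not block
            }
        };

        ForkJoinPool pool = new ForkJoinPool(1);  // one worker: blocking here would
                                                  // deadlock without managed blocking
        pool.submit(() -> {
            try {
                // The pool may create a spare worker before block() is invoked.
                ForkJoinPool.managedBlock(blocker);
                System.out.println("blocked task done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.submit(() -> latch.countDown());  // runs on the compensating thread

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("pool terminated");
    }
}
```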

The implementations of block and isReleasable in Listing 7.7 are straightforward. The block method simply invokes latch.await and returns true after that; clearly, once await has returned no additional blocking is necessary. In our implementation of isReleasable, we use the getCount method of CountDownLatch to determine whether the call to await has already unblocked the thread or not. Running the program extended with managed blocking in this way shows that the pool no longer locks up.

Managed blocking and receive

The receive method allows actors to receive messages in a thread-based way. This means that you can use receive just like any other possibly blocking operation. This is unlike the react method, which is more lightweight, but also more restricted. (Chapter 5 showed how to do event-based programming using react.) Since receive uses standard JVM monitors under the hood, it has the same potential problems as any other blocking code when invoked from within actors. However, all variants of receive, as implemented in the scala.actors package, use managed blocking internally to avoid thread-pool lock-ups. Consequently, there is no need to wrap invocations of receive in ManagedBlockers in user code.


