5.3 Event-based futures

In Chapter 4, we illustrated how to use futures for result-bearing messages. Some of the methods used to wait for the result of a future rely on the thread-based receive under the hood. While waiting for the result, those methods monopolize the underlying worker thread. We can also wait for a future in an event-based way with react.

For example, suppose we want to render a summary of all images linked from a web page at a given URL. We can render each image individually once the image has finished downloading. To increase the application's throughput, each image is downloaded by its own actor. Since each downloading actor performs a result-bearing task, it is convenient to use futures to keep track of the expected results. Listing 5.9 shows the code for rendering images in this way.

    def renderImages(url: String) {
      val imageInfos = scanForImageInfo(url)
      val dataFutures = for (info <- imageInfos) yield {
        val loader = actor {
          react { case Download(info) =>
            reply(info.downloadImage())
          }
        }
        loader !! Download(info)
      }
      for (i <- 0 until imageInfos.size) {
        dataFutures(i)() match {
          case data @ ImageData(_) => 
            renderImage(data)
        }
      }
      println("OK, all images rendered.")
    }
Listing 5.9 - Image renderer using futures.

First, the renderImages method scans the URL, provided as a parameter, for image information. For each image, we start a new actor that downloads the image and replies with image data. We obtain a future using the !! message send variant. Once all the futures have been collected in dataFutures, the current actor waits for each future by invoking the future's apply method.[2]

Example: image renderer with react and futures

In the implementation we just described, the underlying thread blocks while waiting for a future. However, we can also wait for a future in a non-blocking, event-based way using react. The key for this to work is the InputChannel associated with each Future instance. We use this channel to transmit the result of the future to the actor that created the future. Invoking a future's apply method waits to receive the result on that channel, using the thread-based receive. However, we can also wait for the results in an event-based way using react on the future's InputChannel.

Listing 5.10 shows an implementation that does just that. Since we need to invoke react several times in sequence, you have to use one of the control-flow combinators of Section 5.2. In this example, we use loopWhile to emulate the indexing scheme of the previous version in Listing 5.9. The main difference is that in this implementation the index variable i is declared and incremented explicitly, and the generator in the for-expresssion has been replaced with a termination condition.

    def renderImages(url: String) {
      val imageInfos = scanForImageInfo(url)
      val dataFutures = for (info <- imageInfos) yield {
        val loader = actor {
          react { case Download(info) =>
            reply(info.downloadImage())
          }
        }
        loader !! Download(info)
      }
      var i = 0
      loopWhile (i < imageInfos.size) {
        i += 1
        dataFutures(i-1).inputChannel.react {
          case data @ ImageData(_) => renderImage(data)
        }
      } andThen { println("OK, all images rendered.") }
    }
Listing 5.10 - Using react to wait for futures.

You can also build custom control-flow combinators that allow you to use react inside for-expresssions. In the following section, we explain how you can do this.

    def renderImages(url: String) {
      val imageInfos = scanForImageInfo(url)
      val dataFutures = for (info <- imageInfos) yield {
        val loader = actor {
          react { case Download(info) =>
            reply(info.downloadImage())
          }
        }
        loader !! Download(info)
      }
      {
        for (ft <- ForEach(dataFutures)) {
          ft.inputChannel.react {
            case data @ ImageData(_) => renderImage(data)
          }
        } 
      } andThen {
        println("OK, all images rendered.")
      }
    }
Listing 5.11 - Enabling react in for expressions.

    case class ForEach[T](iter: Iterable[T]) {
      def foreach(fun: T => Unit): Unit = {
        val it = iter.elements
        loopWhile (it.hasNext) {
          fun(it.next)
        }
      }
    }
Listing 5.12 - Implementing the custom ForEach operator.

Building custom control-flow operators

Sometimes the existing control-flow combinators that the Actor object provides are not well-suited for the task at hand. In such cases, building custom control-flow operators can help. In this section, you will learn how you can use the control-flow combinators that the Actor object provides to build custom operators that let you use react (and methods using react) inside for expressions.

Listing 5.11 illustrates how to use a custom ForEach operator that allows you to iterate over a list while invoking react for each element in the list. In this case, we want to iterate over the futures in dataFutures. We use ForEach to convert the plain dataFutures list into an object that acts as a generator in for expressions. It generates the same values as the dataFutures list, namely all of the list's elements. However, it does so in a way that allows continuing the iteration even after react is invoked inside the body of the for expressions.

Listing 5.12 shows the implementation of ForEach. Making ForEach a case class allows you to omit new when creating new instances. The constructor takes a parameter of type Iterable[T]—the collection that generates the elements for our iteration.

The ForEach class has a single method foreach that takes a parameter of function type T => Unit. Implementing the foreach method enables instances of the ForEach class to be used as generators in simple for expressions, like the one in Listing 5.11. The variable that is bound to the generated elements in the for expression corresponds to the parameter of the function fun. The body of the for expression corresponds to the body of fun.

Inside foreach, we first obtain an iterator, it, from the Iterable. Then, we iterate over the collection using it and the loopWhile combinator introduced in Section 5.2. In each iteration, we apply the parameter function fun to the current element of the collection. Since we are using loopWhile, it is safe to invoke react inside fun.


Footnotes for Chapter 5:

[1] In practice, waiting is slightly more complicated, because threads may be interrupted during waiting.

[2] Scala provides a shorthand syntax for invoking apply methods. To invoke an apply method, it suffices to add the parameter list directly after the receiver. For example, fut() is equivalent to fut.apply().

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset