Topics in This Chapter A2
Writing concurrent applications that work correctly and with high performance is very challenging. The traditional approach, in which concurrent tasks have side effects that mutate shared data, is tedious and error-prone. Scala encourages you to think of a computation in a functional way. A computation yields a value, sometime in the future. As long as the computations don’t have side effects, you can let them run concurrently and combine the results when they become available. In this chapter, you will see how to use the Future and Promise traits to organize such computations.
The key points of this chapter are:
A block of code wrapped in a Future { ... } executes concurrently.
A future succeeds with a result or fails with an exception.
You can wait for a future to complete, but you don’t usually want to.
You can use callbacks to get notified when a future completes, but that gets tedious when chaining callbacks.
Use methods such as map/flatMap, or the equivalent for expressions, to compose futures.
A promise has a future whose value can be set once.
Pick an execution context that is suitable for the concurrent workload of your computation.
The scala.concurrent.Future object can execute a block of code “in the future.”
import java.time.*
import scala.concurrent.*
given ExecutionContext = ExecutionContext.global
Future {
Thread.sleep(10000)
println(s"This is the future at ${LocalTime.now}")
}
println(s"This is the present at ${LocalTime.now}")
When running this code in the REPL, a line similar to the following is printed:
This is the present at 13:01:19.400
About ten seconds later, a second line appears:
This is the future at 13:01:29.140
When you create a Future, its code is run on some thread. One could of course create a new thread for each task, but thread creation is not free. It is better to keep some pre-created threads around and use them to execute tasks as needed. A data structure that assigns tasks to threads is usually called a thread pool. In Java, the Executor interface describes such a data structure. Scala uses the ExecutionContext trait instead.
Each Future must be constructed with a reference to an ExecutionContext. The simplest way is to use this statement, which uses syntax from Chapter 19:
given ExecutionContext = ExecutionContext.global
Then the tasks execute on a global thread pool. This is fine for demos, but in a real program, you should make another choice if your tasks block. See Section 16.9, “Execution Contexts,” on page 258 for more information.
CAUTION
The global execution context runs tasks on daemon threads. The Java virtual machine terminates when only daemon threads are running. In demo code, simply add a call to Thread.sleep
at the end of the main
method to keep the program running until all tasks are finished.
When you construct multiple futures, they can execute concurrently. For example, try running
Future { for i <- 1 to 100 do { print("A"); Thread.sleep(10) } }
Future { for i <- 1 to 100 do { print("B"); Thread.sleep(10) } }
You will get an output that looks somewhat like
ABABABABABABABABABABABABABABA...AABABBBABABABABABABABABBBBBBBBBBBBBBBBB
A future can—and normally will—have a result:
val f = Future {
Thread.sleep(10000)
42
}
When you evaluate f
in the REPL immediately after the definition, you will get this output:
res12: scala.concurrent.Future[Int] = Future(<not completed>)
Wait ten seconds and evaluate f
again:
res13: scala.concurrent.Future[Int] = Future(Success(42))
Alternatively, something bad may happen in the future:
val f2 = Future {
if LocalTime.now.getMinute != 42 then
throw Exception("not a good time")
42
}
Unless the minute happens to be 42, the task terminates with an exception. In the REPL, you will see
res14: scala.concurrent.Future[Int] =
Future(Failure(java.lang.Exception: not a good time))
Now you know what a Future
is. It is an object that will give you a result (or failure) at some point in the future. In the next section, you will see one way of harvesting the result of a Future
.
CAUTION
Do not take the Future(Success(42)) and Future(Failure(...)) outputs literally. These are just how the toString method formats a future that has completed. If you type Future(Success(42)) into the REPL, you get an instance of Future[Success[Int]], which, when completed, is printed as Future(Success(Success(42))).
Note
The java.util.concurrent
package has a Future
interface that is much more limited than the Scala Future
trait. A Scala future is equivalent to the CompletionStage
interface in Java.
Tip
The Scala language imposes no restrictions on what you can do in concurrent tasks. However, you should stay away from computations with side effects. It is best if you don’t increment shared counters—even atomic ones. Don’t populate shared maps—even threadsafe ones. Instead, have each future compute a value. Then you can combine the computed values after all contributing futures have completed. That way, each value is only owned by one task at a time, and it is easy to reason about the correctness of the computation.
When you have a Future
, you can use the isCompleted
method to check whether it is completed. But of course you don’t want to wait for completion in a loop.
You can make a blocking call that waits for the result.
import scala.concurrent.duration.*
val f = Future { Thread.sleep(10000); 42 }
val result = Await.result(f, 10.seconds)
The call to Await.result
blocks for ten seconds and then yields the result of the future.
The second parameter of the Await.result
method has type Duration
. Importing scala.concurrent.duration.*
enables conversion methods from integers to Duration
objects, called seconds
, millis
, and so on.
If the task is not ready by the allotted time, the Await.result method throws a TimeoutException.
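The timeout behavior can be sketched as follows. This is a minimal illustration, not part of the original text: the object name TimeoutDemo, its method names, and the chosen durations are assumptions made for the example.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.util.Try

object TimeoutDemo:
  given ExecutionContext = ExecutionContext.global

  // Waits at most 100 ms for a task that needs about one second.
  // The wait gives up first, so Await.result throws a TimeoutException,
  // which Try captures as a Failure.
  def waitBriefly(): Try[Int] =
    val slow = Future { Thread.sleep(1000); 42 }
    Try(Await.result(slow, 100.millis))

  // Waits long enough for the same kind of task to finish.
  def waitLongEnough(): Int =
    val slow = Future { Thread.sleep(1000); 42 }
    Await.result(slow, 5.seconds)
```

Note that a timeout only ends the wait. The task itself keeps running in the background.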
If the task throws an exception, it is rethrown in the call to Await.result
. To avoid exceptions, you can call Await.ready
and then get the result.
val f2 = Future {
Thread.sleep((10000 * scala.math.random()).toLong)
if scala.math.random() < 0.5 then throw Exception("Not your lucky day")
42
}
val result2 = Await.ready(f2, 10.seconds).value // the task sleeps for up to ten seconds
The Await.ready method yields its first argument, and the value method of the Future class returns an Option[Try[T]]. It is None when the future is not completed and Some(t) when it is. Here, t is an object of the Try class, which holds either the result or the exception that caused the task to fail. You will see how to look inside it in the next section.
Note
In practice, you won’t use the Await.result
or Await.ready
methods much. You run tasks concurrently when they are time-consuming and your program can do something more useful than waiting for the result. Section 16.4, “Callbacks,” on page 250 shows you how you can harvest the results without blocking.
CAUTION
In this section, we used the result
and ready
methods of the Await
object. The Future
trait also has result
and ready
methods, but you should not call them. If the execution context uses a small number of threads (which is the case for the default fork-join pool), you don’t want them all to block. Unlike the Future
methods, the Await
methods notify the execution context so that it can adjust the pooled threads.
Note
Not all exceptions that occur during execution of the future are stored in the result. Virtual machine errors and the InterruptedException
are allowed to propagate in the usual way.
The Try Class
A Try[T]
instance is either a Success(v)
, where v
is a value of type T
or a Failure(ex)
, where ex
is a Throwable
. One way of processing it is with a match
statement.
t match
case Success(v) => println(s"The answer is $v")
case Failure(ex) => println(ex.getMessage)
Alternatively, you can use the isSuccess
or isFailure
methods to find out whether the Try
object represents success or failure. In the case of success, you can obtain the value with the get
method:
if t.isSuccess then println(s"The answer is ${t.get}")
To get the exception in case of failure, first apply the failed
method which turns the failed Try[T]
object into a Try[Throwable]
wrapping the exception. Then call get
to get the exception object.
if t.isFailure then println(t.failed.get.getMessage)
You can also turn a Try
object into an Option
with the toOption
method if you want to pass it on to a method that expects an option. This turns Success
into Some
and Failure
into None
.
To construct a Try
object, call Try(block)
with some block of code. For example,
val t = Try(str.toInt)
is either a Success
object with the parsed integer, or a Failure
wrapping a NumberFormatException
.
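The following sketch puts these pieces together. The object name TryDemo and its two methods are hypothetical helpers introduced only for illustration.

```scala
import scala.util.{Try, Success, Failure}

object TryDemo:
  // Parses a string, capturing a NumberFormatException as a Failure.
  def parse(str: String): Try[Int] = Try(str.toInt)

  // Turns either outcome into a message by pattern matching.
  def describe(t: Try[Int]): String = t match
    case Success(v) => s"The answer is $v"
    case Failure(ex) => s"Failed: ${ex.getClass.getSimpleName}"
```

For example, TryDemo.parse("42") is Success(42), while TryDemo.parse("forty-two").toOption is None and TryDemo.parse("forty-two").failed.get is the NumberFormatException.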
There are several methods for composing and transforming Try
objects. However, analogous methods exist for futures, where they are more commonly used. You will see how to work with multiple futures in Section 16.5, “Composing Future Tasks,” on page 251. At the end of that section, you will see how those techniques apply to Try
objects.
As already mentioned, one does not usually use a blocking wait to get the result of a future. For better performance, the future should report its result to a callback function upon completion.
This is easy to arrange with the onComplete
method.
f.onComplete(t => ...)
When the future has completed, either successfully or with a failure, it calls the given function with a Try
object.
You can then react to the success or failure, for example by passing a match function to the onComplete
method.
val f = Future {
Thread.sleep(1000)
if scala.math.random() < 0.5 then throw Exception("Not your lucky day")
42
}
f.onComplete {
case Success(v) => println(s"The answer is $v")
case Failure(ex) => println(ex.getMessage)
}
By using a callback, we avoid blocking. Unfortunately, we now have another problem. In all likelihood, the long computation in one Future
task will be followed by another computation, and another. It is possible to nest callbacks within callbacks, but it is profoundly unpleasant. (This technique is sometimes called “callback hell.”)
A better approach is to think of futures as entities that can be composed, similar to functions. You compose two functions by calling the first one, then passing its result to the second one. In the next section, you will see how to do the same with futures.
Suppose we need to get some information from two web services and then combine the two. Each task is long-running and should be executed in a Future
. It is possible to link them together with callbacks:
val future1 = Future { getData1() }
val future2 = Future { getData2() }
future1 onComplete {
case Success(n1) =>
future2 onComplete {
case Success(n2) => {
val n = n1 + n2
println(s"Sum: $n")
}
case Failure(ex) => ex.printStackTrace()
}
case Failure(ex) => ex.printStackTrace()
}
Even though the callbacks are ordered sequentially, the tasks run concurrently. Each task starts after the Future.apply
method executes or soon afterwards. We don’t know which of future1
and future2
completes first, and it doesn’t matter. We can’t process the result until both tasks complete. Once future1
completes, its completion handler registers a completion handler on future2
. If future2
has already completed, the second handler is called right away. Otherwise, it is called when future2
finally completes.
Even though this nesting of the callbacks works, it looks very messy, and it will look worse with each additional level of processing.
Instead of nesting callbacks, we will use an approach that you already know from working with Scala collections. Think of a Future
as a collection with (hopefully, eventually) one element. You know how to transform the values of a collection—with map
:
val future1 = Future { getData1() }
val combined = future1.map(n1 => n1 + getData2())
Here future1
is a Future[Int]
—a collection of (hopefully, eventually) one value. We map a function Int => Int
and get another Future[Int]
—a collection of (hopefully, eventually) one integer.
But wait—that’s not quite the same as in the callback code. The call to getData2
is running after getData1
, not concurrently. Let’s fix that with a second map
:
val future1 = Future { getData1() }
val future2 = Future { getData2() }
val combined = future1.map(n1 => future2.map(n2 => n1 + n2))
When future1
and future2
have delivered their results, the sum is computed.
Unfortunately, now combined
is a Future[Future[Int]]
, which isn’t so good. That’s what flatMap
is for:
val combined = future1.flatMap(n1 => future2.map(n2 => n1 + n2))
This looks much nicer when you use a for
expression instead of chaining flatMap
and map
:
val combined = for n1 <- future1; n2 <- future2 yield n1 + n2
This is exactly the same code since for
expressions are translated to chains of map
and flatMap
.
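As a rough check of this equivalence, the sketch below computes the sum both ways. The getData1 and getData2 methods are hypothetical stand-ins for slow data sources, and the blocking calls are only there to inspect the results of the demo.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object ComposeDemo:
  given ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for the two slow data sources.
  def getData1(): Int = { Thread.sleep(200); 2 }
  def getData2(): Int = { Thread.sleep(200); 40 }

  def sums(): (Int, Int) =
    // Both futures are created before they are combined, so they run concurrently.
    val future1 = Future { getData1() }
    val future2 = Future { getData2() }
    val viaFlatMap = future1.flatMap(n1 => future2.map(n2 => n1 + n2))
    val viaFor = for n1 <- future1; n2 <- future2 yield n1 + n2
    // Blocking is used here only to look at the outcome.
    (Await.result(viaFlatMap, 5.seconds), Await.result(viaFor, 5.seconds))
```

Both components of the pair returned by ComposeDemo.sums() are 42.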
What if something goes wrong? The map
and flatMap
implementations take care of all that. As soon as one of the tasks fails, the entire pipeline fails, and the exception is captured. In contrast, when you manually combine callbacks, you have to deal with failure at every step.
You can also apply guards in the for
expression:
val combined =
for n1 <- future1; n2 <- future2 if n1 != n2 yield n1 + n2
If the guard fails, the computation fails with a NoSuchElementException
.
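Here is a small sketch of a failing guard, using already completed futures so that the outcome does not depend on timing. The object and method names are assumptions made for this example.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.util.Try

object GuardDemo:
  given ExecutionContext = ExecutionContext.global

  // Adds two future values, but only if they differ.
  // The outcome is captured in a Try so that a failure can be inspected.
  def sumIfDifferent(a: Int, b: Int): Try[Int] =
    val combined =
      for
        n1 <- Future.successful(a)
        n2 <- Future.successful(b)
        if n1 != n2
      yield n1 + n2
    Try(Await.result(combined, 1.second))
```

With these definitions, GuardDemo.sumIfDifferent(1, 2) is Success(3), and GuardDemo.sumIfDifferent(2, 2) is a Failure wrapping a NoSuchElementException.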
So far, you have seen how to run two tasks concurrently. Sometimes, you need one task to run after another. A Future
starts execution immediately when it is created. To delay the creation, use functions.
val future1 = Future { getData1() }
def future2 = Future { getData2() } // def, not val
val combined = for n1 <- future1; n2 <- future2 yield n1 + n2
Now future2
is only evaluated when future1
has completed.
It doesn’t matter whether you use val
or def
for future1
. If you use def
, its creation is slightly delayed to the start of the for
expression.
This is particularly useful if the second step depends on the output of the first:
def future1 = Future { getData() }
def future2(arg: Int) = Future { getMoreData(arg) }
val combined = for n1 <- future1; n2 <- future2(n1) yield n2
Note
Like the Future
trait, the Try
class from Section 16.3, “The Try Class,” on page 249 has map
and flatMap
methods. A Try[T]
is a collection of, hopefully, one element. It is just like a Future[T]
, except you don’t have to wait. You can apply map
with a function that changes that one element, or flatMap
if you have a Try
-valued function and want to flatten the result. And you can use for
expressions. For example, here is how to compute the sum of two function calls that might fail:
def readInt(prompt: String) = Try(StdIn.readLine(s"$prompt: ").toInt)
val combined =
for n1 <- readInt("n1"); n2 <- readInt("n2") yield n1 + n2
In this way, you can compose Try
-valued computations and you don’t need to deal with the boring part of error handling.
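To try this without console input, here is a variant that takes the two strings as parameters. The object TryComposeDemo and its sum method are hypothetical names chosen for the sketch.

```scala
import scala.util.Try

object TryComposeDemo:
  // Adds two numbers given as strings.
  // The result is a Failure if either string cannot be parsed.
  def sum(s1: String, s2: String): Try[Int] =
    for n1 <- Try(s1.toInt); n2 <- Try(s2.toInt) yield n1 + n2
```

The call TryComposeDemo.sum("2", "40") yields Success(42), whereas TryComposeDemo.sum("2", "x") yields a Failure holding the NumberFormatException from the second conversion.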
Future Transformations
The map and flatMap methods that you saw in the preceding section are the most fundamental transformations of Future objects.
Table 16–1 shows several ways of applying functions to the contents of a future that differ in subtle details.
The foreach method works exactly like it does for collections, applying a function for its side effect. The function is applied to the single value in the future. It is convenient for harvesting the answer when it materializes.
val combined = for n1 <- future1; n2 <- future2 yield n1 + n2
combined.foreach(n => println(s"Sum: $n"))
Table 16–1 Transformations on a Future[T] with Success Value v or Exception ex
Method | Result type | Description |
---|---|---|
 |  | Like |
 |  | Calls |
 |  | Calls |
 |  | Calls |
 |  | A future with value |
 |  | A future with value |
 |  | A future with value |
 |  | Transforms both the success and failure. |
 |  | A future with a pair holding |
 |  | Zips both futures and applies |
 |  | Flattens a |
The recover
method accepts a partial function that can turn an exception into a successful result. Consider this call:
val f = Future { persist(data) } recover { case e: SQLException => 0 }
If a SQLException
occurs, the future succeeds with result 0
.
The fallbackTo method provides a different recovery mechanism. When you call f.fallbackTo(f2), the value of f2 becomes the value of the resulting future if f fails. However, f2 cannot inspect the reason for the failure.
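The difference between the two recovery mechanisms can be sketched like this. The failing method is a hypothetical stand-in for an operation that goes wrong, and the replacement values 0 and -1 are arbitrary.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object RecoverDemo:
  given ExecutionContext = ExecutionContext.global

  // A task that always fails, standing in for a failed operation.
  def failing: Future[Int] = Future { throw IllegalStateException("no data") }

  // recover can inspect the exception and supply a replacement value.
  def recovered(): Int =
    Await.result(failing.recover { case _: IllegalStateException => 0 }, 5.seconds)

  // fallbackTo substitutes the value of another future without seeing the exception.
  def fallenBack(): Int =
    Await.result(failing.fallbackTo(Future.successful(-1)), 5.seconds)
```

Here RecoverDemo.recovered() yields 0 and RecoverDemo.fallenBack() yields -1.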
The failed
method turns a failed Future[T]
into a successful Future[Throwable]
, just like the Try.failed
method. You can retrieve the failure in a for
expression like this:
val f = Future { persist(data) }
for v <- f do println(s"Succeeded with $v")
for ex <- f.failed do println(s"Failed with $ex")
Finally, you can zip two futures together. The call f1.zip(f2)
yields a future whose result is a pair (v, w)
if v
was the result of f1
and w
the result of f2
, or an exception if either f1
or f2
failed. (If both fail, the exception of f1
is reported.)
The zipWith
method is similar, but it takes a function to combine the two results instead of returning a pair. For example, here is another way of obtaining the sum of two computations:
val future1 = Future { getData1() }
val future2 = Future { getData2() }
val combined = future1.zipWith(future2)(_ + _)
Future Object
The Future companion object contains useful methods for working on collections of futures.
Suppose that, as you are computing a result, you organize the work so that you can concurrently work on different parts. For example, each part might be a subsequence of the inputs. Make a future for each part:
val futures = parts.map(part => Future { process(part) })
Now you have a collection of futures. Often, you want to combine the results. By using the Future.sequence
method, you can get a collection of all results for further processing:
val result = Future.sequence(futures)
Note that the call doesn’t block—it gives you a future to a collection. For example, assume futures
is a Seq[Future[T]]
. Then the result is a Future[Seq[T]]
. When the results for all elements of futures
are available, the result
future will complete with a sequence of the results.
If any of the futures fail, then the resulting future fails as well, with the exception of the leftmost failed future. If multiple futures fail, you don’t get to see the remaining failures.
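As a self-contained sketch of this pattern, the following code splits a sequence of integers into parts, sums each part in its own future, and adds up the partial sums. The names SequenceDemo, process, and total, and the choice of summing as the work, are assumptions made for illustration.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object SequenceDemo:
  given ExecutionContext = ExecutionContext.global

  // Hypothetical work on one part: the sum of its elements.
  def process(part: Seq[Int]): Int = part.sum

  // Splits the inputs into parts, processes each part in its own future,
  // and combines the partial results once all of them are available.
  def total(inputs: Seq[Int], partSize: Int): Future[Int] =
    val parts = inputs.grouped(partSize).toSeq
    val futures = parts.map(part => Future { process(part) })
    Future.sequence(futures).map(_.sum)
```

For instance, SequenceDemo.total(1 to 100, 10) is a future that completes with 5050. Nothing blocks until a caller decides to wait for that future.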
The traverse
method combines the map
and sequence
steps. Instead of
val futures = parts.map(p => Future { process(p) })
val result = Future.sequence(futures)
you can call
val result = Future.traverse(parts)(p => Future { process(p) })
The function in the second curried parameter is applied to each element of parts
. You get a future to a collection of all results.
There are reduceLeft
and foldLeft
operations on iterables of futures. You supply an operation that combines the results of all futures as they become available. For example, here is how you can compute the sum of the results:
val result = Future.reduceLeft(futures)(_ + _)
  // Yields a future to the sum of the results of all futures
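The foldLeft method works similarly but takes an initial value. A minimal sketch, in which the object FoldDemo and the countChars method are hypothetical names:

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object FoldDemo:
  given ExecutionContext = ExecutionContext.global

  // Computes the length of each word in its own future,
  // then folds the lengths into a total, starting from zero.
  def countChars(words: List[String]): Future[Int] =
    val futures = words.map(w => Future { w.length })
    Future.foldLeft(futures)(0)(_ + _)
```

The future returned by FoldDemo.countChars(List("a", "bb", "ccc")) completes with 6.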
So far, we have collected the results from all futures. Suppose you are willing to accept a result from any of the parts. Then call
val result = Future.firstCompletedOf(futures)
You get a future that, when it completes, has the result or failure of the first completed element of futures
.
The find
method produces the leftmost result matching a predicate.
val result = Future.find(futures)(predicate)
  // Yields a Future[Option[T]]
You get a future that, when it completes successfully, yields Some(r), where r is the result of one of the given futures that fulfills the predicate. Failed futures are ignored. If all futures complete but none yields a result that matches the predicate, then the future completes with None. Note that the predicate parameter has type T => Boolean, whereas the resulting future has type Future[Option[T]].
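A brief sketch of find, under the assumption that each future simply yields one of the given integers. The names FindDemo and firstEven are hypothetical, and the blocking call only serves to show the result.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object FindDemo:
  given ExecutionContext = ExecutionContext.global

  // Wraps each value in a future and looks for an even result.
  def firstEven(values: List[Int]): Option[Int] =
    val futures = values.map(v => Future { v })
    Await.result(Future.find(futures)(_ % 2 == 0), 5.seconds)
```

With these definitions, FindDemo.firstEven(List(1, 4, 6)) is Some(4), and FindDemo.firstEven(List(1, 3)) is None.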
CAUTION
A potential problem with firstCompletedOf
and find
is that the other computations keep on going even when the result has been determined. Scala futures do not have a mechanism for cancellation.
The Future.delegate method runs a Future-producing function and flattens the result:
def future1 = Future { getData() } // Note def, not val
val result = Future.delegate(future1) // A Future[T], not a Future[Future[T]]
Finally, the Future
object provides convenience methods for generating simple futures:
Future.successful(r) is an already completed future with result r.
Future.failed(e) is an already completed future with exception e.
Future.fromTry(t) is an already completed future with the result or exception given in the Try object t.
Future.unit is an already completed future with Unit result.
Future.never is a future that never completes.
A Future
object is read-only. The result of the future is set implicitly when its task has completed or failed. It cannot be set explicitly.
As a consumer of a Future
, you would never want to set the result. The point of a Future
is to process a result once it is ready.
However, if you produce a Future
for others to consume, the task mechanism only works for synchronous computations. If you use an asynchronous API, you are called back when the result is available. That’s the point where you want to set the result and complete the Future
. You use a Promise
to make that work.
Calling success
on a promise sets the result. Alternatively, you can call failure
with an exception to make the promise fail. As soon as one of these methods is called, the associated future is completed, and neither method can be called again. (An IllegalStateException
is thrown otherwise.)
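The one-shot nature of a promise can be sketched as follows. The object PromiseOnceDemo and its method are hypothetical names used only for this illustration.

```scala
import scala.concurrent.Promise
import scala.util.{Try, Success}

object PromiseOnceDemo:
  // Completes a promise, then tries to complete it a second time.
  // Returns the value of the associated future, and whether the second
  // attempt was rejected with an IllegalStateException.
  def completeTwice(): (Option[Try[String]], Boolean) =
    val p = Promise[String]()
    p.success("first")
    val secondAttempt = Try(p.success("second"))
    val rejected =
      secondAttempt.failed.toOption.exists(_.isInstanceOf[IllegalStateException])
    (p.future.value, rejected)
```

The call PromiseOnceDemo.completeTwice() yields (Some(Success("first")), true): the first result sticks, and the second attempt fails.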
Here is a typical workflow:
def computeAnswer(arg: String) = {
val p = Promise[String]()
def onSuccess(result: String) = p.success(result)
def onFailure(ex: Throwable) = p.failure(ex)
startAsyncWork(arg, onSuccess, onFailure)
p.future
}
Calling future
on a promise yields the associated Future
object. Note that the method returns the Future
right away, immediately after starting the work that will eventually yield the result.
From the point of view of the consumer (that is, the caller of the computeAnswer
method), there is no difference between a Future
that was constructed with a task function and one that was produced from a Promise
. Either way, the consumer gets the result when it is ready.
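To make the workflow runnable, the sketch below supplies a made-up startAsyncWork method that does its work on a separate thread and reports through callbacks. That method, and its behavior of converting the argument to uppercase and rejecting empty strings, are assumptions for the sake of the example and not a real API.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object PromiseBridgeDemo:
  // Hypothetical callback-style API: works on another thread and
  // reports the outcome through one of the two callbacks.
  def startAsyncWork(arg: String,
      onSuccess: String => Unit, onFailure: Throwable => Unit): Unit =
    val task: Runnable = () =>
      if arg.isEmpty then onFailure(IllegalArgumentException("empty argument"))
      else onSuccess(arg.toUpperCase)
    new Thread(task).start()

  // Adapts the callback API to a Future by completing a promise in the callbacks.
  def computeAnswer(arg: String): Future[String] =
    val p = Promise[String]()
    startAsyncWork(arg, result => p.success(result), ex => p.failure(ex))
    p.future // returned right away, before the work has finished
```

A consumer can treat the result like any other future. For example, the future from PromiseBridgeDemo.computeAnswer("hello") completes with "HELLO", and the one from computeAnswer("") fails with the IllegalArgumentException.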
The producer, however, has more flexibility when using a Promise
. For example, multiple tasks can work concurrently to fulfill a single promise. When one of the tasks has a result, it calls trySuccess
on the promise. Unlike the success
method, that method accepts the result and returns true
if the promise has not yet completed; otherwise it returns false
and ignores the result.
val p = Promise[String]()
Future {
val result = workHard(arg)
p.trySuccess(result)
}
Future {
val result = workSmart(arg)
p.trySuccess(result)
}
The promise is completed by the first task that manages to produce the result. With this approach, the tasks might want to periodically call p.isCompleted
to check whether they should continue.
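Here is a complete version of this race that can be run as is. The workHard and workSmart methods are hypothetical: they compute the same value at different speeds, so the result does not depend on which task wins.

```scala
import scala.concurrent.*
import scala.concurrent.duration.*

object RaceDemo:
  given ExecutionContext = ExecutionContext.global

  // Hypothetical competing strategies that arrive at the same answer.
  def workHard(arg: Int): Int = { Thread.sleep(500); arg * 2 }
  def workSmart(arg: Int): Int = { Thread.sleep(50); arg + arg }

  // Starts both tasks. Whichever finishes first completes the promise;
  // the trySuccess call of the other task returns false and has no effect.
  def race(arg: Int): Future[Int] =
    val p = Promise[Int]()
    Future { p.trySuccess(workHard(arg)) }
    Future { p.trySuccess(workSmart(arg)) }
    p.future
```

The future returned by RaceDemo.race(21) completes with 42 as soon as the faster task is done. The slower task still runs to completion in the background, since nothing cancels it.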
Note
Scala promises are very similar to the CompletableFuture
class in Java 8.
The global execution context executes futures on the global fork-join pool. That works well for computationally intensive tasks. However, the fork-join pool only manages a small number of threads (by default, equal to the number of cores of all processors). This is a problem when tasks have to wait, for example when communicating with a remote resource. A program could exhaust all available threads, waiting for results.
You can notify the execution context that you are about to block, by placing the blocking code inside blocking { ... }
:
val f = Future {
val url = "https://horstmann.com/index.html"
blocking {
val contents = Source.fromURL(url).mkString
if contents.length < 300
then contents
else contents.substring(0, 300) + "..."
}
}
The execution context may then increase the number of threads. The fork-join pool does exactly that, but it isn’t designed to perform well for many blocking threads. If you do input/output or connect to databases, you are better off using a different thread pool. The Executors
class from the Java concurrency library gives you several choices. A cached thread pool works well for I/O intensive workloads. You can pass it explicitly to the Future.apply
method, or you can set it as the given execution context:
val pool = Executors.newCachedThreadPool()
given ExecutionContext = ExecutionContext.fromExecutor(pool)
Now this pool is used by all futures where the given
declaration is in scope. (See Chapter 19 for more information about given
declarations.)
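The following sketch runs a batch of blocking tasks on a cached thread pool and shuts the pool down afterwards. The object name PoolDemo, the method name, and the 200 ms sleep that stands in for blocking I/O are assumptions made for this example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.*
import scala.concurrent.duration.*

object PoolDemo:
  // Runs n tasks that each block for 200 ms on a cached thread pool
  // and collects their results in order.
  def runBlockingTasks(n: Int): Seq[Int] =
    val pool = Executors.newCachedThreadPool()
    given ExecutionContext = ExecutionContext.fromExecutor(pool)
    try
      val futures = (1 to n).map(i => Future { Thread.sleep(200); i })
      Await.result(Future.sequence(futures), 10.seconds)
    finally pool.shutdown() // lets the pool threads terminate once they are idle
```

Because a cached thread pool creates threads on demand, a call such as PoolDemo.runBlockingTasks(40) can let all forty tasks sleep at the same time instead of queuing them behind a handful of threads. The threads of such a pool are not daemon threads, which is why the sketch shuts the pool down when it is done.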
1. Consider the expression
for
n1 <- Future { Thread.sleep(1000); 2 }
n2 <- Future { Thread.sleep(1000); 40 }
do
println(n1 + n2)
How is the expression translated to map
and flatMap
calls? Are the two futures executed concurrently or one after the other? In which thread does the call to println
occur?
2. Write a function doInOrder
that, given two functions f: T => Future[U]
and g: U => Future[V]
, produces a function T => Future[V]
that, for a given t
, eventually yields g(f(t))
.
3. Repeat the preceding exercise for any sequence of functions of type T => Future[T]
.
4. Write a function doTogether
that, given two functions f: T => Future[U]
and g: T => Future[V]
, produces a function T => Future[(U, V)]
, running the two computations in parallel and, for a given t
, eventually yielding (f(t), g(t))
.
5. Repeat the preceding exercise for any sequence of functions of type T => Future[U]
.
6. Write a function
repeat(action: => T, until: T => Boolean): Future[T]
that asynchronously repeats the action until it produces a value that is accepted by the until
predicate, which should also run asynchronously. Test with a function that reads a password from the console, and a function that simulates a validity check by sleeping for a second and then checking that the password is "secret"
. Hint: Use recursion.
7. Write a program that counts the prime numbers between 1 and n, as reported by BigInt.isProbablePrime
. Divide the interval into p parts, where p is the number of available processors. Count the primes in each part in concurrent futures and combine the results.
8. Write a program that asks the user for a URL, reads the web page at that URL, and displays all the hyperlinks. Provide functions that yield futures for each of these three steps, and then invoke the functions in a for
comprehension.
9. Write a program that asks the user for a URL, reads the web page at that URL, finds all the hyperlinks, visits each of them concurrently, and locates the Server
HTTP header for each of them. Finally, print a table of which servers were found how often. The futures that visit each page should return the header.
10. Change the preceding exercise so that the futures that visit each page update a shared Java ConcurrentHashMap
or Scala TrieMap
. This isn’t as easy as it sounds. A threadsafe data structure is safe in the sense that you cannot corrupt its implementation, but you have to make sure that sequences of reads and updates are atomic.
11. In the preceding exercise, you updated a mutable Map[URL, String]
. Consider what happens when two threads concurrently query for the same key whose result is not yet present. Then both threads expend effort computing the same value. Avoid this problem by using a Map[URL, Future[String]]
instead.
12. Using futures, run four tasks that each sleep for ten seconds and then print the current time. If you have a reasonably modern computer, it is very likely that it reports four available processors to the JVM, and the futures should all complete at around the same time. Now repeat with forty tasks. What happens? Why? Replace the execution context with a cached thread pool. What happens now? (Be careful to define the futures after declaring the given execution context.)
13. Using Swing or JavaFX, implement a function that returns a future for a button click. Use a promise to set the value to the button label when the button is clicked. Fail the promise when a timeout has expired.
14. Write a method that, given a URL, locates all hyperlinks, makes a promise for each of them, starts a task in which it will eventually fulfill all promises, and returns a sequence of futures for the promises. Why would it not be a good idea to return a sequence of promises?
15. Use a promise for implementing cancellation. Given a range of big integers, split the range into subranges that you concurrently search for palindromic primes. When such a prime is found, set it as the value of the future. All tasks should periodically check whether the promise is completed, in which case they should terminate.