Concurrency is one of the toughest topics to handle in modern computer programming; understanding concurrency requires the capacity to think abstractly, and debugging concurrent problems is like trying to pilot an airplane by dead reckoning. Even so, Java 7 makes it easier (and more accessible) to write correct concurrent code.
Let's start with definitions: concurrency is the ability of a program to execute different (or the same) instructions at the same time. A concurrent program can be split up and run on multiple CPUs, which lets you take advantage of today's multicore processors. Even programs running on a single-core CPU can benefit when they are I/O intensive, because one thread can do useful work while another waits for I/O.
In this chapter, we present recipes for the most common concurrency needs, from running a background task to splitting a computation into work units. Throughout the chapter, you will find up-to-date recipes for accomplishing concurrency in Java 7.
You have a task that needs to be run outside of your main thread.
Implement the Runnable interface and start a new Thread. For example:
private void someMethod() {
    Thread backgroundThread = new Thread(new Runnable() {
        public void run() {
            doSomethingInBackground();
        }
    }, "Background Thread");
    System.out.println("Start");
    backgroundThread.start();
    for (int i = 0; i < 10; i++) {
        System.out.println(Thread.currentThread().getName() + ": is counting " + i);
    }
    System.out.println("Done");
}
private void doSomethingInBackground() {
    System.out.println(Thread.currentThread().getName() + ": is Running in the background");
}
The Thread class allows executing code in a new thread (path of execution), distinct from the current thread. The Thread constructor used here takes an object that implements the Runnable interface, plus a name for the new thread. The Runnable interface requires the implementation of only one method: public void run(). Calling the Thread's start() method then creates the new thread, which in turn invokes the run() method of the Runnable.
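A common mistake is calling run() instead of start(). The following minimal sketch (the class and method names are hypothetical) shows the difference. start() executes the Runnable on the newly created thread, whereas calling run() directly is an ordinary method call that executes on the calling thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: compares Thread.start() with calling run() directly.
final class StartVersusRun {
    // Returns the name of the thread that actually executed the Runnable.
    static String executingThreadName(boolean useStart) throws InterruptedException {
        final AtomicReference<String> executedOn = new AtomicReference<String>();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                executedOn.set(Thread.currentThread().getName());
            }
        }, "Background Thread");
        if (useStart) {
            worker.start(); // creates a new thread, which then calls run()
            worker.join();  // wait for it so that the result is available
        } else {
            worker.run();   // plain method call: runs on the current thread
        }
        return executedOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("start(): " + executingThreadName(true));
        System.out.println("run():   " + executingThreadName(false));
    }
}
```

When run from main(), the first line reports Background Thread and the second reports main.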
The JVM has two types of threads: User and Daemon. User threads keep executing until their run() method finishes, whereas Daemon threads are simply abandoned (stopped abruptly) when the application exits. An application exits when only Daemon threads are left running in the JVM. When you create multithreaded applications, you must be aware of these differences and know when to use each type of thread.
Usually, a Daemon thread's Runnable has a run() method that never finishes on its own, for example one built around a while (true) loop. This allows the thread to periodically check a condition or perform a task throughout the life of the program and then be discarded when the program is done executing. In contrast, User threads keep executing while they are alive and prevent the program from terminating. If you have a program that doesn't exit when expected, check which types of threads are still running (see recipe 9-8 for getting a Thread dump).
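To make a thread a Daemon thread, call thread.setDaemon(true) before calling the thread.start() method. Calling setDaemon() while the thread is alive throws an IllegalThreadStateException. By default, a new thread inherits the daemon status of the thread that creates it, so threads created from the main thread are User threads.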
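The following sketch assumes a hypothetical "heartbeat" task that should never keep the application alive. The thread loops forever, is marked as a daemon before start(), and therefore lets the JVM exit as soon as main() returns. The helper also shows that changing the daemon flag on a running thread is rejected.

```java
// Hypothetical demo class: a background heartbeat that must not keep the JVM alive.
final class DaemonExample {
    // Starts a daemon thread that loops forever, printing periodically.
    static Thread startHeartbeat() {
        Thread heartbeat = new Thread(new Runnable() {
            public void run() {
                while (true) { // never finishes on its own
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        return; // also allow an explicit stop
                    }
                    System.out.println("heartbeat");
                }
            }
        }, "Heartbeat");
        heartbeat.setDaemon(true); // must be called before start()
        heartbeat.start();
        return heartbeat;
    }

    // Returns true if setDaemon() on an already running thread is rejected.
    static boolean setDaemonAfterStartFails(Thread running) {
        try {
            running.setDaemon(false);
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Thread heartbeat = startHeartbeat();
        System.out.println("Daemon? " + heartbeat.isDaemon());
        // main() returns here; only a daemon thread remains, so the JVM exits
    }
}
```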
Caution This recipe shows the simplest way to create and execute a new thread. The new thread is a User thread, which means that the application will not exit until both the main thread and the background thread are done executing.
You need to update a Map object from multiple threads, and you want to make sure that the updates don't corrupt the contents of the Map and that it is always in a consistent state. You also want to traverse (look at) the contents of the Map while other threads are updating it.
Use a ConcurrentMap to update Map entries. The following example creates 1,000 threads, each of which repeatedly modifies the Map at the same time as the others. The main thread waits for a second and then iterates through the Map while the other threads are still modifying it:
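// Fields used by this example
private final Random random = new Random();
private final List<Thread> updateThreads = new ArrayList<Thread>();
private void start() {
    ConcurrentMap<Integer, String> concurrentMap =
            new ConcurrentHashMap<Integer, String>();
    for (int i = 0; i < 1000; i++) {
        startUpdateThread(i, concurrentMap);
    }
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // Iterate while the update threads are still writing to the map
    for (Map.Entry<Integer, String> entry : concurrentMap.entrySet()) {
        System.out.println("Key :" + entry.getKey() + " Value:" + entry.getValue());
    }
    // Stop the update threads; as User threads, they would otherwise keep the application alive
    for (Thread thread : updateThreads) {
        thread.interrupt();
    }
}
private void startUpdateThread(int i, final ConcurrentMap<Integer, String> concurrentMap) {
    Thread thread = new Thread(new Runnable() {
        public void run() {
            // Keep writing random keys until this thread is interrupted
            while (!Thread.interrupted()) {
                int randomInt = random.nextInt(20);
                concurrentMap.put(randomInt, UUID.randomUUID().toString());
            }
        }
    });
    thread.setName("Update Thread " + i);
    updateThreads.add(thread);
    thread.start();
}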
ConcurrentHashMap allows multiple threads to modify the table concurrently and safely. In our example, 1,000 threads modify the Map over the course of one second. The ConcurrentHashMap iterator also allows safe iteration over its contents. When using a ConcurrentMap's iterator, you do not have to lock the contents of the map while iterating over it, and the iterator doesn't throw ConcurrentModificationException.
Note ConcurrentMap iterators are weakly consistent. They are thread safe, but they don't guarantee that you will see entries added or updated after the iterator was created.
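The difference is easy to see even in a single thread. The following sketch (hypothetical class name) adds entries to a map while iterating over its keys. A ConcurrentHashMap lets the loop finish, whereas a HashMap's fail-fast iterator throws ConcurrentModificationException. The same weakly consistent behavior is what lets the recipe's main thread iterate while the update threads are writing.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: modifies a map while iterating over its keys.
final class IterationDuringUpdate {
    // Returns true if the loop completed, false if the iterator failed fast.
    static boolean addWhileIterating(Map<Integer, String> map) {
        for (int i = 0; i < 5; i++) {
            map.put(i, "value " + i);
        }
        try {
            for (Integer key : map.keySet()) {
                if (key < 100) { // only the original keys add entries, so the loop stays bounded
                    map.put(key + 100, "added during iteration");
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentHashMap: "
                + addWhileIterating(new ConcurrentHashMap<Integer, String>()));
        System.out.println("HashMap: " + addWhileIterating(new HashMap<Integer, String>()));
    }
}
```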
You need to put a key/value pair in a Map only if the key is not present, and the Map is being constantly updated by other threads. You need to check for the key's presence first, and you need assurance that some other thread doesn't insert the same key after you check and before you insert yourself.
The ConcurrentMap.putIfAbsent() method performs the check and the insertion as a single atomic operation. Either the key was absent and the map was modified, or the key was present and the map was left untouched. For example, the following code uses this method to check and insert in a single step, thus avoiding the concurrency problem:
private final Random random = new Random();
private void start() {
    ConcurrentMap<Integer, String> concurrentMap = new ConcurrentHashMap<Integer, String>();
    for (int i = 0; i < 100; i++) {
        startUpdateThread(i, concurrentMap);
    }
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    for (Map.Entry<Integer, String> entry : concurrentMap.entrySet()) {
        System.out.println("Key :" + entry.getKey() + " Value:" + entry.getValue());
    }
}
private void startUpdateThread(final int i,
        final ConcurrentMap<Integer, String> concurrentMap) {
    Thread thread = new Thread(new Runnable() {
        public void run() {
            int randomInt = random.nextInt(20);
            // Check and insert in one atomic step; null means there was no previous value
            String previousEntry = concurrentMap.putIfAbsent(randomInt,
                    "Thread # " + i + " has made it!");
            if (previousEntry != null) {
                System.out.println("Thread # " + i
                        + " tried to update it but guess what, we're too late!");
            } else {
                System.out.println("Thread # " + i + " has made it!");
            }
        }
    });
    thread.start();
}
Updating a Map concurrently is hard because it involves two operations, a check-then-act sequence. First, the Map has to be checked to see whether an entry already exists in it. If the entry doesn't exist, you can put the key and the value into the Map. If the key does exist, the existing value is retrieved instead. If another thread runs between the check and the act, both threads may conclude that the key is absent. To avoid this, we use the ConcurrentMap's atomic putIfAbsent() operation. It ensures that either the key was present and the value is not overwritten, or the key was not present and the value is set. For the JDK implementations of ConcurrentMap, the putIfAbsent() method returns null if there was no value for the key, or the current value if the key already has one. By checking that putIfAbsent() returns null, you are assured that the operation succeeded and that a new entry has been created in the map.
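You can observe the atomicity guarantee directly. The following sketch (hypothetical class and key names) starts many threads that all call putIfAbsent() for the same key and counts how many of them get null back. However the threads interleave, exactly one of them should win.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: many threads race to claim one key.
final class PutIfAbsentRace {
    // Starts threadCount threads that all try to claim the same key; returns how many succeeded.
    static int countWinners(int threadCount) throws InterruptedException {
        final ConcurrentMap<String, Integer> claims = new ConcurrentHashMap<String, Integer>();
        final AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int id = i;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    // A null result means this thread's value was the one inserted
                    if (claims.putIfAbsent("prize", id) == null) {
                        winners.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return winners.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Winners: " + countWinners(100));
    }
}
```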
There are cases in which calling putIfAbsent() directly is not efficient. For example, if the value is the result of an expensive database query, running the query every time just to pass its result to putIfAbsent() wastes work whenever the key is already present. In this kind of scenario, you could first call the map's containsKey() method to check whether the key is present, and only if it isn't, run the expensive query and call putIfAbsent() with its result. Another thread might still insert the key between the two calls, so putIfAbsent() might not put the entry, but this check reduces the number of potentially expensive value creations. See the following code snippet:
boolean keyPresent = concurrentMap.containsKey(randomInt);
if (!keyPresent) {
    // Only build the value when the key looks absent; putIfAbsent() still guards the race
    concurrentMap.putIfAbsent(randomInt, "Thread # " + i + " has made it!");
}
In this code, the first operation is to check whether the key is already in the map. If it is, the putIfAbsent() operation is not executed. If the key is not present, you can proceed to execute the putIfAbsent() operation.
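The same idea can be packaged as a small get-or-create helper. This sketch is a variation on the snippet above. It calls get() instead of containsKey(), so that an existing value is returned by the same lookup, and it returns whichever value actually ended up in the map. The expensiveLookup() method and the lookup counter are hypothetical stand-ins for a database query.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: check first, build the value only when needed, then putIfAbsent().
final class GetOrCreate {
    // Counts how often the expensive value was built (for demonstration only).
    static final AtomicInteger lookups = new AtomicInteger();

    // Hypothetical stand-in for an expensive database query.
    static String expensiveLookup(Integer key) {
        lookups.incrementAndGet();
        return "record for " + key;
    }

    static String getOrCreate(ConcurrentMap<Integer, String> map, Integer key) {
        String existing = map.get(key);        // cheap check first
        if (existing != null) {
            return existing;
        }
        String created = expensiveLookup(key); // only built when the key looked absent
        String raced = map.putIfAbsent(key, created);
        // If another thread inserted first, use its value so that every caller sees the same one
        return raced != null ? raced : created;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, String> cache = new ConcurrentHashMap<Integer, String>();
        System.out.println(getOrCreate(cache, 7));
        System.out.println(getOrCreate(cache, 7));
        System.out.println("Expensive lookups: " + lookups.get());
    }
}
```

Calling the helper twice with the same key builds the value only once. The second call returns the cached "record for 7".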
If you access the values of the map from different threads, you should make sure that the values themselves are thread safe. This is most evident when the values are collections, because several threads can then modify the same collection. A thread-safe map prevents concurrent modifications from corrupting the map itself, but once you retrieve a value from it, you still need to apply good concurrency practices to that value.
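For example, when the values are counters, storing AtomicLong instances keeps each update atomic. Storing Long values that are read, incremented, and written back would not. The following sketch uses a hypothetical hit-counter class. putIfAbsent() ensures that all threads share a single AtomicLong for each key, and incrementAndGet() updates that shared value safely.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class: a thread-safe map whose values are also thread safe.
final class HitCounter {
    private final ConcurrentMap<String, AtomicLong> hits =
            new ConcurrentHashMap<String, AtomicLong>();

    // Increments the counter for key; safe to call from many threads.
    long record(String key) {
        AtomicLong counter = hits.get(key);
        if (counter == null) {
            AtomicLong fresh = new AtomicLong();
            counter = hits.putIfAbsent(key, fresh);
            if (counter == null) {
                counter = fresh; // our counter was the one inserted
            }
        }
        return counter.incrementAndGet(); // the value itself is updated atomically
    }

    long get(String key) {
        AtomicLong counter = hits.get(key);
        return counter == null ? 0 : counter.get();
    }

    // Runs threadCount threads that each record the same key perThread times.
    static long concurrentHits(int threadCount, final int perThread) throws InterruptedException {
        final HitCounter counter = new HitCounter();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < perThread; i++) {
                        counter.record("home page");
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return counter.get("home page");
    }
}
```

With four threads recording 1,000 hits each, no increments are lost and the final count is 4,000.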
Note The JDK's ConcurrentMap implementations (ConcurrentHashMap and ConcurrentSkipListMap) don't allow null keys or null values. This differs from their non-thread-safe cousin HashMap, which allows a null key and null values.
You need to iterate over each element in a collection, but the collection is always being changed by other threads.
By using a CopyOnWriteArrayList, you can safely iterate through the collection without worrying about concurrency. In this solution, the startUpdatingThread() method creates a new thread, which actively changes the List passed to it. While that thread modifies the list, you iterate through it concurrently by using a for loop.
private void copyOnWriteSolution() {
    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
    startUpdatingThread(list); // helper (not shown) that starts a thread modifying the list
    for (String element : list) {
        System.out.println("Element :" + element);
    }
    stopUpdatingThread(); // helper (not shown) that stops the updating thread
}
Alternatively, you can use a list returned by Collections.synchronizedList(), which changes the collection atomically. A synchronized list also lets you synchronize safely on the list while iterating through it (which is done around the for loop). For example:
private void synchronizedListSolution() {
    final List<String> list = Collections.synchronizedList(new ArrayList<String>());
    startUpdatingThread(list);
    synchronized (list) {
        for (String element : list) {
            System.out.println("Element :" + element);
        }
    }
    stopUpdatingThread();
}
Java comes with many concurrent collection options. Which one to use depends on how frequent reads are compared with writes. If writes are infrequent compared with reads, a CopyOnWriteArrayList is the most efficient collection to use. It doesn't block (stop other threads) from reading the list, and it is thread safe to iterate over (no ConcurrentModificationException is thrown when iterating through it). The trade-off is that every write copies the entire underlying array, so frequent writes are expensive. If writes are about as frequent as reads, a synchronized list is the preferred choice.
In solution 1, the CopyOnWriteArrayList is being updated while you traverse the list. Because the recipe uses a CopyOnWriteArrayList instance, there is no need to worry about thread safety when iterating through the collection (as is done in this recipe with the for loop). Note that the CopyOnWriteArrayList iterator offers a snapshot in time. If another thread modifies the list while you are iterating through it, the changes are not visible to that iteration.
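The snapshot behavior can be shown without a second thread. In the following sketch (hypothetical class name), the loop appends to the list it is iterating over. The iterator keeps walking the array that existed when the loop began, so it sees only the original elements, and no exception is thrown. A writer in another thread behaves the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class: CopyOnWriteArrayList iterators work on a snapshot.
final class SnapshotIteration {
    // Iterates over the list while appending to it; returns the elements the loop saw.
    static List<String> iterateWhileAdding(CopyOnWriteArrayList<String> list) {
        List<String> seen = new ArrayList<String>();
        for (String element : list) {    // the iterator holds the array as it was at this point
            seen.add(element);
            list.add(element + "-copy"); // each add creates a new array; the snapshot is unaffected
        }
        return seen;
    }

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
        list.add("a");
        list.add("b");
        list.add("c");
        System.out.println("Seen: " + iterateWhileAdding(list)); // Seen: [a, b, c]
        System.out.println("List: " + list); // List: [a, b, c, a-copy, b-copy, c-copy]
    }
}
```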
Caution Locking properly depends on the type of collection used. Any collection returned by one of the Collections.synchronizedXxx() methods can be locked by synchronizing on the collection itself (synchronized (collectionInstance)). Some newer, more efficient concurrent collections, such as ConcurrentHashMap, cannot be locked this way. Synchronizing on them compiles, but it doesn't stop other threads from modifying them, because their internal implementations don't lock on the object itself.
Solution 2 creates a synchronized list by using the Collections helper class. The Collections.synchronizedList() method wraps a List object (an ArrayList, a LinkedList, or another List implementation) in a List that synchronizes access to the list operations. Every time you need to iterate over a list (either with the for-each statement or with an iterator), you must be aware of the concurrency implications of that list's iterator. The CopyOnWriteArrayList is safe to iterate over (as specified in its Javadoc), but the synchronized list's iterator must be synchronized manually (as specified in the Javadoc for Collections.synchronizedList()). In the solution, the list can safely be iterated inside the synchronized(list) block. While one thread holds the lock on the list, no other thread can read, update, or iterate over it through the synchronized wrapper until the synchronized(list) block is completed.
You need to modify different but related collections at the same time and want to be sure that no other thread sees these structures until they are done being modified.
By synchronizing on a principal collection, you can guarantee that all the related collections are updated together. In the following example, the fulfillOrder() method needs to check the inventory for the order to be fulfilled and, if there is enough inventory, add the order to the customerOrders list. The fulfillOrder() method synchronizes on the inventoryMap map and modifies both the inventoryMap map and the customerOrders list before leaving the synchronized block.
// Shared state used by this example (CustomerOrder is a simple value class, not shown)
private final Map<String, Integer> inventoryMap = new HashMap<String, Integer>();
private final List<CustomerOrder> customerOrders = new ArrayList<CustomerOrder>();
private boolean fulfillOrder(String itemOrdered, int quantityOrdered, String customerName) {
    synchronized (inventoryMap) {
        int currentInventory = inventoryMap.get(itemOrdered);
        if (currentInventory < quantityOrdered) {
            System.out.println("Couldn't fulfill order for " + customerName
                    + " not enough " + itemOrdered + " (" + quantityOrdered + ")");
            return false; // sorry, we sold out
        }
        // Both structures change while the lock on inventoryMap is held
        inventoryMap.put(itemOrdered, currentInventory - quantityOrdered);
        CustomerOrder order = new CustomerOrder(itemOrdered, quantityOrdered, customerName);
        customerOrders.add(order);
        System.out.println("Order fulfilled for " + customerName + " of "
                + itemOrdered + " (" + quantityOrdered + ")");
        return true;
    }
}
private void checkInventoryLevels() {
    synchronized (inventoryMap) {
        System.out.println("------------------------------------");
        for (Map.Entry<String, Integer> inventoryEntry : inventoryMap.entrySet()) {
            System.out.println("Inventory Level :"
                    + inventoryEntry.getKey() + " " + inventoryEntry.getValue());
        }
        System.out.println("------------------------------------");
    }
}
private void displayOrders() {
    synchronized (inventoryMap) { // customerOrders is also guarded by the inventoryMap lock
        for (CustomerOrder order : customerOrders) {
            System.out.println(order.getQuantityOrdered() + " " + order.getItemOrdered()
                    + " for " + order.getCustomerName());
        }
    }
}
Using a reentrant lock, you can prevent multiple threads from accessing the same critical area of the code. In this solution, the inventoryLock is acquired by calling inventoryLock.lock(). Any other thread that tries to acquire the inventoryLock lock has to wait until the inventoryLock lock is released. At the end of the fulfillOrder() method (in the finally block), the inventoryLock is released by calling the inventoryLock.unlock() method:
private final Lock inventoryLock = new ReentrantLock();
private boolean fulfillOrder(String itemOrdered, int quantityOrdered,
        String customerName) {
    inventoryLock.lock(); // acquire the lock immediately before the try block
    try {
        int currentInventory = inventoryMap.get(itemOrdered);
        if (currentInventory < quantityOrdered) {
            System.out.println("Couldn't fulfill order for " + customerName
                    + " not enough " + itemOrdered + " (" + quantityOrdered + ")");
            return false; // sorry, we sold out
        }
        inventoryMap.put(itemOrdered, currentInventory - quantityOrdered);
        CustomerOrder order = new CustomerOrder(itemOrdered,
                quantityOrdered, customerName);
        customerOrders.add(order);
        System.out.println("Order fulfilled for " + customerName + " of "
                + itemOrdered + " (" + quantityOrdered + ")");
        return true;
    } finally {
        inventoryLock.unlock(); // always runs, even on return or exception
    }
}
private void checkInventoryLevels() {
    inventoryLock.lock();
    try {
        System.out.println("------------------------------------");
        for (Map.Entry<String, Integer> inventoryEntry : inventoryMap.entrySet()) {
            System.out.println("Inventory Level :" + inventoryEntry.getKey()
                    + " " + inventoryEntry.getValue());
        }
        System.out.println("------------------------------------");
    } finally {
        inventoryLock.unlock();
    }
}
private void displayOrders() {
    inventoryLock.lock();
    try {
        for (CustomerOrder order : customerOrders) {
            System.out.println(order.getQuantityOrdered() + " "
                    + order.getItemOrdered() + " for " + order.getCustomerName());
        }
    } finally {
        inventoryLock.unlock();
    }
}
If you have different structures that must be modified together, you need to make sure that they are updated atomically. An atomic operation is a set of instructions that is executed either as a whole or not at all. Its effects become visible to the rest of the program only when it is complete.
In solution 1 (atomically modifying both the inventoryMap map and the customerOrders list), you pick a “principal” collection on which you will lock (the inventoryMap). By locking on the principal collection, you guarantee that any other thread that tries to lock on the same principal collection will have to wait until the current thread releases the lock.
Note Even though displayOrders doesn't use the inventoryMap, you still synchronize on it (in solution 1). Because the inventoryMap is the main collection, operations on secondary collections also need to be protected by synchronizing on the main collection.
Solution 2 is more explicit. Instead of picking a principal collection, it uses an independent lock to coordinate the atomic operations. Locking refers to the ability of the JVM to restrict certain code paths so that only one thread at a time can execute them. Threads try to get the lock (locks are provided, for example, by a ReentrantLock instance, as shown in the example), and the lock can be held by only one thread at a time. If other threads try to acquire the same lock, they are suspended (WAIT) until the lock is available. The lock becomes available when the thread that currently holds it releases it. At that point, one (and only one) of the waiting threads can acquire it.
Locks by default are not “fair.” This means that the order in which threads requested the lock is not preserved. This allows for higher throughput, and in most situations unfair locks are fine. On a very highly contended lock, if you really need to distribute the lock evenly (make it fair), pass true to the ReentrantLock constructor (new ReentrantLock(true)). Fairness can only be chosen when the lock is created; there is no setter for changing it afterward.
In solution 2, calling the inventoryLock.lock() method either acquires the lock and continues, or suspends execution (WAIT) until the lock can be acquired. Once the lock is acquired, no other thread can execute within the locked block. At the end of the block, you release the lock by calling inventoryLock.unlock().
When working with Lock objects (ReentrantLock, ReadLock, WriteLock), it is common practice to surround the code that runs while holding the lock with a try/finally clause. The call to lock.lock() comes immediately before the try block, and the lock is released (by calling lock.unlock()) in the matching finally block. Acquiring the lock outside the try block ensures that unlock() is never called for a lock that was not acquired. Unlocking in the finally clause ensures that if a RuntimeException is thrown while the lock is held, the thread doesn't “keep” the lock and prevent other threads from acquiring it.
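The following sketch shows why the finally block matters. It uses a hypothetical, simplified stock counter rather than the recipe's inventoryMap. removeStock() throws an exception while holding the lock, yet the lock is still released, so later calls succeed. The lock is acquired immediately before the try block, following the usage idiom shown in the Lock Javadoc.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a lock held during an exception is still released.
final class StockCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int stock; // guarded by lock

    StockCounter(int initialStock) {
        this.stock = initialStock;
    }

    // Removes quantity from stock; throws if there isn't enough.
    void removeStock(int quantity) {
        lock.lock();
        try {
            if (quantity > stock) {
                throw new IllegalArgumentException("Not enough stock for " + quantity);
            }
            stock -= quantity;
        } finally {
            lock.unlock(); // runs on normal exit and when the exception propagates
        }
    }

    int getStock() {
        lock.lock();
        try {
            return stock;
        } finally {
            lock.unlock();
        }
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    public static void main(String[] args) {
        StockCounter counter = new StockCounter(10);
        try {
            counter.removeStock(50);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println("Still locked? " + counter.isLocked()); // Still locked? false
        counter.removeStock(3);
        System.out.println("Stock: " + counter.getStock());       // Stock: 7
    }
}
```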
The ReentrantLock object offers features that the synchronized statement doesn't. For example, ReentrantLock has the tryLock() method, which acquires the lock only if no other thread holds it; the method never makes the invoking thread wait. If another thread holds the lock, tryLock() returns false immediately and the calling thread continues executing. In general, prefer the synchronized keyword for synchronization, and use ReentrantLock only when you need its features. For more information on the other methods provided by ReentrantLock, visit http://download.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html.
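Here is a sketch of tryLock() in action (the class and method names are hypothetical). A helper thread holds the lock. Two CountDownLatch objects (described in the thread-coordination recipe later in this chapter) make the timing deterministic: the main thread calls tryLock() only after the helper has acquired the lock, and at that point tryLock() returns false instead of waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: tryLock() never waits for a lock held elsewhere.
final class TryLockExample {
    // Calls tryLock() while another thread holds the lock; returns the result.
    static boolean tryWhileHeldElsewhere() throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock();
        final CountDownLatch lockHeld = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(new Runnable() {
            public void run() {
                lock.lock();
                try {
                    lockHeld.countDown(); // tell the main thread that we own the lock
                    release.await();      // keep holding it until told to let go
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }
        });
        holder.start();
        lockHeld.await();
        boolean acquired = lock.tryLock(); // does not wait: returns false right away
        if (acquired) {
            lock.unlock();
        }
        release.countDown();
        holder.join();
        return acquired;
    }

    // Calls tryLock() on a lock nobody holds; returns the result.
    static boolean tryWhenFree() {
        ReentrantLock lock = new ReentrantLock();
        if (lock.tryLock()) {
            try {
                return true; // work protected by the lock would go here
            } finally {
                lock.unlock();
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Held elsewhere: " + tryWhileHeldElsewhere()); // false
        System.out.println("Free: " + tryWhenFree());                     // true
    }
}
```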
Caution While this is only a recipe book, and proper threading techniques span their own volumes, it is important to raise awareness of deadlocks. A deadlock typically involves two locks that two threads acquire in opposite order. Each thread then holds one lock and waits forever for the other. The simplest way to avoid deadlock is to not let the lock “escape.” This means that while the lock is held, the code should not call other methods that could acquire a different lock. If that's not possible, release the lock before calling such a method. See recipe 9-8 for information on finding and troubleshooting deadlocks.
Make sure that every operation that refers to one or both collections is protected by the same lock. Operations that use the result of querying one collection to query the second need to be executed atomically. They must run as a unit during which neither collection can change.
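The following self-contained sketch applies this rule. InventoryDesk is a hypothetical, simplified class that tracks a single item rather than reproducing the recipe's exact code. Both the stock level and the list of order quantities are read and written only while holding inventoryLock. Several threads can therefore place orders concurrently, and a reader that takes the same lock always sees the stock plus the ordered quantity equal to the starting stock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified version of the inventory example (one item only).
final class InventoryDesk {
    private final Lock inventoryLock = new ReentrantLock();
    private int stock;                                                      // guarded by inventoryLock
    private final List<Integer> orderQuantities = new ArrayList<Integer>(); // guarded by inventoryLock

    InventoryDesk(int initialStock) {
        this.stock = initialStock;
    }

    // Checks the stock, decrements it, and records the order as one atomic unit.
    boolean fulfillOrder(int quantity) {
        inventoryLock.lock();
        try {
            if (stock < quantity) {
                return false; // sold out
            }
            stock -= quantity;
            orderQuantities.add(quantity);
            return true;
        } finally {
            inventoryLock.unlock();
        }
    }

    // Reads both structures under the same lock, so it never sees a half-finished order.
    int stockPlusOrdered() {
        inventoryLock.lock();
        try {
            int total = stock;
            for (int quantity : orderQuantities) {
                total += quantity;
            }
            return total;
        } finally {
            inventoryLock.unlock();
        }
    }

    int orderCount() {
        inventoryLock.lock();
        try {
            return orderQuantities.size();
        } finally {
            inventoryLock.unlock();
        }
    }

    // Starts threadCount threads that each attempt `attempts` orders of `quantity` units.
    static InventoryDesk simulate(int initialStock, int threadCount,
            final int attempts, final int quantity) throws InterruptedException {
        final InventoryDesk desk = new InventoryDesk(initialStock);
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < attempts; i++) {
                        desk.fulfillOrder(quantity);
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return desk;
    }
}
```

With 1,000 units and 8 threads each trying 100 orders of 3 units, exactly 333 orders succeed and 1 unit remains, however the threads interleave.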
You have work that can be split into separate threads and want to maximize the use of available CPU resources.
Use a ThreadPoolExecutor instance, which allows you to break the work into discrete tasks. In the following example, you create a BlockingQueue and fill it with Runnable objects (which describe what needs to be done). The queue is then passed to the ThreadPoolExecutor instance. The ThreadPoolExecutor is initialized and started by calling the prestartAllCoreThreads() method. You then wait until all the Runnable objects are done executing by calling the shutdown() method, followed by the awaitTermination() method:
private void start() throws InterruptedException {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    for (int i = 0; i < 10; i++) {
        final int localI = i;
        queue.add(new Runnable() {
            public void run() {
                doExpensiveOperation(localI); // the unit of work (method not shown)
            }
        });
    }
    // Arguments: corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 1000,
            TimeUnit.MILLISECONDS, queue);
    executor.prestartAllCoreThreads(); // start the workers; they take tasks from the queue
    executor.shutdown();               // accept no new tasks; queued tasks still run
    executor.awaitTermination(100000, TimeUnit.SECONDS);
    System.out.println("Look ma! all operations were completed");
}
A ThreadPoolExecutor consists of two components: the Queue of tasks to be executed, and the executor itself, which manages the pool of threads that execute the tasks. The Queue is filled with Runnable objects whose run() method contains the code to be executed.
The Queue used by a ThreadPoolExecutor is an implementation of the BlockingQueue interface. A BlockingQueue is a queue whose consumers wait (are suspended) when the queue is empty. This is necessary for the ThreadPoolExecutor to work efficiently, because idle worker threads block on the queue instead of repeatedly polling it.
The first step is to fill the Queue with the tasks that need to be done in parallel. This is done by calling the Queue's add() method and passing it a Runnable implementation. Once that's done, the executor is initialized.
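The ThreadPoolExecutor class has several constructors. The solution uses the simplest one, which takes the core pool size, the maximum pool size, the keep-alive time for idle threads beyond the core size, the TimeUnit of that keep-alive time, and the work queue. Table 8-1 has a description of each parameter.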
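After the ThreadPoolExecutor is initialized, you call prestartAllCoreThreads(). This method “warms up” the ThreadPoolExecutor by creating the number of threads specified by the corePoolSize parameter. These threads immediately start consuming tasks from the Queue if it is not empty. This step matters here because a ThreadPoolExecutor normally creates its threads only when tasks are submitted through execute(). Without it, tasks added directly to the queue would sit there with no thread to run them.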
Calling the ThreadPoolExecutor's shutdown() method instructs it not to accept any new tasks. Tasks that were already submitted, including those still waiting in the queue, will finish processing. This is the first step in the orderly termination of a ThreadPoolExecutor. Because shutdown() itself does not wait, you call the awaitTermination() method to wait for all the tasks in the ThreadPoolExecutor to be done. It blocks the main thread until all the Runnables in the ThreadPoolExecutor's queue are done executing (or until the given timeout elapses). After all the Runnables are executed, the main thread wakes up and continues.
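A self-contained sketch of the same steps follows. Each task only increments a counter, as a hypothetical stand-in for doExpensiveOperation(). This makes it easy to confirm that every queued task ran before awaitTermination() returned, even when the pool has fewer threads than there are tasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: prefilled queue, prestarted threads, orderly shutdown.
final class PrefilledPool {
    // Queues taskCount tasks, runs them on poolSize threads, and returns how many ran.
    static int runAll(int taskCount, int poolSize) throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger();
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        for (int i = 0; i < taskCount; i++) {
            queue.add(new Runnable() {
                public void run() {
                    completed.incrementAndGet(); // stand-in for the real work
                }
            });
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize,
                1000, TimeUnit.MILLISECONDS, queue);
        executor.prestartAllCoreThreads(); // without this, nothing takes tasks from the queue
        executor.shutdown();               // no new tasks; queued ones still run
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("Tasks did not finish in time");
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed tasks: " + runAll(10, 4)); // Completed tasks: 10
    }
}
```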
Note A ThreadPoolExecutor needs to be configured correctly to maximize CPU usage. The most efficient number of threads for an executor depends on the types of tasks that are submitted. If the tasks are CPU-intensive, an executor with as many threads as there are cores is a good starting point. If the tasks are I/O-intensive, the executor should have more threads than there are cores, because its threads spend much of their time waiting for I/O. How many more depends on how intensive the I/O operations are; the more I/O-bound the tasks, the higher the number of threads.
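As a sketch of sizing for CPU-bound work, the following code (hypothetical names) creates a ThreadPoolExecutor with one thread per available core, as reported by Runtime.availableProcessors(). It splits a summation into one Callable per thread. Unlike the recipe, it hands tasks to the executor with submit(). submit() creates threads on demand and returns a Future for each task's result, so prestartAllCoreThreads() isn't needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: one worker thread per core for a CPU-bound computation.
final class ParallelSum {
    // Sums the integers 1..n by splitting the range into one chunk per core.
    static long sumTo(final long n) throws InterruptedException, ExecutionException {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(cores, cores,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            List<Future<Long>> parts = new ArrayList<Future<Long>>();
            long chunk = (n + cores - 1) / cores; // ceiling division so every number is covered
            for (int c = 0; c < cores; c++) {
                final long from = c * chunk + 1;
                final long to = Math.min(n, (c + 1) * chunk);
                parts.add(executor.submit(new Callable<Long>() {
                    public Long call() {
                        long sum = 0;
                        for (long k = from; k <= to; k++) { // empty when from > to
                            sum += k;
                        }
                        return sum;
                    }
                }));
            }
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get(); // waits for that chunk to finish
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumTo(1000000L)); // 500000500000
    }
}
```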
Your application requires that two or more threads be coordinated to work in unison.
You can coordinate threads by using wait() and notify(). In this solution, the main thread waits on the objectToSync object until the database-loading thread is done. When it finishes, the database-loading thread notifies objectToSync so that whichever thread is waiting on it can continue executing. The same process happens when loading the orders into our system: the main thread waits on objectToSync until the orders-loading thread notifies it to continue by calling the objectToSync.notify() method. After ensuring that both the inventory and the orders are loaded, the main thread executes the processOrders() method to process all orders.
private final Object objectToSync = new Object();
// Completion flags, guarded by objectToSync. wait() is called in a loop that checks them,
// so that a spurious wakeup is not mistaken for a notification
private boolean inventoryLoaded = false;
private boolean ordersLoaded = false;
private void start() {
    loadItems();
    Thread inventoryThread = new Thread(new Runnable() {
        public void run() {
            System.out.println("Loading Inventory from Database...");
            loadInventory();
            synchronized (objectToSync) {
                inventoryLoaded = true;
                objectToSync.notify();
            }
        }
    });
    synchronized (objectToSync) {
        inventoryThread.start();
        try {
            while (!inventoryLoaded) {
                objectToSync.wait(); // releases the lock while waiting
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    Thread ordersThread = new Thread(new Runnable() {
        public void run() {
            System.out.println("Loading Orders from XML Web service...");
            loadOrders();
            synchronized (objectToSync) {
                ordersLoaded = true;
                objectToSync.notify();
            }
        }
    });
    synchronized (objectToSync) {
        ordersThread.start();
        try {
            while (!ordersLoaded) {
                objectToSync.wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    processOrders();
}
With a CountDownLatch object, you can control when the main thread continues. In the following code, a CountDownLatch with an initial count of 2 is created. Then the two threads for loading the inventory and loading the order information are created and started. As each of the two threads finishes executing, it calls the CountDownLatch's countDown() method, which decrements the latch's count by one. The main thread waits until the count reaches 0, at which point it resumes execution.
private final CountDownLatch latch = new CountDownLatch(2);
private void start() {
    loadItems();
    Thread inventoryThread = new Thread(new Runnable() {
        public void run() {
            System.out.println("Loading Inventory from Database...");
            loadInventory();
            latch.countDown();
        }
    });
    inventoryThread.start();
    Thread ordersThread = new Thread(new Runnable() {
        public void run() {
            System.out.println("Loading Orders from XML Web service...");
            loadOrders();
            latch.countDown();
        }
    });
    ordersThread.start();
    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    processOrders();
}
By using Thread.join(), you can wait for a thread to finish executing. The following example has a thread for loading the inventory and another thread for loading the orders. After each thread is started, a call to its join() method (for example, inventoryThread.join()) makes the main thread wait for that thread to finish executing before continuing. Because each thread is joined before the next one is started, the two loads in this example run one after the other rather than in parallel.
private void start() {
    loadItems();
    Thread inventoryThread = new Thread(new Runnable() {
        public void run() {
            System.out.println("Loading Inventory from Database...");
            loadInventory();
        }
    });
    inventoryThread.start();
    try {
        inventoryThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Thread ordersThread = new Thread(new Runnable() {
        public void run() {
            System.out.println("Loading Orders from XML Web service...");
            loadOrders();
        }
    });
    ordersThread.start();
    try {
        ordersThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    processOrders();
}
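If the two loads are independent, you can start both threads first and join them afterward, so that they run at the same time. The following sketch shows that ordering. The class name is hypothetical, and simple summations stand in for loadInventory() and loadOrders(). Returning from join() also guarantees that everything the joined thread wrote is visible to the joining thread, so the plain results array needs no extra synchronization.

```java
// Hypothetical demo class: start both threads, then join both.
final class JoinBoth {
    // Runs two independent "loads" in parallel and waits for both with join().
    static long[] loadBoth() throws InterruptedException {
        final long[] results = new long[2];
        Thread inventoryThread = new Thread(new Runnable() {
            public void run() {
                results[0] = sumUpTo(1000); // stand-in for loadInventory()
            }
        }, "Inventory Loader");
        Thread ordersThread = new Thread(new Runnable() {
            public void run() {
                results[1] = sumUpTo(500);  // stand-in for loadOrders()
            }
        }, "Orders Loader");
        inventoryThread.start();
        ordersThread.start();   // both threads are now running concurrently
        inventoryThread.join(); // wait for the first...
        ordersThread.join();    // ...and then for the second
        return results;
    }

    private static long sumUpTo(int n) {
        long total = 0;
        for (int i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }
}
```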
There are many ways of coordinating threads in Java, and they all rely on the notion of making a thread wait. When a thread waits, it suspends execution: it doesn't continue to the next instruction, and the JVM doesn't schedule it to run. A waiting thread can be awakened again by notifying it. In Java's concurrency lingo, the word notify means that a thread leaves its waiting state and resumes execution (the JVM makes the thread eligible for scheduling again). So in the natural course of thread coordination, the most common sequence of events is a main thread waiting, and a secondary thread then notifying the main thread to continue (or wake up). Even so, a waiting thread can also be interrupted by some other event. When a thread that is blocked in a method such as wait(), sleep(), or join() is interrupted, it doesn't continue normally. Instead, that method throws an InterruptedException. This signals that, although the thread was waiting for something to happen, some other event happened that needs the thread's attention. This is better illustrated by the following example:
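BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
while (true) {
    // take() waits until an item is available. This version has no way to react to an
    // interruption (take() throws the checked InterruptedException, which must be
    // caught or declared; the next version catches it)
    Object itemToProcess = queue.take();
    processItem(itemToProcess);
}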
The thread that runs the previous code never terminates, because it loops forever waiting for items to process. If there are no items in the Queue, the thread waits until another thread adds something to the Queue. You couldn't gracefully shut down the previous code (especially if the thread running the loop is not a Daemon thread).
BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
while (true) {
    Object itemToProcess = null;
    try {
        itemToProcess = queue.take();
    } catch (InterruptedException e) {
        return; // interrupted: treat it as a request to stop and leave the loop
    }
    processItem(itemToProcess);
}
The new code can now “escape” the infinite loop. From another thread, you can call thread.interrupt() on the thread running the loop. This makes the blocked take() call throw an InterruptedException, which is caught by the loop's catch clause. Within this clause you can then return, effectively exiting the infinite loop.
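Here is a self-contained sketch of that shutdown path (hypothetical class name). A consumer thread processes the items it is given, and the main thread then stops it with interrupt() and confirms with join() that it exited. A CountDownLatch is used only so that the main thread knows when all items have been processed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: stopping a blocked consumer loop with interrupt().
final class InterruptibleConsumer {
    // Feeds itemCount items to a consumer thread, then stops it with interrupt().
    // Returns true if the consumer processed every item and then exited.
    static boolean runAndStop(int itemCount) throws InterruptedException {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        final CountDownLatch processed = new CountDownLatch(itemCount);
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        queue.take();      // blocks while the queue is empty
                    } catch (InterruptedException e) {
                        return;            // the interruption is our shutdown signal
                    }
                    processed.countDown(); // "process" the item
                }
            }
        }, "Consumer");
        consumer.start();
        for (int i = 0; i < itemCount; i++) {
            queue.put(i);
        }
        boolean allProcessed = processed.await(10, TimeUnit.SECONDS);
        consumer.interrupt(); // makes the pending (or next) take() throw
        consumer.join(10000);
        return allProcessed && !consumer.isAlive();
    }
}
```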
InterruptedExceptions are a way of sending extra information to waiting (or sleeping) threads so that they can handle a different scenario (for example, an orderly program shutdown). InterruptedException is a checked exception, so every call that puts the thread to sleep or makes it wait (such as Thread.sleep(), Object.wait(), or BlockingQueue.take()) must either be surrounded by a try/catch block that catches the InterruptedException or appear in a method that declares it. This is one of the cases in which the exception (InterruptedException) is not really an error. It is a way of signaling between threads that something has happened that needs your attention.
Solution 1 shows the most common (oldest) form of coordination. The solution requires making a thread wait, suspending execution, until the thread gets notified (or awakened) by another thread.
For solution 1 to work, the originating thread needs to acquire a lock. This lock will then be the “phone number” on which another thread can notify the originating thread to wake up. After the originating thread acquires the lock (phone number), it proceeds to wait. As soon as the wait() method is called, the lock is released, allowing other threads to acquire the same lock. The secondary thread then acquires the lock (the phone number) and notifies the originating thread, which is like placing a wake-up call. After the notification, and once the secondary thread releases the lock, the originating thread reacquires the lock and resumes execution.
In the solution 1 code, the lock is a dummy object called objectToSync. In practice, the object on which you lock for waiting and notifying could be any valid object instance in Java; for example, you could have used the this reference to make the main thread wait (and within the threads you could have used the Recipe8_7_1.this reference to notify the main thread to continue).
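The following sketch shows the same lock-wait-notify handshake around an objectToSync lock. The class name WaitNotifyDemo, the waitForWork() and finishWork() methods, and the workDone flag are hypothetical. One addition to the bare pattern: wait() is called in a loop that checks a condition flag, which guards against spurious wakeups and against a notify() that arrives before the originating thread has started waiting.

```java
class WaitNotifyDemo {
    private final Object objectToSync = new Object(); // the shared "phone number"
    private boolean workDone = false;                 // guarded by objectToSync

    // Called by the originating (waiting) thread.
    void waitForWork() throws InterruptedException {
        synchronized (objectToSync) {
            while (!workDone) {        // re-check the condition after every wakeup
                objectToSync.wait();   // releases the lock while waiting
            }
        }
    }

    // Called by the secondary (notifying) thread.
    void finishWork() {
        synchronized (objectToSync) {
            workDone = true;
            objectToSync.notify();     // the "wake-up call"
        }
    }

    static boolean runDemo() {
        final WaitNotifyDemo demo = new WaitNotifyDemo();
        new Thread(new Runnable() {
            public void run() {
                demo.finishWork();
            }
        }, "Worker Thread").start();
        try {
            demo.waitForWork();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        synchronized (demo.objectToSync) {
            return demo.workDone;
        }
    }

    public static void main(String[] args) {
        System.out.println("Main thread woke up, work done: " + runDemo());
    }
}
```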
The main advantage of this technique is the explicit control it gives you over which object to wait on and when to notify (and the ability to notify all threads that are waiting on the same object; see the following tip).
Tip Multiple threads can wait on the same lock (the same phone number to be awakened). When a secondary thread calls notify(), it wakes up one of the waiting threads (there is no fairness guarantee about which one is awakened). If you need to wake all of the waiting threads, call the notifyAll() method instead of notify(). This is mostly used when many threads are waiting to take on some work, but that work has not finished being set up yet.
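As a sketch of that notifyAll() scenario, the hypothetical NotifyAllGate class below lets several worker threads wait until the work is ready and then releases all of them with a single notifyAll() call. The class, method, and flag names are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class NotifyAllGate {
    private final Object lock = new Object();
    private boolean open = false; // guarded by lock

    void awaitOpen() throws InterruptedException {
        synchronized (lock) {
            while (!open) {
                lock.wait();
            }
        }
    }

    void openGate() {
        synchronized (lock) {
            open = true;
            lock.notifyAll(); // wake every waiting thread, not just one
        }
    }

    // Starts `workers` threads that wait on the gate, opens it once,
    // and returns how many workers got through.
    static int runDemo(int workers) {
        final NotifyAllGate gate = new NotifyAllGate();
        final AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        gate.awaitOpen();
                        passed.incrementAndGet(); // stands in for taking the work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Worker " + i);
            threads[i].start();
        }
        gate.openGate(); // the work is now "set up"
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5) + " workers released");
    }
}
```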
Solution 2 uses a more modern approach to notification, based on a CountDownLatch. When setting up the latch, you specify how many “counts” it will have. The main thread then waits (stops execution) by calling the CountDownLatch's await() method until the latch counts down to 0. As each worker thread completes, it calls the latch.countDown() method, which decrements the latch's current count. When the count reaches 0, the main thread that was waiting on the CountDownLatch wakes up and continues execution.
The main advantage of using a CountDownLatch is that you can spawn multiple tasks at the same time and simply wait for all of them to complete. In the solution example, you didn't need to wait for one thread or the other to complete before continuing: all of the threads were started, and when the latch reached 0, the main thread continued.
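A minimal sketch of the latch pattern follows; the LatchDemo class name and the AtomicInteger that stands in for real work are assumptions. Calling countDown() in a finally block is a common safeguard so that a failing worker cannot leave the main thread waiting forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts `workers` threads at once and waits until all of them count down.
    static int runDemo(int workers) {
        final CountDownLatch latch = new CountDownLatch(workers);
        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        completed.incrementAndGet(); // stands in for real work
                    } finally {
                        latch.countDown(); // always count down, even if the work fails
                    }
                }
            }, "Worker " + i).start();
        }
        try {
            latch.await(); // blocks until the count reaches 0
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4) + " workers finished; main thread continues");
    }
}
```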
Solution 3, in contrast, relies on having access to the thread you want to wait on. For the main thread, it's just a matter of calling the secondary thread's join() method. The main thread then waits (stops executing) until the secondary thread finishes.
The advantage of this method is that it doesn't require the secondary threads to know about any synchronization mechanism. As long as a secondary thread terminates execution, the main thread can wait on it.
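The sketch below illustrates that point: the two worker threads (in a hypothetical JoinDemo class) contain no synchronization code at all, and the main thread simply joins them before combining their partial sums. join() also guarantees that the workers' writes are visible to the main thread afterward.

```java
class JoinDemo {
    // Sums 1..100 in two threads and combines the halves after join().
    static long runDemo() {
        final long[] partialSums = new long[2];
        Thread first = new Thread(new Runnable() {
            public void run() {
                for (int i = 1; i <= 50; i++) partialSums[0] += i;
            }
        });
        Thread second = new Thread(new Runnable() {
            public void run() {
                for (int i = 51; i <= 100; i++) partialSums[1] += i;
            }
        });
        first.start();
        second.start();
        try {
            first.join();  // waits until first's run() returns
            second.join(); // waits until second's run() returns
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return partialSums[0] + partialSums[1];
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + runDemo());
    }
}
```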
You need to create an object that is threadsafe because it will be accessed from multiple threads.
Use synchronized getters and setters, and protect critical regions that change state. In the following example, you create an object with getters and setters that are synchronized for each internal variable, and you protect the critical regions by using the synchronized(this) lock:
class CustomerOrder {
    private String itemOrdered;
    private int quantityOrdered;
    private String customerName;
    public CustomerOrder() {
    }
    public double calculateOrderTotal(double price) {
        synchronized (this) {
            return getQuantityOrdered() * price;
        }
    }
    public synchronized String getItemOrdered() {
        return itemOrdered;
    }
    public synchronized int getQuantityOrdered() {
        return quantityOrdered;
    }
    public synchronized String getCustomerName() {
        return customerName;
    }
    public synchronized void setItemOrdered(String itemOrdered) {
        this.itemOrdered = itemOrdered;
    }
    public synchronized void setQuantityOrdered(int quantityOrdered) {
        this.quantityOrdered = quantityOrdered;
    }
    public synchronized void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}
Create an immutable object (an object that, once created, doesn't change its internal state). In the following code, the internal variables of the object are declared final and are assigned at construction. Because the class offers no setters and its fields hold only immutable values (String and int), the object is guaranteed to be immutable:
class ImmutableCustomerOrder {
    // final fields: assigned once, in the constructor, and never changed
    private final String itemOrdered;
    private final int quantityOrdered;
    private final String customerName;
    ImmutableCustomerOrder(String itemOrdered, int quantityOrdered, String customerName) {
        this.itemOrdered = itemOrdered;
        this.quantityOrdered = quantityOrdered;
        this.customerName = customerName;
    }
    public String getItemOrdered() {
        return itemOrdered;
    }
    public int getQuantityOrdered() {
        return quantityOrdered;
    }
    public String getCustomerName() {
        return customerName;
    }
    // no synchronized needed: the state read here can never change
    public double calculateOrderTotal(double price) {
        return getQuantityOrdered() * price;
    }
}
Solution 1 relies on the principle that any change made to the object is protected by a lock. Declaring an instance method synchronized is a shortcut for wrapping the method's entire body in a synchronized (this) block.
By synchronizing your getters and setters (and any other operation that alters the internal state of your object), you guarantee that the object is consistent. Also, it is important that any operations that should occur as a unit (say something that modifies two collections at the same time, as listed in recipe 8-5) are done within a method of the object and are protected by using the synchronized keyword.
For instance, if an object offers a getSize() method as well as a getItemNumber(int index) method, it would be unsafe to write object.getItemNumber(object.getSize()-1). Even though the statement looks concise (and even if both methods are synchronized individually), another thread can change the contents of the object between getting the size and getting the item number. Instead, it is safer to create an object.getLastElement() method, which atomically figures out the size and the last element.
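A minimal sketch of that advice, using a hypothetical SafeItemList class backed by an ArrayList: because getLastElement() is synchronized, the size lookup and the element lookup happen under one lock acquisition, so no other thread can remove an element in between.

```java
import java.util.ArrayList;
import java.util.List;

class SafeItemList {
    private final List<Integer> items = new ArrayList<>(); // guarded by this

    public synchronized void add(int item) { items.add(item); }
    public synchronized void removeLast() { items.remove(items.size() - 1); }
    public synchronized int getSize() { return items.size(); }
    public synchronized int getItemNumber(int index) { return items.get(index); }

    // Check-then-act inside one synchronized method: the size used to
    // compute the index cannot change before the element is read.
    public synchronized int getLastElement() {
        return items.get(items.size() - 1);
    }

    public static void main(String[] args) {
        SafeItemList list = new SafeItemList();
        list.add(1);
        list.add(2);
        list.add(3);
        System.out.println("Last element: " + list.getLastElement());
    }
}
```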
Solution 2 relies on the property of immutable objects. Immutable objects don't change their internal state, and objects that don't change their internal state are by definition threadsafe. If you need to modify an immutable object because of an event, instead of changing its properties, create a new object with the changed properties. This new object then takes the place of the old object, and on future requests for the object, the new immutable object is returned. This is by far the easiest (albeit verbose) method for creating threadsafe code.
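To sketch the “create a new object with the changed properties” step, here is a version of ImmutableCustomerOrder extended with a hypothetical withQuantityOrdered() method (the method name is an assumption, not part of the recipe). It returns a new order instead of modifying the existing one.

```java
class ImmutableCustomerOrder {
    private final String itemOrdered;
    private final int quantityOrdered;
    private final String customerName;

    ImmutableCustomerOrder(String itemOrdered, int quantityOrdered, String customerName) {
        this.itemOrdered = itemOrdered;
        this.quantityOrdered = quantityOrdered;
        this.customerName = customerName;
    }

    public String getItemOrdered() { return itemOrdered; }
    public int getQuantityOrdered() { return quantityOrdered; }
    public String getCustomerName() { return customerName; }

    // Hypothetical "wither": instead of a setter, build a new object that
    // carries the change; the original object is left untouched.
    public ImmutableCustomerOrder withQuantityOrdered(int newQuantity) {
        return new ImmutableCustomerOrder(itemOrdered, newQuantity, customerName);
    }

    public static void main(String[] args) {
        ImmutableCustomerOrder order = new ImmutableCustomerOrder("Widget", 1, "Ann");
        ImmutableCustomerOrder updated = order.withQuantityOrdered(5);
        System.out.println(order.getQuantityOrdered() + " -> " + updated.getQuantityOrdered());
    }
}
```

Whatever reference points at the current order (for example, a field that other threads read) still needs safe publication, such as declaring it volatile, so that readers see the replacement object.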
You need a counter that is threadsafe so that it can be incremented from different execution threads.
By using the inherently threadsafe Atomic objects, you can create a counter that guarantees thread safety and has an optimized synchronization strategy. In the following code, you create an Order object, which requires a unique order ID generated by using the AtomicLong incrementAndGet() method:
AtomicLong orderIdGenerator = new AtomicLong(0);
for (int i = 0; i < 10; i++) {
    Thread orderCreationThread = new Thread(new Runnable() {
        public void run() {
            // each thread creates 10 orders
            for (int j = 0; j < 10; j++) {
                createOrder(Thread.currentThread().getName());
            }
        }
    });
    orderCreationThread.setName("Order Creation Thread " + i);
    orderCreationThread.start();
}
//////////////////////////////////////////////////////
private void createOrder(String name) {
    // incrementAndGet() atomically adds 1 and returns the new value,
    // so no two threads can receive the same orderId
    long orderId = orderIdGenerator.incrementAndGet();
    Order order = new Order(name, orderId);
    // orders must itself be a threadsafe collection, since many threads add to it
    orders.add(order);
}
AtomicLong (and its cousin AtomicInteger) is built to be used safely in concurrent environments. Both classes have methods that atomically increment the value and return the changed value. Even if two, or hundreds of, threads were to call the AtomicLong incrementAndGet() method, each call would return a unique value.
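The sketch below checks that uniqueness claim: ten threads (in a hypothetical OrderIdDemo class) each draw 1,000 IDs from one shared AtomicLong and record them in a concurrent Set, and the number of distinct IDs should equal the number of calls.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class OrderIdDemo {
    // Runs `threads` threads that each draw `perThread` IDs from one shared
    // AtomicLong and returns how many distinct IDs were handed out.
    static int countDistinctIds(int threads, final int perThread) {
        final AtomicLong orderIdGenerator = new AtomicLong(0);
        // a concurrent Set, so that the threads can record their IDs safely
        final Set<Long> ids =
                Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        ids.add(orderIdGenerator.incrementAndGet());
                    }
                }
            }, "Order Creation Thread " + i);
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return ids.size();
    }

    public static void main(String[] args) {
        System.out.println(countDistinctIds(10, 1000) + " distinct IDs");
    }
}
```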
If you need to make decisions and update the variable, always use the atomic operations offered by AtomicLong, such as compareAndSet(). Otherwise, your code will not be threadsafe (any check-then-act operation needs to be atomic) unless you externally protect the atomic reference by using your own locks (see recipe 8-7).
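As an example of a check-then-act done with compareAndSet(), the hypothetical AtomicMax class below tracks the largest value offered by any thread. Each attempt reads the current maximum, decides whether to update it, and only succeeds if no other thread changed the value in the meantime; otherwise it retries.

```java
import java.util.concurrent.atomic.AtomicLong;

class AtomicMax {
    private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);

    void offer(long candidate) {
        while (true) {
            long current = max.get();
            if (candidate <= current) {
                return; // decision: nothing to update
            }
            if (max.compareAndSet(current, candidate)) {
                return; // no other thread changed max between get() and here
            }
            // another thread won the race: re-read and decide again
        }
    }

    long get() { return max.get(); }

    // Each thread offers a distinct block of 1,000 values.
    static long runDemo(int threads) {
        final AtomicMax tracker = new AtomicMax();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * 1000;
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    for (int k = 0; k < 1000; k++) tracker.offer(base + k);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return tracker.get();
    }

    public static void main(String[] args) {
        System.out.println("Maximum seen: " + runDemo(4));
    }
}
```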
The following code illustrates several thread-safety issues to be aware of. First, writing a long value may be done in two separate 32-bit write operations (as allowed by the Java Memory Model), and thus two threads could end up overlapping those two operations in what might on the surface appear to be threadsafe code. The result would be a completely unexpected (and likely wrong) long value:
long counter = 0;
// NOT threadsafe: shown here to illustrate the problems discussed in this section
public long incrementCounter() {
    return counter++;
}
This code also suffers from unsafe publication, which refers to the fact that a variable might be cached locally (in the CPU's internal cache) and might not be committed to main memory. If another thread (executing on another CPU) happens to read the variable from main memory, that other thread might miss the changes made by the first thread: the changed value might still be cached by the first thread's CPU and not yet committed to main memory, where the second thread can see it. For safe publication, you must declare the variable with the volatile Java modifier or access it only while holding a lock (see http://download.oracle.com/javase/tutorial/essential/concurrency/atomic.html).
A final issue with the preceding code is that it is not atomic. Even though it looks like there is only one operation to increment the counter, counter++ is really three separate steps: reading the variable, adding 1, and writing the result back. Two or more threads can read the same value before any of them has written its incremented result back; they then all store the same number, and increments are lost.
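One way to fix all three issues at once is to make the counter's methods synchronized, as in the hypothetical SafeCounter sketch below (AtomicLong, shown earlier, is the other common fix). Declaring the field volatile alone would make its reads and writes atomic and visible, but it would not make the read-increment-write sequence of counter++ atomic.

```java
class SafeCounter {
    private long counter = 0; // guarded by this

    // synchronized makes the read-increment-write sequence atomic
    // and publishes the new value to the next thread that takes the lock
    public synchronized long incrementCounter() {
        return counter++;
    }

    public synchronized long get() {
        return counter;
    }

    static long runDemo(int threads, final int perThread) {
        final SafeCounter safeCounter = new SafeCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    for (int k = 0; k < perThread; k++) safeCounter.incrementCounter();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return safeCounter.get();
    }

    public static void main(String[] args) {
        System.out.println("Final count: " + runDemo(8, 10000));
    }
}
```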
You have an algorithm that benefits from a divide-and-conquer strategy, which breaks a unit of work down into two separate subunits and then pieces together the results from these subunits. The subunits can in turn be broken down into more subunits until the work is small enough to just be executed directly. By breaking down the unit of work into subunits, you can take advantage of the multicore nature of today's processors with minimal pain.
The new Fork/Join framework in Java 7 makes applying the divide-and-conquer strategy straightforward. The following example creates a representation of the Game of Life. The code uses the Fork/Join framework to speed up the calculation of each iteration when advancing from one generation to the next:
////////////////////////////////////////////////////////////////
ForkJoinPool pool = new ForkJoinPool();
long i = 0;
while (shouldRun) {
    i++;
    final boolean[][] newBoard = new boolean[lifeBoard.length][lifeBoard[0].length];
    long startTime = System.nanoTime();
    GameOfLifeAdvancer advancer = new GameOfLifeAdvancer(lifeBoard, 0, 0, lifeBoard.length - 1, lifeBoard[0].length - 1, newBoard);
    pool.invoke(advancer);
    long endTime = System.nanoTime();
    if (i % 100 == 0) {
        // nanoTime() is in nanoseconds: divide by 1,000,000 to get milliseconds
        System.out.println("Taking " + (endTime - startTime) / 1000000 + "ms");
    }
    SwingUtilities.invokeAndWait(new Runnable() {
        public void run() {
            model.setBoard(newBoard);
            lifeTable.repaint();
        }
    });
    lifeBoard = newBoard;
}
////////////////////////////////////////////////////////////////
class GameOfLifeAdvancer extends RecursiveAction {
    private boolean[][] originalBoard;
    private boolean[][] destinationBoard;
    private int startRow;
    private int endRow;
    private int endCol;
    private int startCol;
    GameOfLifeAdvancer(boolean[][] originalBoard, int startRow, int startCol, int endRow, int endCol, boolean[][] destinationBoard) {
        this.originalBoard = originalBoard;
        this.destinationBoard = destinationBoard;
        this.startRow = startRow;
        this.endRow = endRow;
        this.endCol = endCol;
        this.startCol = startCol;
    }
    private void computeDirectly() {
        for (int row = startRow; row <= endRow; row++) {
            for (int col = startCol; col <= endCol; col++) {
                int numberOfNeighbors = getNumberOfNeighbors(row, col);
                if (originalBoard[row][col]) {
                    destinationBoard[row][col] = true;
                    if (numberOfNeighbors < 2) destinationBoard[row][col] = false;
                    if (numberOfNeighbors > 3) destinationBoard[row][col] = false;
                } else {
                    destinationBoard[row][col] = false;
                    if (numberOfNeighbors == 3) destinationBoard[row][col] = true;
                }
            }
        }
    }
    private int getNumberOfNeighbors(int row, int col) {
        int neighborCount = 0;
        for (int leftIndex = -1; leftIndex < 2; leftIndex++) {
            for (int topIndex = -1; topIndex < 2; topIndex++) {
                if ((leftIndex == 0) && (topIndex == 0)) continue; // skip own
                int neighbourRowIndex = row + leftIndex;
                int neighbourColIndex = col + topIndex;
                // wrap around the edges of the board
                if (neighbourRowIndex < 0) neighbourRowIndex = originalBoard.length + neighbourRowIndex;
                if (neighbourColIndex < 0) neighbourColIndex = originalBoard[0].length + neighbourColIndex;
                boolean neighbour = originalBoard[neighbourRowIndex % originalBoard.length][neighbourColIndex % originalBoard[0].length];
                if (neighbour) neighborCount++;
            }
        }
        return neighborCount;
    }
    @Override
    protected void compute() {
        if (getArea() < 20) {
            computeDirectly();
            return;
        }
        int halfRows = (endRow - startRow) / 2;
        int halfCols = (endCol - startCol) / 2;
        if (halfRows > halfCols) {
            // split the rows
            invokeAll(new GameOfLifeAdvancer(originalBoard, startRow, startCol, startRow + halfRows, endCol, destinationBoard),
                    new GameOfLifeAdvancer(originalBoard, startRow + halfRows + 1, startCol, endRow, endCol, destinationBoard));
        } else {
            // split the columns
            invokeAll(new GameOfLifeAdvancer(originalBoard, startRow, startCol, endRow, startCol + halfCols, destinationBoard),
                    new GameOfLifeAdvancer(originalBoard, startRow, startCol + halfCols + 1, endRow, endCol, destinationBoard));
        }
    }
    private int getArea() { return (endRow - startRow) * (endCol - startCol); }
}
The first part of the example creates a ForkJoinPool object. The default constructor provides reasonable defaults (such as a parallelism level equal to the number of available processor cores) and sets up an entry point to submit divide-and-conquer work. Although ForkJoinPool implements the ExecutorService interface, it is best suited to handle tasks that extend ForkJoinTask, such as RecursiveAction. The ForkJoinPool object has the invoke(ForkJoinTask) method, which takes a task (here, a RecursiveAction object), runs it by applying the divide-and-conquer strategy, and waits for it to complete.
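Stripped of the Game of Life details, the pool, invoke(), and RecursiveAction pieces fit together as in the sketch below. The SquareAction class, which squares every element of an array in place, and its THRESHOLD of 1,000 elements are hypothetical choices made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class SquareAction extends RecursiveAction {
    private static final int THRESHOLD = 1000; // assumed cutoff for direct work
    private final long[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    SquareAction(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // small enough: do the work directly
            for (int i = start; i < end; i++) data[i] = data[i] * data[i];
            return;
        }
        // otherwise split in half and wait for both halves
        int mid = start + (end - start) / 2;
        invokeAll(new SquareAction(data, start, mid), new SquareAction(data, mid, end));
    }

    // Returns a squared copy of the input; the input array is not modified.
    static long[] squareAll(long[] input) {
        long[] data = input.clone();
        new ForkJoinPool().invoke(new SquareAction(data, 0, data.length));
        return data;
    }

    public static void main(String[] args) {
        long[] values = new long[10000];
        for (int i = 0; i < values.length; i++) values[i] = i;
        System.out.println("Last square: " + squareAll(values)[values.length - 1]);
    }
}
```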
The second part of the solution creates the GameOfLifeAdvancer class, which extends the RecursiveAction class. By extending the RecursiveAction class, you can split the work to be done. The GameOfLifeAdvancer class advances the Game of Life board to the next generation. The constructor takes a two-dimensional boolean array (which represents a Game of Life board), a start row/column, an end row/column, and a destination two-dimensional boolean array, in which the result of advancing the Game of Life by one generation is collected.
The GameOfLifeAdvancer is required to implement the compute() method. In this method, you figure out how much work there is to be done. If the work is small enough, it is done directly (by calling the computeDirectly() method and returning). If the work is not small enough, the method splits it by creating two GameOfLifeAdvancer instances that each process only half of the current GameOfLifeAdvancer's work. This is done by splitting either the rows or the columns to be processed into two chunks, whichever dimension is larger. The two GameOfLifeAdvancer instances are then passed to the fork/join pool by calling the invokeAll() method (a static method that RecursiveAction inherits from ForkJoinTask). The invokeAll() method takes the two instances of GameOfLifeAdvancer (it can take as many as needed) and waits until they have all finished executing (that is the meaning of the All suffix in the invokeAll() method name; it waits for all of the submitted tasks to complete before returning control).
In this way, the GameOfLifeAdvancer instance is broken down into new GameOfLifeAdvancer instances that each process only part of the Game of Life board. Each instance waits for all of its subordinate parts to complete before returning control to the caller. The resulting division of work can take advantage of the multiple CPUs available in a typical system today.
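RecursiveAction suits work like the Game of Life, where results are written into a shared destination array. When each subtask should return a value instead, the framework offers RecursiveTask. The hypothetical SumTask sketch below sums an array and, instead of invokeAll(), uses the explicit fork()/compute()/join() idiom: it forks one half, computes the other half in the current thread, and then joins the forked half. The class name and the 1,000-element threshold are assumptions.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000; // assumed cutoff for direct work
    private final long[] numbers;
    private final int start; // inclusive
    private final int end;   // exclusive

    SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += numbers[i];
            return sum;
        }
        int mid = start + (end - start) / 2;
        SumTask left = new SumTask(numbers, start, mid);
        SumTask right = new SumTask(numbers, mid, end);
        left.fork();                        // schedule the left half asynchronously
        long rightResult = right.compute(); // work on the right half in this thread
        long leftResult = left.join();      // wait for the forked half's result
        return leftResult + rightResult;
    }

    static long sum(long[] numbers) {
        return new ForkJoinPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[100000];
        for (int i = 0; i < numbers.length; i++) numbers[i] = i + 1;
        System.out.println("Sum: " + sum(numbers));
    }
}
```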
Tip For divide-and-conquer work, ForkJoinPool is generally more efficient than a conventional ExecutorService such as ThreadPoolExecutor because it implements a work-stealing policy. Each worker thread has its own double-ended queue (deque) of tasks; when a thread's queue becomes empty, the thread “steals” work from another thread's queue, making more efficient use of CPU processing power.