Chapter 22

Multithreaded Programming with C++

WHAT’S IN THIS CHAPTER?

  • What multithreaded programming is and how to write multithreaded code
  • What deadlocks and race conditions are, and how to use mutual exclusion to prevent them
  • How to use atomic types and atomic operations
  • What thread pools are

Multithreaded programming is important on computer systems with multiple processors. It allows you to write a program that utilizes all those processors in parallel. Systems with multiple processors have existed for a long time; however, they were rarely used in consumer systems. Today, all major CPU vendors sell multicore processors. A multicore processor physically looks like a single processor, but inside it contains several parallel CPUs, also called cores. Nowadays, multicore processors are used for everything from servers to consumer computers, and even the latest smartphones have them. Because of this proliferation of multicore processors, writing multithreaded applications is becoming more and more important, and a Professional C++ programmer needs to know how to write correct multithreaded code to take full advantage of all the available processors and cores. Writing multithreaded applications used to rely on platform- and operating-system-specific APIs, which made it difficult to write platform-independent multithreaded code. C++11 solves this problem with the introduction of a standard threading library.

Multithreaded programming is a complicated subject. This chapter introduces you to multithreaded programming using the new C++11 threading library, but due to space constraints it cannot cover every detail. Entire books have been written about developing multithreaded programs. If you are interested in more details, consult one of the references in the multithreading section of Appendix B.

If your compiler does not yet support the C++11 threading library, you can use third-party libraries that try to make multithreaded programming more platform independent, such as the pthreads library and the boost::thread library. However, since they are not part of the C++ standard, these are not discussed in this book.

INTRODUCTION

Multithreaded programming allows you to perform multiple calculations in parallel, taking advantage of the multiple processors and cores inside most CPUs these days. Years ago, the CPU market was racing for the highest frequency, which is perfect for single-threaded applications. This race has stopped due to a combination of power- and heat-management problems. Today, the CPU market is racing toward the most cores on a single CPU. Dual- and quad-core CPUs are already common at the time of this writing, and announcements have already been made about 12-, 16-, 32-, and even 80-core CPUs. These are still in an experimental stage, but you can be sure they will come out sooner than you might expect.

Similarly, if you look at the processing units on graphics cards, called GPUs, you'll see that they are massively parallel computing units. Today, high-end graphics cards already have more than 2,000 cores, and this number will only increase. These graphics cards are no longer used only for gaming; they also perform computationally intensive tasks, such as image and video manipulation, protein folding (useful for discovering new drugs), and processing signals as part of the SETI (Search for Extra-Terrestrial Intelligence) project.

C++98/03 had no support for multithreaded programming, so you had to resort to third-party libraries or to the multithreading APIs of your target operating system. C++11 includes a standard multithreading library, making it easier to write cross-platform multithreaded applications. There are two main reasons to start writing multithreaded code. First, if you have a computational problem and you manage to separate it into small pieces that can run in parallel independently of each other, you can expect a huge performance boost when running it on modern multicore CPUs and GPUs. Second, you can modularize computations along orthogonal axes; for example, performing long computations in a background thread instead of blocking the GUI thread, so that the user interface remains responsive.

Figure 22-1 shows an example of a problem perfectly suited to run in parallel: the processing of the pixels of an image by an algorithm that does not require information about neighboring pixels. The algorithm could split the image into four parts. On a single-core CPU, each part is processed sequentially; on a dual-core CPU, two parts are processed in parallel; and on a quad-core CPU, four parts are processed in parallel, resulting in an almost linear scaling of performance with the number of cores.

Of course, it’s not always possible to split the problem into parts that can be executed independently of each other in parallel. But often it can be parallelized at least partially, resulting in a performance increase.

One difficult part of multithreaded programming is parallelizing your algorithm, which depends heavily on the nature of the algorithm. Other difficult parts are preventing race conditions and deadlocks, which are discussed in the following section.

Race Conditions and Deadlocks

Race conditions can occur when multiple threads want to read from and write to a shared memory location. For example, suppose you have a shared variable, and one thread increments it while another thread decrements it. Incrementing or decrementing the value means that the current value needs to be retrieved from memory, incremented or decremented, and stored back in memory. On older architectures, for example the PDP-11 and VAX, this used to be implemented with an INC processor instruction, which was atomic. On modern multicore x86 processors, the INC instruction is not atomic, meaning that other cores can access the same memory location in the middle of this read-modify-write sequence, which might cause the code to retrieve a wrong value.

The following table shows the result when the increment finishes executing before the decrement starts, assuming that the initial value is 1:

THREAD 1 (INCREMENT)           THREAD 2 (DECREMENT)
load value (value = 1)
increment value (value = 2)
store value (value = 2)
                               load value (value = 2)
                               decrement value (value = 1)
                               store value (value = 1)

The final value stored in memory is 1. When the decrement thread is finished before the increment thread starts, the final value is also 1, as seen in the following table:

THREAD 1 (INCREMENT)           THREAD 2 (DECREMENT)
                               load value (value = 1)
                               decrement value (value = 0)
                               store value (value = 0)
load value (value = 0)
increment value (value = 1)
store value (value = 1)

However, when the instructions get interleaved on the CPU, the result is different:

THREAD 1 (INCREMENT)           THREAD 2 (DECREMENT)
load value (value = 1)
increment value (value = 2)
                               load value (value = 1)
                               decrement value (value = 0)
store value (value = 2)
                               store value (value = 0)

The final result in this case is 0. In other words, the effect of the increment operation is lost. This is a race condition.

WARNING

To prevent race conditions, try to design your programs so that multiple threads do not need to read from and write to shared memory locations. Alternatively, you can use a synchronization method as described in the section on mutual exclusion later in this chapter, or, if possible, use the atomic operations described in the next section.

If you opt to solve a race condition by using a synchronization method such as mutual exclusion, you might run into another common problem in multithreaded programming: deadlocks. A deadlock occurs when threads block indefinitely because each is waiting to acquire access to a resource currently locked by another blocked thread.

For example, suppose you have two threads and two resources, A and B. Both threads require a lock on both resources, but they acquire the locks in a different order. The following table shows this situation in pseudo-code:

THREAD 1          THREAD 2
Lock A            Lock B
Lock B            Lock A
// ... compute    // ... compute
Release B         Release A
Release A         Release B

Now, imagine that the code in the two threads is executed in the following order:

Thread 1: Lock A

Thread 2: Lock B

Thread 1: Lock B (waits, because lock held by Thread 2)

Thread 2: Lock A (waits, because lock held by Thread 1)

Both threads are now waiting indefinitely, a deadlock situation. Figure 22-2 shows a graphical representation of this deadlock situation. Thread 1 is holding a lock on resource A and is waiting to get a lock on resource B. Thread 2 is holding a lock on resource B and is waiting to get a lock on resource A.

In this graphical representation, you see a cycle that depicts the deadlock situation. Both threads will wait indefinitely, unless you include mechanisms in your program to break these kinds of deadlocks. One possible solution is to try for a certain time to acquire a lock on a resource. If the lock could not be obtained within a certain time interval, the thread stops waiting and possibly releases other locks it is currently holding. The thread might then sleep for a little bit and try again later to acquire all the resources it needs. This method might give other threads the opportunity to acquire necessary locks and continue their execution. Whether this method works or not depends heavily on your specific deadlock case.

Instead of using a workaround as described in the previous paragraph, you should try to avoid deadlock situations altogether. If you need to acquire multiple locks, the recommended way is to use the standard std::lock() or std::try_lock() functions described later in the section on mutual exclusion. These functions obtain or try to obtain a lock on several resources at once, using an algorithm that avoids deadlocks.

ATOMIC OPERATIONS LIBRARY

C++11 introduces atomic types, on which operations are performed atomically. These allow atomic access, which means that multiple threads can concurrently read and write the same variable without additional synchronization and without causing data races or undefined behavior.

The increment race condition example given in the previous section can be solved by using an atomic type. For example, the following code is not thread-safe and can show race condition behavior as explained earlier:

int counter = 0;   // Global variable
...
++counter;         // Executed in multiple threads

To make this thread-safe without explicitly using any locks, use an atomic type:

atomic<int> counter(0);   // Global variable
...
++counter;                // Executed in multiple threads

You need to include the <atomic> header to use these atomic types. The standard defines the following named integral atomic types:

NAMED ATOMIC TYPE    EQUIVALENT ATOMIC TYPE        INTEGRAL TYPE
atomic_char          atomic<char>                  char
atomic_schar         atomic<signed char>           signed char
atomic_uchar         atomic<unsigned char>         unsigned char
atomic_short         atomic<short>                 short
atomic_ushort        atomic<unsigned short>        unsigned short
atomic_int           atomic<int>                   int
atomic_uint          atomic<unsigned int>          unsigned int
atomic_long          atomic<long>                  long
atomic_ulong         atomic<unsigned long>         unsigned long
atomic_llong         atomic<long long>             long long
atomic_ullong        atomic<unsigned long long>    unsigned long long
atomic_char16_t      atomic<char16_t>              char16_t
atomic_char32_t      atomic<char32_t>              char32_t
atomic_wchar_t       atomic<wchar_t>               wchar_t

Atomic Type Example

This section explains why you should use atomic types. Suppose you have a function called func() that increments an integer given as a reference parameter in a loop:

void func(int& counter)
{
    for (int i = 0; i < 10000; ++i) {
        ++counter;
    }
}

Code snippet from atomic\inc_func_non_atomic.cpp

Now, you would like to run several threads in parallel, all executing this func() function. Implementing this naively without atomic types introduces a race condition. The following main() function launches several threads with the std::thread class, defined in the <thread> header file. The constructor of the thread class requires the function to execute in the new thread and the arguments for that function. After launching 10 threads, the main() function waits for all of them to finish by calling join() on each thread. The details of how to use std::thread and join() are not important at this point; they are discussed later in this chapter.

#include <iostream>
#include <vector>
#include <thread>
#include <functional>
int main()
{
    int counter = 0;
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.push_back(std::thread{func, std::ref(counter)});
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Result = " << counter << std::endl;
    return 0;
}

Code snippet from atomic\inc_func_non_atomic.cpp

Since func() increments the integer 10,000 times and main() launches 10 background threads, each of which executes func(), the expected result is 100,000. If you execute this program several times, you might get output like the following:

Result = 86044
Result = 100000
Result = 99726
Result = 90780
Result = 100000
Result = 85054
Result = 84157
Result = 77133
Result = 74325

This output clearly shows race condition behavior. In this example, the best and recommended solution is to use the new atomic types. The following code highlights the required changes:

#include <iostream>
#include <vector>
#include <thread>
#include <functional>
#include <atomic>
void func(std::atomic<int>& counter)
{
    for (int i = 0; i < 10000; ++i) {
        ++counter;
    }
}
int main()
{
    std::atomic<int> counter(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.push_back(std::thread{func, std::ref(counter)});
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Result = " << counter << std::endl;
    return 0;
}

Code snippet from atomic\inc_func_atomic.cpp

The changes include the <atomic> header file and change the type of the shared counter from int to std::atomic<int>. When you run the new example, you will always get 100,000 as the result:

Result = 100000
Result = 100000
Result = 100000
Result = 100000
Result = 100000

Without explicitly adding any locks to your code, you have made it thread-safe and free of race conditions, because the ++counter operation on an atomic type loads the value, increments it, and stores it back as one atomic transaction that cannot be interrupted.

NOTE

If you want to compile multithreaded code with GCC, you have to link with the pthread library, for example by using the -pthread flag:

> g++ -pthread inc_func_atomic.cpp

Atomic Operations

The C++11 standard defines a number of atomic operations. This section describes a few of those operations. For a full list, consult the Standard Library Reference resource on the website, www.wrox.com.

An example of an atomic operation is:

bool atomic_compare_exchange_strong(atomic<C>* object, C* expected, C desired);

This operation can also be called as a member function of atomic<C>; note that the member version takes expected by reference:

bool atomic<C>::compare_exchange_strong(C& expected, C desired);

This operation implements the following logic atomically:

if (memcmp(object, expected, sizeof(*object)) == 0) {
    memcpy(object, &desired, sizeof(*object));
    return true;
} else {
    memcpy(expected, object, sizeof(*object));
    return false;
}
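In practice, compare_exchange_strong() is typically used in a loop: read the current value, compute a new value from it, and retry if another thread changed the variable in the meantime. The following is a minimal sketch of this pattern; the atomicMultiply() helper is made up for illustration:

#include <atomic>

// Hypothetical helper: atomically multiply an atomic<int> by a factor.
int atomicMultiply(std::atomic<int>& value, int factor)
{
    int expected = value.load();
    // If another thread changed value in the meantime, compare_exchange_strong()
    // fails, stores the latest value in expected, and the loop simply retries.
    while (!value.compare_exchange_strong(expected, expected * factor)) { }
    return expected * factor;
}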

A second example is atomic<T>::fetch_add(), which fetches the current value of an atomic type, adds the given increment to the atomic value, and returns the original non-incremented value. For example:

atomic<int> value(10);
cout << "Value = " << value << endl;
int fetched = value.fetch_add(4);
cout << "Fetched = " << fetched << endl;
cout << "Value = " << value << endl;

Code snippet from atomic\fetch_add.cpp

If no other threads are touching the contents of the fetched and value variables, the output is as follows:

Value = 10
Fetched = 10
Value = 14

Atomic integral types support the following atomic operations: fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor(), ++, --, +=, -=, &=, ^= and |=. Atomic pointer types support fetch_add(), fetch_sub(), ++, --, += and -=.

Most of the atomic operations can accept an extra parameter specifying the memory ordering that you would like. For example:

T atomic<T>::fetch_add(T value, memory_order = memory_order_seq_cst);

You can change the default memory_order. The standard provides memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, and memory_order_seq_cst, all defined in the std namespace. However, it is rare that you will want to use a memory order other than the default. While another memory order may perform better than the default according to some metric, using one even slightly wrong will again introduce race conditions or other difficult-to-track threading-related problems. If you do need to know more about memory ordering, consult one of the multithreading references in Appendix B.
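As an illustration of one case where a relaxed memory order is generally considered safe, consider a standalone statistics counter that is only ever incremented and read as a whole, with no other data depending on it. A minimal sketch, assuming such a counter:

#include <atomic>

std::atomic<long> eventCount(0);  // Standalone statistics counter

void recordEvent()
{
    // Only the atomicity of the increment matters here; the counter does not
    // guard any other shared data, so relaxed ordering suffices.
    eventCount.fetch_add(1, std::memory_order_relaxed);
}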

THREADS

The C++11 threading library, defined in the <thread> header file, makes it very easy to launch new threads. Specifying what needs to be executed in the new thread can be done in several ways. You can let the new thread execute a global function, the operator() of a function object, a lambda expression, or even a member function of an instance of some class. The following sections give small examples of all these methods.

Thread with Function Pointer

Functions like CreateThread() and _beginthread() on Windows, and pthread_create() from the pthreads library, require that the thread function has exactly one parameter. In contrast, a function that you want to use with the C++11 std::thread class can have as many parameters as you want.

Suppose you have the following counter() function. It accepts two integers: the first representing an ID and the second representing the number of iterations the function should loop. The body of the function is a single loop that iterates the given number of times. On each iteration, a message is printed to standard output:

void counter(int id, int numIterations)
{
    for (int i = 0; i < numIterations; ++i) {
        cout << "Counter " << id << " has value ";
        cout << i << endl;
    }
}

Code snippet from thread\ThreadFunction.cpp

You can launch multiple threads executing this function by using std::thread. For example, the following creates a thread t1 that executes counter() with arguments 1 and 6:

thread t1(counter, 1, 6);

The constructor of the thread class is a variadic template, which means that it accepts any number of arguments. Variadic templates are discussed in detail in Chapter 20. The first argument is the name of the function to execute in the new thread. The subsequent variable number of arguments are passed to this function when execution of the thread starts.

If you want to run the counter() function in parallel in several threads, you need to make sure that access to cout is thread-safe. To make this thread-safe, you need to call:

cout.sync_with_stdio(true);

Without calling sync_with_stdio(true), accessing the stream from multiple threads might cause race conditions.

The following code launches two threads executing the counter() function. After launching the threads, main() calls join() on both of them to make sure the main thread keeps running until both threads are finished. A call to t1.join() blocks until the thread t1 is finished. Without these two join() calls, the main() function would finish immediately after launching the two threads, triggering the application to shut down and causing all other threads spawned by the application to be terminated as well, whether they are finished or not.

WARNING

These join() calls are necessary in these small examples. In real-world applications, you should try to avoid the use of join(), because it causes the calling thread to block; often there are better ways. For example, in a GUI application, a thread that finishes can post a message to the UI thread. The UI thread has a message loop processing messages like mouse moves and button clicks, and this loop can also receive messages from worker threads, to which you can react however you want, all without blocking the UI thread with a join() call.

#include <iostream>
#include <thread>
using namespace std;
int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    thread t1(counter, 1, 6);
    thread t2(counter, 2, 4);
    t1.join();
    t2.join();
    return 0;
}

Code snippet from thread\ThreadFunction.cpp

A possible output of this example looks as follows:

Counter 2 has value 0
Counter 1 has value 0
Counter 1 has value 1
Counter 1 has value 2
Counter 1 has value 3
Counter 1 has value 4
Counter 1 has value 5
Counter 2 has value 1
Counter 2 has value 2
Counter 2 has value 3

The output on your system will be different and it will most likely be different every time you run it. This is because two threads are executing the counter() function at the same time, so the output depends on the number of processing cores in your system and on the thread scheduling of the operating system.

Since this example calls cout.sync_with_stdio(true), accessing cout is thread-safe; however, output from different threads can still be interleaved. This means that the output of the previous example can be mixed together, as in:

Counter Counter 2 has value 0
1 has value 0
Counter 1 has value 1
Counter 1 has value 2

Instead of:

Counter 1 has value 0
Counter 2 has value 0
Counter 1 has value 1
Counter 1 has value 2

This can be fixed by using synchronization methods, which are discussed later in this chapter.

NOTE

Thread function arguments are always copied into some internal storage for the thread. Use std::ref() from the <functional> header to pass them by reference.
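For example, the following minimal sketch (the increment() function is made up for illustration) relies on std::ref(); on a conforming C++11 implementation, the commented-out line would not even compile, because the thread's internal copy of the argument cannot bind to a non-const reference:

#include <functional>
#include <iostream>
#include <thread>

void increment(int& value)  // Hypothetical example function
{
    ++value;
}

int main()
{
    int data = 0;
    // std::thread t(increment, data);        // Does not compile: the thread
    //                                        // would operate on a copy of data
    std::thread t(increment, std::ref(data)); // Explicitly pass by reference
    t.join();
    std::cout << data << std::endl;           // Prints 1
    return 0;
}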

Thread with Function Object

The previous section demonstrated how to create a thread and tell it to run a specific function in the new thread by passing a pointer to the function to execute. You can also use a function object, as shown in the following example. With the function pointer technique discussed earlier, the only way to pass information to the thread is by passing arguments to the function. With function objects you can add member variables to your function object class, which you can initialize and use however you want. The example first defines a class called Counter. It has two member variables: an ID and the number of iterations for the loop. Both of these member variables are initialized with the class constructor. To make the Counter class a function object, you need to implement operator(), as discussed in Chapter 13. The implementation of operator() is the same as the counter() function in the previous section:

class Counter
{
    public:
        Counter(int id, int numIterations)
            : mId(id), mNumIterations(numIterations)
        {
        }
        void operator()() const
        {
            for (int i = 0; i < mNumIterations; ++i) {
                cout << "Counter " << mId << " has value ";
                cout << i << endl;
            }
        }
    protected:
        int mId;
        int mNumIterations;
};

Code snippet from thread\ThreadFunctionObject.cpp

Three methods for initializing threads with a function object are demonstrated in the following main(). The first uses the C++11 uniform initialization syntax. You create an instance of the Counter class with its constructor arguments and give it to the thread constructor between curly braces.

The second defines a named instance of the Counter class and gives this named instance to the constructor of the thread class.

The third looks similar to the first; it creates an instance of the Counter class and gives it to the constructor of the thread class, but uses parentheses instead of curly braces. The ramifications of this are discussed after the code.

int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    // Using C++11 initialization syntax
    thread t1{Counter(1, 20)};
    // Using named variable
    Counter c(2, 12);
    thread t2(c);
    // Using temporary
    thread t3(Counter(3, 10));
    // Wait for threads to finish
    t1.join();
    t2.join();
    t3.join();
    return 0;
}

Code snippet from thread\ThreadFunctionObject.cpp

If you compare the creation of t1 with the creation of t3, the only difference seems to be that the first method uses curly braces while the third method uses parentheses. However, when your function object's constructor doesn't require any parameters, the third method as written above will not work. For example:

class Counter
{
    public:
        Counter() {}
        void operator()() const { /* Omitted for brevity */ }
};
int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    thread t1(Counter());    // Error!
    t1.join();
    return 0;
}

This results in a compilation error, because C++ interprets the second line in the main() function as the declaration of a function called t1 that returns a thread object and accepts a pointer to a function taking no parameters and returning a Counter object. For this reason, it's recommended to use the C++11 uniform initialization syntax:

thread t1{Counter()};    // OK

If your compiler does not support this new syntax you have to add an extra set of parentheses to prevent the compiler from interpreting the line as a function declaration:

thread t1((Counter()));  // OK
NOTE

Function objects are always copied into some internal storage for the thread. If you want to execute operator() on a specific instance of your function object instead of copying it, you should use std::ref() from the <functional> header to pass your instance by reference.

Thread with Lambda

Because the threading library is new in C++11, it fits nicely with lambda expressions, another C++11 feature, as demonstrated in the following example:

int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    thread t1([](int id, int numIterations) {
                    for (int i = 0; i < numIterations; ++i) {
                        cout << "Counter " << id << " has value ";
                        cout << i << endl;
                    }
                }, 1, 5);
    t1.join();
    return 0;
}

Code snippet from thread\ThreadLambda.cpp
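A lambda expression can also capture local variables instead of receiving them as thread arguments. A small sketch of this variant; it captures by value, so the new thread keeps its own copies of id and numIterations:

int id = 1;
int numIterations = 5;
thread t1([id, numIterations] {
    for (int i = 0; i < numIterations; ++i) {
        cout << "Counter " << id << " has value " << i << endl;
    }
});
t1.join();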

Thread with Member Function

The previous examples illustrate different methods of specifying the thread code:

  • A global function
  • A function object using operator()
  • A lambda expression

However, these are not the only techniques available; you can also specify a member function of a class.

The following example defines a basic Request class with a process() method. The main() function creates an instance of the Request class and launches a new thread, which will execute the process() member function of the Request instance, req:

class Request
{
    public:
        Request(int id) : mId(id) { }
        void process()
        {
            cout << "Processing request " << mId << endl;
        }
    protected:
        int mId;
};
int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    Request req(100);
    thread t{&Request::process, &req};
    t.join();
    return 0;
}

Code snippet from thread\ThreadMemFunc.cpp

With this technique, you execute a method on a specific object in a separate thread. If other threads access the same object, you need to make sure this happens in a thread-safe way to avoid race conditions. Mutual exclusion, discussed later in this chapter, can be used as a synchronization mechanism to make it thread-safe.

Thread Local Storage

C++11 supports the concept of thread local storage. With the new thread_local keyword, you can mark any variable as thread-local, which means that each thread has its own unique copy of the variable, and that copy lasts for the entire duration of the thread. For each thread, the variable is initialized exactly once. This is similar to a static variable, except that each thread has its own instance instead of all threads sharing one. For example, in the following code, every thread shares the one-and-only copy of k, while each thread has its own unique copy of n:

thread_local int n;  // One unique copy per thread
int k;               // One copy, shared by all threads
void doWork()
{
    // perform some computation using n and k
}

Note that if the thread_local variable is declared in the scope of a function, its behavior is as if it were declared static, except that every thread has its own unique copy and it is initialized exactly once per thread, no matter how many times that function is called in that thread.
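For example, the following sketch (the countCalls() function is made up for illustration) keeps a separate call count per thread:

void countCalls()
{
    // Behaves like a static local variable, but with one instance per thread,
    // initialized once per thread no matter how often the function is called.
    thread_local int timesCalled = 0;
    ++timesCalled;
}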

Cancelling Threads

The standard does not include any mechanism for cancelling a running thread from inside another thread. The best way to achieve this is to provide some communication mechanism on which the two threads agree. The simplest mechanism is a shared variable that the target thread checks periodically to determine whether it should terminate; other threads can set this shared variable to indirectly instruct the thread to shut down. A sketch of this pattern follows.
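The following minimal sketch uses an std::atomic<bool> as the shared flag, so that the periodic check itself is free of race conditions; all names are made up for illustration:

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> stopRequested(false);  // Shared shutdown flag

void worker()
{
    while (!stopRequested) {
        // ... perform a chunk of work ...
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main()
{
    std::thread t(worker);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    stopRequested = true;  // Ask the worker to shut down
    t.join();              // Wait until it actually has
    return 0;
}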

A second method is to use condition variables, discussed later in this chapter.

Retrieving Results from Threads

As you saw in the previous examples, launching a new thread is pretty easy. However, in most cases you are probably interested in results produced by the thread. For example, if your thread performs some mathematical calculations, you would really like to get the results once the thread has finished. One way is to pass a pointer or reference to a result variable to the thread, in which the thread stores the result. Another method is to store the result in a class member variable of a function object, which you can retrieve once the thread has finished executing.

However, there is another and easier method to obtain a result from threads: futures. They also make it easier to handle errors that occur inside your threads. Futures are discussed later in this chapter.

Copying and Rethrowing Exceptions

The exception mechanism in C++ works perfectly, as long as it stays within one single thread. Every thread can throw its own exceptions, but they need to be caught within their own thread; exceptions thrown in one thread cannot be caught in another thread. This introduces quite a few problems when you would like to use exception handling in combination with multithreaded programming.

Without C++11 it’s very difficult if not impossible to gracefully handle exceptions across threads. C++11 solves this issue with the following new exception-related functions:

exception_ptr current_exception() noexcept;

This function returns an exception_ptr object that refers to the exception currently being handled, or a copy of the currently handled exception, or a null exception_ptr object if no exception is being handled. The referenced exception object remains valid for as long as there is an object of type exception_ptr referencing it. exception_ptr meets the requirements of NullablePointer, which means it can easily be tested with a simple if statement, as the example later in this section demonstrates.

[[noreturn]] void rethrow_exception(exception_ptr p);

This function rethrows the exception referenced by the exception_ptr parameter. Rethrowing the referenced exception does not have to be done in the same thread that generated the exception in the first place, which makes this feature perfectly suited for handling exceptions across different threads. The [[noreturn]] attribute, introduced in Chapter 9, makes it clear that this function never returns normally.

template<class E> exception_ptr make_exception_ptr(E e) noexcept; 

This function creates an exception_ptr object that refers to a copy of the given exception object. This is basically a shorthand notation for the following code:

try {
    throw e;
} catch(...) {
    return current_exception();
}

Let’s see how handling exceptions across different threads can be implemented by using these new features.

The following code defines a function that does some work and throws an exception. This function will ultimately be running in a separate background thread:

void doSomeWork()
{
    for (int i = 0; i < 5; ++i)
        cout << i << endl;
    cout << "Thread throwing a runtime_error exception..." << endl;
    throw runtime_error("Exception from thread");
}

Code snippet from ExceptionsWithThreads\ExceptionsWithThreads.cpp

The following threadFunc() function wraps the call to the preceding function in a try/catch block, catching all exceptions that doSomeWork() might throw. threadFunc() is given a single argument of type exception_ptr&. Once an exception is caught, current_exception() is used to get a reference to the exception being handled, which is then assigned to the exception_ptr parameter. After that, the thread exits normally:

void threadFunc(exception_ptr& err)
{
    try {
        doSomeWork();
    } catch (...) {
        cout << "Thread caught exception, returning exception..." << endl;
        err = current_exception();
    }
}

Code snippet from ExceptionsWithThreads\ExceptionsWithThreads.cpp

The following doWorkInThread() function is called from within the main thread. Its responsibility is to create a new thread that executes the preceding threadFunc() function, passing it a reference to an object of type exception_ptr. Once the thread is created, doWorkInThread() waits for it to finish by using the join() method, after which the error object is examined. Since exception_ptr meets the requirements of NullablePointer, you can easily check it with if (error). If it's non-null, the exception is rethrown in the current thread, which is the main thread in this example. By rethrowing the exception in the main thread, the exception has been transferred from one thread to another.

void doWorkInThread()
{
    exception_ptr error;
    // Launch background thread
    thread t{threadFunc, ref(error)};
    // Wait for thread to finish
    t.join();
    // See if thread has thrown any exception
    if (error)
    {
        cout << "Main thread received exception, rethrowing it..." << endl;
        rethrow_exception(error);
    }
    else
        cout << "Main thread did not receive any exception." << endl;
}

Code snippet from ExceptionsWithThreads\ExceptionsWithThreads.cpp

The main() function is pretty straightforward. It calls doWorkInThread() and wraps the call in a try/catch block to catch exceptions thrown by any thread spawned by doWorkInThread():

int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    try {
        doWorkInThread();
    } catch (const exception& e) {
        cout << "Main function caught: '" << e.what() << "'" << endl;
    }
    return 0;
}

Code snippet from ExceptionsWithThreads\ExceptionsWithThreads.cpp

The output is as follows:

0
1
2
3
4
Thread throwing a runtime_error exception...
Thread caught exception, returning exception...
Main thread received exception, rethrowing it...
Main function caught: 'Exception from thread'

To keep this example compact and easy to understand, the doWorkInThread() function uses join() to block and wait until the thread is finished. Of course, in real-world applications you will not want to block your main thread. For example, in a GUI application, you might let threadFunc() send a message to the UI thread carrying a copy of the result of current_exception() as its argument.

MUTUAL EXCLUSION

If you are writing multithreaded applications, you have to be sensitive to sequencing of operations. If your threads read and write shared data, this can be a problem. There are many ways to avoid this problem, such as never actually sharing data between threads. However, if you can’t avoid sharing data, you must provide for synchronization so that only one thread at a time can change the data.

The real problem arises when two or more units of data have to be changed as a single atomic action. A simple assignment to an aligned scalar variable is typically handled atomically at the hardware level; on x86, for example, a 32-bit aligned value is read or written in a single transaction. However, the C++11 memory model gives such guarantees only for atomic types, so even a simple shared flag, such as a Boolean used to tell a thread to stop, is best declared as atomic<bool>; and an operation like ++, --, or op= for any given op on shared data always requires synchronization. Scalars can often be synchronized properly by using the atomic types and fetch_xyz() operations described earlier in this chapter, but when your data is more complex and you need to use that data from multiple threads, you must provide explicit synchronization.

C++11 has support for mutual exclusion in the form of mutex and lock classes. These can be used to implement synchronization between threads and are discussed in the next sections.

Mutex Classes

The mutual exclusion classes are all defined in the <mutex> header file and are in the std namespace. The basic mechanism of using a mutex is as follows:

  • A thread that wants to read/write to memory shared with other threads tries to lock a mutex object. If another thread is currently holding this lock, the new thread that wants to gain access will block until the lock is released, or until a timeout interval expires.
  • Once the thread has obtained the lock, it is free to read and write memory which is shared with other threads because at this point it is working under the assumption that no other threads are attempting to read or write the same data.
  • After the thread is finished with reading/writing to the shared memory it releases the lock to give some other thread an opportunity to obtain the lock to the shared memory. If two or more threads are waiting on the lock, there are no guarantees as to which thread will be granted the lock and thus allowed to proceed.

The standard provides non-timed mutex classes and timed mutex classes discussed in the following sections.

Non-Timed Mutex Classes

The library has two non-timed mutex classes: std::mutex and std::recursive_mutex. Each supports the following methods:

  • lock(): The calling thread tries to obtain the lock and blocks until the lock has been acquired, potentially indefinitely. If you want to limit the amount of time the thread blocks, use a timed mutex, discussed in the next section.
  • try_lock(): The calling thread will try to obtain the lock. If the lock is currently held by another thread, the call will return immediately. If the lock has been obtained, try_lock() returns true, otherwise it returns false.
  • unlock(): Releases the lock held by the calling thread, making it available for another thread.

std::mutex is a standard mutual exclusion class with exclusive ownership semantics. There can be only one thread owning the mutex. If another thread wants to obtain ownership of this mutex, it will either block when using lock(), or fail when using try_lock(). A thread already having ownership of a mutex is not allowed to call lock() or try_lock() again on that mutex. This might lead to a deadlock!

std::recursive_mutex behaves almost identically to std::mutex, except that a thread already having ownership of a recursive mutex is allowed to call lock() or try_lock() again on the same mutex. The calling thread should call the unlock() method as many times as it obtained a lock on the recursive mutex.
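A minimal sketch of a situation where a recursive mutex helps: one public method that locks the mutex internally calls another public method that locks it again. The class is made up for illustration:

#include <mutex>

class SafeCounter
{
    public:
        void increment()
        {
            std::lock_guard<std::recursive_mutex> lock(mMutex);
            ++mValue;
        }
        void incrementTwice()
        {
            std::lock_guard<std::recursive_mutex> lock(mMutex);
            // With a plain std::mutex, these nested calls would try to relock
            // a mutex this thread already owns, which might deadlock.
            increment();
            increment();
        }
    protected:
        std::recursive_mutex mMutex;
        int mValue = 0;
};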

Timed Mutex Classes

The library provides a std::timed_mutex and a std::recursive_timed_mutex; both are timed mutex classes supporting the normal lock(), try_lock() and unlock() methods. Additionally they support:

  • try_lock_for(rel_time): The calling thread will try to obtain the lock for a certain relative time. If the lock could not be obtained after the given timeout, the call fails and returns false. If the lock could be obtained within the timeout, the call succeeds and returns true.
  • try_lock_until(abs_time): The calling thread will try to obtain the lock until the system time equals or exceeds the specified absolute time. If the lock could be obtained before this time, the call returns true. If the system time passes the given absolute time, the function stops trying to obtain the lock and returns false.

A thread already having ownership of a timed_mutex is not allowed to call one of the previous lock calls again on that mutex. This might lead to a deadlock!

recursive_timed_mutex behaves almost identically to timed_mutex, except that a thread already having ownership of a recursive mutex is allowed to call one of the previous lock calls again on the same mutex. The calling thread should call the unlock() method as many times as it obtained a lock on the recursive mutex.

Locks

A lock class is a wrapper that makes it easier to obtain and release a lock on a mutex; the destructor of the lock class automatically releases the associated mutex. The standard defines two types of locks: a simple lock, std::lock_guard, whose constructor acquires the mutex and blocks until the lock is acquired; and the more sophisticated std::unique_lock, which allows you to defer lock acquisition until later in the computation, long after the declaration. unique_lock has several constructors:

  • A constructor accepting a reference to a mutex. This one tries to obtain a lock on the mutex and blocks until the lock is obtained. The keyword explicit for constructors is discussed in Chapter 7.
explicit unique_lock(mutex_type& m);
  • A constructor accepting a reference to a mutex and an instance of the std::defer_lock_t struct. The unique_lock stores the reference to the mutex, but does not immediately try to obtain a lock. A lock can be obtained later.
unique_lock(mutex_type& m, defer_lock_t) noexcept;
  • A constructor accepting a reference to a mutex and an instance of the std::try_to_lock_t struct. The lock tries to obtain a lock to the referenced mutex, but if it fails it does not block.
unique_lock(mutex_type& m, try_to_lock_t);
  • A constructor accepting a reference to a mutex and an instance of the std::adopt_lock_t struct. The lock assumes that the calling thread already has obtained a lock on the referenced mutex and will manage this lock.
unique_lock(mutex_type& m, adopt_lock_t);
  • A constructor accepting a reference to a mutex and an absolute time. The constructor tries to obtain a lock until the system time passes the given absolute time. The Chrono library is discussed in Chapter 16.
template <class Clock, class Duration>
unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);
  • A constructor accepting a reference to a mutex and a relative time. The constructor tries to get a lock on the mutex with the given relative timeout.
template <class Rep, class Period>
unique_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);

The unique_lock class also has the following methods: lock(), try_lock(), try_lock_for() and try_lock_until(), which behave as explained in the section on timed mutex classes earlier in this chapter. You can use the owns_lock() method to see if the lock has been acquired.
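A minimal sketch of deferred locking combined with owns_lock(), assuming a global std::mutex named mut:

std::mutex mut;

void tryToProcess()
{
    std::unique_lock<std::mutex> lock(mut, std::defer_lock);
    // ... do work that does not require the lock ...
    if (lock.try_lock()) {
        // owns_lock() returns true here: safe to touch the shared data.
    }
    // The destructor releases the mutex only if it was actually acquired.
}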

C++11 also has support for obtaining locks on multiple mutex objects at once. For this, you can use the generic lock() variadic template function. Variadic template functions are discussed in Chapter 20.

template <class L1, class L2, class... L3> void lock(L1&, L2&, L3&...);

This generic function locks all the mutex objects in order. If one of the mutex lock calls throws an exception, unlock() is called on all locks that have already been obtained.

There is also a generic try_lock() function:

template <class L1, class L2, class... L3> int try_lock(L1&, L2&, L3&...);

try_lock() tries to obtain a lock on all the given mutex objects by calling try_lock() on each of them in sequence. It returns -1 if all calls to try_lock() succeed. If any try_lock() fails, unlock() is called on all locks that have already been obtained, and the return value is the zero-based index of the parameter position of the mutex on which try_lock() failed.

WARNING

When your threads need to acquire multiple locks at once, it is recommended to use the generic lock() or try_lock() functions, which avoid deadlocks regardless of the order of their arguments. If you instead acquire multiple locks manually with individual lock calls, always acquire them in the same order in all threads; otherwise, deadlocks might occur.

The following example demonstrates how to use the generic lock() function. The process() function first creates two locks, one for each mutex, and gives an instance of std::defer_lock_t as second argument to tell unique_lock not to acquire the lock during construction. The call to lock() will then acquire both locks:

mutex mut1;
mutex mut2;
void process()
{
    unique_lock<mutex> lock1(mut1, defer_lock_t());
    unique_lock<mutex> lock2(mut2, defer_lock_t());
    lock(lock1, lock2);
    // Locks acquired
}
int main()
{
    process();
    return 0;
}

Code snippet from mutex\multiple_locks.cpp

Because the generic lock() function uses a deadlock-avoidance algorithm, acquiring both locks with a single lock(lock1, lock2) call is safe, even if another thread calls lock(lock2, lock1) with the arguments in a different order. Deadlocks remain possible only when threads acquire the same mutexes one by one with individual lock calls in different orders, as in the deadlock example at the beginning of this chapter.

std::call_once

You can use std::call_once() in combination with std::once_flag to make sure a certain function or method is called exactly one time no matter how many threads try to call call_once().

Only one call_once() invocation will actually call the given function or method; this invocation is called the effective call_once() invocation. This effective invocation on a specific once_flag instance finishes before all subsequent call_once() invocations on the same once_flag instance. Other threads calling call_once() on the same once_flag instance will block until the effective call is finished. Figure 22-3 illustrates this with three threads. Thread 1 performs the effective call_once() invocation, Thread 2 blocks until the effective invocation is finished, and Thread 3 doesn’t block because the effective invocation from Thread 1 has already finished.

The following example demonstrates the use of call_once(). It defines a class called Data, which has a dynamically allocated block of memory called mMemory. The mutable keyword is discussed in Chapter 7. Multiple threads can execute operator(); however, for this example, mMemory should be initialized only once. To accomplish this, the class includes a member mOnceFlag of type once_flag. operator() uses call_once() together with this once_flag to call the init() method. The result is that init() is executed exactly once, by only one of the threads. While this effective call_once() call is in progress, other threads block until the init() method returns:

class Data
{
    public:
        void operator()()
        {
            call_once(mOnceFlag, &Data::init, this);
            // Do work
            cout << "Work" << endl;
        }
    protected:
        void init()
        {
            cout << "init()" << endl;
            mMemory = new char[24];
        }
        mutable once_flag mOnceFlag;
        mutable char* mMemory;
};
int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    Data d;
    thread t1{ref(d)};
    thread t2{ref(d)};
    thread t3{ref(d)};
    t1.join();
    t2.join();
    t3.join();
    return 0;
}

Code snippet from mutex\call_once.cpp

The output of this code is as follows:

init()
Work
Work
Work

The example omits the code for deallocating the memory. Of course, in this example you could also allocate the memory for mMemory in the constructor of the Data class; the init() method is used only to demonstrate call_once().

Mutex Usage Examples

Thread-Safe Writing to Streams

Earlier in this chapter, in the section about threads, you saw an example with a class called Counter. That example mentioned that C++ streams are thread-safe when you call sync_with_stdio(true), but the output from different threads can still be interleaved. To solve this issue, you can use a mutual exclusion object to make sure that only one thread at a time is reading/writing to the stream object.

The following example synchronizes all accesses to cout in the Counter class. For this, a static mutex object is added to the class. It should be static, because all instances of the class should use the same mutex instance. Before writing to cout, the updated example uses a lock_guard to obtain a lock on the mutex. Changes compared to the earlier version are highlighted:

class Counter
{
    public:
        Counter(int id, int numIterations)
            : mId(id), mNumIterations(numIterations)
        {
        }
        void operator()() const
        {
            for (int i = 0; i < mNumIterations; ++i) {
               lock_guard<mutex> lock(mMutex);
                cout << "Counter " << mId << " has value ";
                cout << i << endl;
            }
        }
    protected:
        int mId;
        int mNumIterations;
        static mutex mMutex;
};
mutex Counter::mMutex;
int main()
{
    // Omitted for brevity.
}

Code snippet from mutex\ThreadFunctionObjectWithMutex.cpp

Using Timed Locks

The following example demonstrates how to use a timed mutex. It is the same Counter class as before, but this time it uses a timed_mutex in combination with a unique_lock. A relative time of 200 milliseconds is given to the unique_lock constructor, causing it to try to obtain a lock for 200 milliseconds. If the lock cannot be obtained within this interval, the constructor returns without the lock. Afterward, you can check whether the lock was acquired, which can be done with an if statement on the lock variable, because the unique_lock class defines a bool() conversion operator. The timeout is specified by using the C++11 Chrono library, which is discussed in Chapter 16.

class Counter
{
    public:
        Counter(int id, int numIterations)
            : mId(id), mNumIterations(numIterations)
        {
        }
        void operator()() const
        {
            for (int i = 0; i < mNumIterations; ++i) {
                unique_lock<timed_mutex> lock(mTimedMutex,
                    chrono::milliseconds(200));
                if (lock) {
                    cout << "Counter " << mId << " has value ";
                    cout << i << endl;
                } else {
                    // Lock not acquired in 200 ms
                }
            }
        }
    protected:
        int mId;
        int mNumIterations;
        static timed_mutex mTimedMutex;
};
timed_mutex Counter::mTimedMutex;
int main()
{
    // Omitted for brevity.
}

Code snippet from mutex\ThreadFunctionObjectWithTimedMutex.cpp

Double-Checked Locking

You can use locks to implement the double-checked locking algorithm, which can, for example, be used to make sure that a variable is initialized exactly once, as an alternative to using call_once(). The following example shows how you can implement this. It is called the double-checked locking algorithm because it checks the value of the initialized flag twice: once before acquiring the lock and once right after. The first check prevents obtaining a lock when it is not needed, which improves performance. The second check is required to make sure that no other thread performed the initialization between the first check and acquiring the lock. Note that the flag is declared as atomic<bool>, because it is read outside the lock; with a plain bool, that unsynchronized read would be a data race:

class MyClass
{
    public:
        void init() {cout << "Init" << endl;}
};
MyClass var;
atomic<bool> initialized(false);  // atomic, because it is read outside the lock
mutex mut;
void func()
{
    if (!initialized) {
        unique_lock<mutex> lock(mut);
        if (!initialized) {
            var.init();
            initialized = true;
        }
    }
    cout << "OK" << endl;
}
int main()
{
    cout.sync_with_stdio(true); // Make sure cout is thread-safe
    vector<thread> threads;
    for (int i = 0; i < 5; ++i)
        threads.push_back(thread{func});
    for (auto& t : threads)
        t.join();
    return 0;
}

Code snippet from mutex\double_checked_locking.cpp

The output of this program is as follows:

Init
OK
OK
OK
OK
OK

This output clearly shows that only one thread has initialized the MyClass instance.

CONDITION VARIABLES

Condition variables allow a thread to block until a certain condition is set by another thread or until the system time reaches a specified time. They allow for explicit inter-thread communication. If you are familiar with multithreaded programming using the Win32 API, you can compare condition variables with event objects in Windows.

You need to include the <condition_variable> header file to use condition variables. There are two kinds of condition variables available in the standard:

  • std::condition_variable: A condition variable that can only wait on a unique_lock<mutex>, which, according to the standard, allows for maximum efficiency on certain platforms.
  • std::condition_variable_any: A condition variable that can wait on any kind of object, including custom lock types.

The condition_variable class supports the following methods:

notify_one();

Wake up one of the threads waiting on this condition variable. This is similar to an auto-reset event in Windows.

notify_all();

Wake up all threads waiting on this condition variable.

wait(unique_lock<mutex>& lk);

The thread calling wait() should already have acquired a lock on lk. The effect of calling wait() is that it atomically calls lk.unlock() and blocks the thread, waiting for a notification. When the thread is unblocked by a notify_one() or notify_all() call in another thread, the function calls lk.lock() again, possibly blocking on the lock, and then returns.

wait_for(unique_lock<mutex>& lk,
    const chrono::duration<Rep, Period>& rel_time);

Similar to the previous wait() method, except that the thread will be unblocked by a notify_one() call, a notify_all() call, or when the given timeout has expired.

wait_until(unique_lock<mutex>& lk,
    const chrono::time_point<Clock, Duration>& abs_time);

Similar to the previous wait() method, except that the thread will be unblocked by a notify_one() call, a notify_all() call, or when the system time passes the given absolute time.

There are also versions of wait(), wait_for(), and wait_until() that accept an extra predicate parameter. These versions keep waiting until the predicate returns true, which also protects against so-called spurious wakeups, where a waiting thread is unblocked without any thread having called a notify method. For example, wait() accepting an extra predicate is equivalent to:

while (!predicate())
    wait(lk);

The condition_variable_any class supports the same methods as the condition_variable class except that it accepts any kind of Lock class instead of only a unique_lock<mutex>.

Condition variables can, for example, be used for background threads processing items from a queue. You can define a queue in which you insert items to be processed. A background thread waits until there are items in the queue. When an item is inserted into the queue, the thread wakes up, processes the item, and goes back to sleep, waiting for the next item. Suppose you have the following queue:

std::queue<std::string> mQueue;

You need to make sure only one thread is modifying this queue at any given time. You can do this with a mutex:

std::mutex mMutex;

To be able to notify a background thread when an item is added, you need a condition variable:

std::condition_variable mCondVar;

A thread that wants to add an item to the queue first acquires a lock on the mutex, adds the item to the queue, and notifies the background thread:

// Lock mutex and add entry to the queue.
unique_lock<mutex> lock(mMutex);
mQueue.push(entry);
// Notify condition variable to wake up thread.
mCondVar.notify_all();

The background thread waits for notifications in an infinite loop as follows:

unique_lock<mutex> lock(mMutex);
while (true) {
    // Wait for a notification. Re-check the queue after every wakeup,
    // because threads can be woken up spuriously.
    while (mQueue.empty()) {
        mCondVar.wait(lock);
    }
    // The queue is guaranteed to be non-empty here.
    // Process queue item...
}
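The inner while loop can equivalently be written with the predicate overload of wait(). A sketch, assuming mQueue, mMutex, and mCondVar are members of a class, so the lambda expression can capture this:

unique_lock<mutex> lock(mMutex);
while (true) {
    // The predicate overload loops internally until the queue is non-empty.
    mCondVar.wait(lock, [this]{ return !mQueue.empty(); });
    // Process queue item...
}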

The section “Example: Multithreaded Logger Class” toward the end of this chapter provides a complete example of how to use condition variables to send notifications to other threads.

The standard also defines a helper function called std::notify_all_at_thread_exit(cond, lk) where cond is a condition variable and lk is a unique_lock<mutex> instance. A thread calling this function should already have acquired the lock lk. When the thread exits, it will automatically execute the following:

lk.unlock();
cond.notify_all();
NOTE

The lock lk stays locked until the thread exits. So, you need to make sure that this does not cause any deadlocks in your code, for example due to wrong lock ordering. Lock ordering is discussed earlier in this chapter.

FUTURES

As discussed earlier in this chapter, using std::thread to launch a thread that calculates a single result does not make it easy to get the computed result back once the thread has finished executing. Another problem with std::thread is handling errors such as exceptions. If a thread throws an exception and this exception is not handled by the thread itself, the C++ runtime calls std::terminate, which usually terminates the whole application. You can avoid this by using std::future, which can transport an uncaught exception to another thread; that thread can then handle the exception however it wants. Of course, you should always try to handle exceptions in the threads themselves as much as possible, preventing them from leaving the thread.

std::future and std::promise work together to make it easier to retrieve a result from a function that ran in the same thread or in another thread. Once such a function has calculated the value that it wants to return, it puts this value in a promise. The value can then be retrieved through a future. You can think of a future/promise pair as an inter-thread communication channel for a result.

A thread that launched another thread to calculate a value can get this value as follows. T is the type of the calculated result:

future<T> fut = ...;   // Is discussed later
T res = fut.get();

The call to get() retrieves the result and stores it in the variable res. If the other thread has not yet finished calculating the result, the call to get() blocks until the value becomes available. You can avoid blocking by first asking the future whether a result is available; wait_for() returns a future_status value that tells you:

if (fut.wait_for(chrono::seconds(0)) == future_status::ready) {
    // Value is available
    T res = fut.get();
} else {
    // Value is not yet available
    ...
}

A promise is the input side for the result; a future is the output side. A promise is where a thread stores its calculated result. The following code demonstrates how a thread might do this:

promise<T> prom = ...; // Is discussed later
T val = ...;           // Calculate result value
prom.set_value(val);

If a thread encounters some kind of error during its calculation, it could store an exception in the promise instead of the value:

prom.set_exception(make_exception_ptr(runtime_error("message")));

A thread that launches another thread to calculate something should give the promise to the newly launched thread, so that it can store the result in it. This is made easy by using std::packaged_task, which automatically links a future and a promise. The following code demonstrates this. It creates a packaged_task wrapping a lambda expression that accepts two arguments and returns their sum. The future is retrieved from the packaged_task by calling get_future(). The third line executes the task; note that this invocation runs the lambda expression synchronously on the calling thread, not on a separate thread. The last line uses the get() function to retrieve the result:

packaged_task<int(int, int)> task([](int i1, int i2) {return i1+i2;});
auto fut = task.get_future();  // Get the future
task(2, 3);                    // Launch the task
int res = fut.get();           // Retrieve the result

Code snippet from futurepackaged_task.cpp
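
If you do want the task to run on a separate thread, one option, sketched here as a variation on the snippet above, is to move the packaged_task into a std::thread:

packaged_task<int(int, int)> task([](int i1, int i2) {return i1+i2;});
auto fut = task.get_future();   // Get the future
thread t(move(task), 2, 3);     // Run the task on a new thread
int res = fut.get();            // Blocks until the task has finished
t.join();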


This code is just for demonstration purposes. The first variant runs the task on the calling thread, so get() returns immediately; the threaded variant launches a separate thread and then immediately blocks in get() until the result is calculated, which defeats the purpose of running the task in parallel. In real-world applications you can use the promise/future model by periodically checking whether there is a result available in the future (using wait_for() as discussed earlier). When the result is not yet available, you can do something else in the meantime instead of blocking.

If you want to give the C++ runtime more control over whether or not a thread is created to calculate something, you can use std::async(). It accepts a function to be executed and returns a future that you can use to retrieve the result. There are two ways in which async() can call your function:

  • Creating a new thread to run your function asynchronously
  • Running your function at the time you call get() on the returned future

If you call async() without additional arguments, the run-time library will automatically choose one of the two methods depending on factors like the number of processors in your system. You can force the runtime to use one or the other method by specifying a launch::async or launch::deferred policy argument, respectively. The following example demonstrates the use of async():

int calculate()
{
    return 123;
}
int main()
{
    auto fut = async(calculate);
    //auto fut = async(launch::async, calculate);
    //auto fut = async(launch::deferred, calculate);
 
    // Do some more work...
 
    // Get result
    int res = fut.get();
    cout << res << endl;
    return 0;
}

Code snippet from futureasync.cpp

As you can see in this example, std::async() is one of the easiest methods to perform some calculations in another thread or the same thread, and retrieve the result afterwards.
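
For example, the following sketch combines async() with wait_for() to periodically check for the result while doing other work. It forces the launch::async policy, because for a deferred task wait_for() keeps returning future_status::deferred and the loop would never terminate:

auto fut = async(launch::async, calculate);
while (fut.wait_for(chrono::milliseconds(10)) != future_status::ready) {
    // Do some other work while the result is being calculated...
}
int res = fut.get();   // The result is ready; this does not block.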

EXAMPLE: MULTITHREADED LOGGER CLASS

This section demonstrates how to use threads, the mutex and lock classes, and condition variables to write a multithreaded Logger class. The class allows log messages to be added to a queue from different threads. The Logger class processes this queue in a background thread that serially writes the log messages to a file. The class is designed in three iterations.

You will have to protect access to the queue with a mutex to prevent multiple threads from reading from or writing to the queue at the same time. Based on that, you might define the Logger class as follows:

class Logger
{
    public:
        // Starts a background thread writing log entries to a file.
        Logger();
        // Add log entry to the queue.
        void log(const std::string& entry);
    protected:
        void processEntries();
        // Mutex and condition variable to protect access to the queue.
        std::mutex mMutex;
        std::condition_variable mCondVar;
        std::queue<std::string> mQueue;
        // The background thread.
        std::thread mThread;
    private:
        // Prevent copy construction and assignment.
        Logger(const Logger& src);
        Logger& operator=(const Logger& rhs);
};

Code snippet from loggerVersion1Logger.h

The implementation is as follows. Note that this initial design has a couple of problems, and when you try to run it, it might behave strangely or even crash; these problems are discussed and solved after the example. The inner while loop in the processEntries() method is also worth looking at. It processes all messages in the queue one at a time, acquiring and releasing the lock on each iteration. This is done to make sure the loop doesn't keep the lock for too long, blocking other threads.

Logger::Logger()
{
    // Start background thread.
    mThread = thread{&Logger::processEntries, this};
}
void Logger::log(const std::string& entry)
{
    // Lock mutex and add entry to the queue.
    unique_lock<mutex> lock(mMutex);
    mQueue.push(entry);
    // Notify condition variable to wake up thread.
    mCondVar.notify_all();
}
void Logger::processEntries()
{
    // Open log file.
    ofstream ofs("log.txt");
    if (ofs.fail()) {
        cerr << "Failed to open logfile." << endl;
        return;
    }
    // Start processing loop.
    unique_lock<mutex> lock(mMutex);
    while (true) {
        // Wait for a notification.
        mCondVar.wait(lock);
        // Condition variable is notified, so something is in the queue.
        lock.unlock();
        while (true) {
            lock.lock();
            if (mQueue.empty()) {
                break;
            } else {
                ofs << mQueue.front() << endl;
                mQueue.pop();
            }
            lock.unlock();
        }
    }
}

Code snippet from loggerVersion1Logger.cpp

This Logger class can be tested by using the following test code, which launches a number of background threads, all logging a few messages to the same Logger instance:

void logSomeMessages(int id, Logger& logger)
{
    for (int i = 0; i < 10; ++i) {
        stringstream ss;
        ss << "Log entry " << i << " from thread " << id;
        logger.log(ss.str());
    }
}
int main()
{
    Logger logger;
    vector<thread> threads;
    // Create a few threads all working with the same Logger instance.
    for (int i = 0; i < 10; ++i) {
        threads.push_back(thread{logSomeMessages, i, ref(logger)});
    }
    // Wait for all threads to finish.
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

Code snippet from loggerVersion1main.cpp

If you build and run this naïve initial version, you will notice a couple of problems with it, especially when you run it on a multicore machine.

The first problem is that the background Logger thread will be terminated abruptly when the main() function finishes. This means that messages still in the queue will not be written to the file on disk. Some run-time libraries will even issue an error or generate a crash dump when the background Logger thread is abruptly terminated. You need to add a mechanism to gracefully shut down the background thread and wait until the background thread is completely shut down before terminating the application itself. This can be done by adding a Boolean member variable to the class. The destructor of the Logger class will set this Boolean to true, notify the background thread, and wait until the thread is shut down. The notification will trigger the background thread to wake up, check this Boolean value and, if it’s true, write all messages in the queue to the file and terminate the processing loop. The new definition of the class is as follows:

class Logger
{
    public:
        // Starts a background thread writing log entries to a file.
        Logger();
        // Gracefully shut down background thread.
        virtual ~Logger();
        // Add log entry to the queue.
        void log(const std::string& entry);
    protected:
        void processEntries();
        bool mExit;
        // Remainder of the class omitted for brevity
};

Code snippet from loggerVersion2Logger.h

The Logger constructor needs to initialize the mExit variable:

Logger::Logger() : mExit(false)
{
    // Start background thread.
    mThread = thread{&Logger::processEntries, this};
}

Code snippet from loggerVersion2Logger.cpp

The destructor sets the Boolean variable, wakes up the thread, and then waits until the thread is shut down:

Logger::~Logger()
{
    // Gracefully shut down the thread by setting mExit
    // to true and notifying the thread.
    mExit = true;
    // Notify condition variable to wake up thread.
    mCondVar.notify_all();
    // Wait until thread is shut down.
    mThread.join();
}

Code snippet from loggerVersion2Logger.cpp

The processEntries() method needs to check this Boolean variable and terminate the processing loop when it’s true:

void Logger::processEntries()
{
    // Open log file.
    ofstream ofs("log.txt");
    if (ofs.fail()) {
        cerr << "Failed to open logfile." << endl;
        return;
    }
    // Start processing loop.
    unique_lock<mutex> lock(mMutex);
    while (true) {
        // Wait for a notification.
        mCondVar.wait(lock);
        // Condition variable is notified, so something is in the queue
        // and/or we need to shut down this thread.
        lock.unlock();
        while (true) {
            lock.lock();
            if (mQueue.empty()) {
                break;
            } else {
                ofs << mQueue.front() << endl;
                mQueue.pop();
            }
            lock.unlock();
        }
        if (mExit)
            break;
    }
}

Code snippet from loggerVersion2Logger.cpp

A second problem is that sometimes the program might just block indefinitely in a deadlock situation, where the main thread is blocking on the following line in the Logger destructor:

mThread.join();

And the Logger background thread is blocking on the following line in the processEntries() method:

mCondVar.wait(lock);

That is because the code contains a race condition. The main thread launches the processEntries() background thread in the Logger constructor, but immediately continues executing the remaining code in the main() function. This remaining code is rather small in this example. It can happen that this remaining code from the main() function, including the Logger destructor, is executed before the Logger background thread has started its processing loop. When that happens, the Logger destructor will already have called notify_all() before the background thread is waiting for the notification, and thus the background thread will miss this notification from the destructor.

To solve this race condition, the Logger constructor should wait after launching the background thread until the background thread is ready to process entries. You can implement this by adding a Boolean variable (mThreadStarted), a mutex (mMutexStarted), and a condition variable (mCondVarStarted) to the Logger class. Here is the final definition:

class Logger
{
    public:
        // Starts a background thread writing log entries
        // to a file.
        Logger();
        // Gracefully shut down background thread.
        virtual ~Logger();
        // Add log entry to the queue.
        void log(const std::string& entry);
    protected:
        void processEntries();
        bool mThreadStarted;
        bool mExit;
        // Mutex and condition variable to protect access to the queue.
        std::mutex mMutex;
        std::condition_variable mCondVar;
        std::queue<std::string> mQueue;
        // The background thread.
        std::thread mThread;
        // Mutex and condition variable to detect when the thread
        // starts executing its loop.
        std::mutex mMutexStarted;
        std::condition_variable mCondVarStarted;
    private:
        // Prevent copy construction and assignment.
        Logger(const Logger& src);
        Logger& operator=(const Logger& rhs);
};

Code snippet from loggerFinalVersionLogger.h

The final implementation of the Logger class is as follows. Note that the constructor is using the wait() method accepting a predicate.

Logger::Logger() : mThreadStarted(false), mExit(false)
{
    // Start background thread.
    mThread = thread{&Logger::processEntries, this};
    // Wait until background thread starts its processing loop.
    unique_lock<mutex> lock(mMutexStarted);
    mCondVarStarted.wait(lock, [&](){return mThreadStarted == true;});
}
Logger::~Logger()
{
    // Gracefully shut down the thread by setting mExit
    // to true and notifying the thread.
    mExit = true;
    // Notify condition variable to wake up thread.
    mCondVar.notify_all();
    // Wait until thread is shut down.
    mThread.join();
}
void Logger::log(const std::string& entry)
{
    // Lock mutex and add entry to the queue.
    unique_lock<mutex> lock(mMutex);
    mQueue.push(entry);
    // Notify condition variable to wake up thread.
    mCondVar.notify_all();
}
void Logger::processEntries()
{
    // Open log file.
    ofstream ofs("log.txt");
    if (ofs.fail()) {
        cerr << "Failed to open logfile." << endl;
        return;
    }
    // Start processing loop.
    unique_lock<mutex> lock(mMutex);
    // Notify listeners that thread is starting processing loop.
    mThreadStarted = true;
    mCondVarStarted.notify_all();
    while (true) {
        // Wait for a notification.
        mCondVar.wait(lock);
        // Condition variable is notified, so something is in the queue
        // and/or we need to shut down this thread.
        lock.unlock();
        while (true) {
            lock.lock();
            if (mQueue.empty()) {
                break;
            } else {
                ofs << mQueue.front() << endl;
                mQueue.pop();
            }
            lock.unlock();
        }
        if (mExit)
            break;
    }
}

Code snippet from loggerFinalVersionLogger.cpp
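
One further refinement is worth considering. The destructor writes mExit without holding mMutex, and processEntries() calls wait() without a predicate, so in theory a notification that arrives after the mExit check but before the next wait() call can still be missed; the same reasoning applies to mThreadStarted and mMutexStarted. A sketch of a more defensive version, using the same member names, writes the flag while holding the lock and waits on a predicate:

// In the destructor: set mExit while holding mMutex, so the write
// cannot race with the background thread's check of the flag.
{
    unique_lock<mutex> lock(mMutex);
    mExit = true;
}
mCondVar.notify_all();
mThread.join();

// In processEntries(): wait on a predicate, so that notifications
// sent before the wait and spurious wakeups are both handled.
while (true) {
    mCondVar.wait(lock, [this]{ return !mQueue.empty() || mExit; });
    // Drain the queue as before...
    if (mExit)
        break;
}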

THREAD POOLS

Instead of creating and deleting threads dynamically throughout your program's lifetime, you can create a pool of threads that can be used as needed. This technique is often used in programs that want to handle some kind of event in a thread. In most environments, the ideal number of threads equals the number of processing cores. If there are more threads than cores, threads have to be suspended to allow other threads to run, which ultimately adds overhead. However, events may sometimes arrive faster than the threads can process them. The solution in this case is to use the producer/consumer model, where a number of pre-created threads are waiting for something to do.

Since not all processing is identical, it is not uncommon to have a thread that receives, as part of its input, a function object that represents the computation to be done.

Because all the threads are pre-existing, it is vastly more efficient for the operating system to schedule an existing thread to run than to create a new one in response to an input. Furthermore, the use of a thread pool allows you to manage the number of threads that are created, so depending on the platform, you may have as few as one thread or as many as 64.

Note that while the ideal number of threads is equal to the number of cores, this applies only in the case where the thread is compute bound and cannot block for any other reason, including I/O. When a thread can block, it is often appropriate to run more threads than there are cores. Determining the optimal number of threads in such cases may involve doing throughput measurements with the system under normal load conditions.
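
The number of cores can be queried at run time with std::thread::hardware_concurrency(), as this short sketch shows:

unsigned numCores = thread::hardware_concurrency();
// hardware_concurrency() may return 0 when the number of cores
// cannot be determined; fall back to a default (2 here, arbitrarily).
unsigned numThreads = (numCores != 0) ? numCores : 2;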

You can implement a thread pool in a similar way as an object pool. Chapter 24 gives an example implementation of an object pool. The implementation of a thread pool is left as an exercise for the reader.
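
To get you started on that exercise, here is a minimal sketch of the producer/consumer core of such a pool. All names (ThreadPool, submit(), workerLoop()) are illustrative, this is not a complete implementation, and details such as exception handling and returning results through futures are omitted. It assumes the <condition_variable>, <functional>, <mutex>, <queue>, <thread>, and <vector> headers:

class ThreadPool
{
    public:
        explicit ThreadPool(size_t numThreads)
        {
            // Pre-create the worker threads.
            for (size_t i = 0; i < numThreads; ++i)
                mThreads.push_back(std::thread(&ThreadPool::workerLoop, this));
        }
        ~ThreadPool()
        {
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mExit = true;
            }
            mCondVar.notify_all();
            for (auto& t : mThreads)
                t.join();
        }
        // Queue a task; one of the waiting workers will pick it up.
        void submit(std::function<void()> task)
        {
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mQueue.push(std::move(task));
            }
            mCondVar.notify_one();
        }
    private:
        void workerLoop()
        {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(mMutex);
                    mCondVar.wait(lock,
                        [this]{ return mExit || !mQueue.empty(); });
                    if (mExit && mQueue.empty())
                        return;
                    task = std::move(mQueue.front());
                    mQueue.pop();
                }
                task();   // Execute the task outside the lock.
            }
        }
        std::vector<std::thread> mThreads;
        std::queue<std::function<void()>> mQueue;
        std::mutex mMutex;
        std::condition_variable mCondVar;
        bool mExit = false;
};

Submitting work is then as simple as pool.submit([]{ /* do something */ });.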

THREADING DESIGN AND BEST PRACTICES

This section briefly mentions a couple of best practices related to multithreaded programming.

  • Before terminating the application, always use join() to wait for background threads to finish: This gives all those background threads the time to do proper cleanup. Background threads that are not joined are terminated abruptly when the main thread finishes.
  • The best synchronization is no synchronization: Multithreaded programming becomes much easier if you manage to design your different threads in such a way that all threads working on shared data read only from that shared data and never write to it, or only write to parts never read by other threads. In that case there is no need for any synchronization and you cannot have problems like race conditions or deadlocks.
  • Try to use the single-thread ownership pattern: This means that a block of data is owned by no more than one thread at a time. Owning the data means that no other thread is allowed to read/write to the data. When the thread is finished with the data, the data can be passed off to another thread, which now has sole and complete responsibility/ownership of the data. No synchronization is necessary in this case.
  • Use atomic types and operations when possible: Atomic types and atomic operations make it easier to write race condition and deadlock free code, because they handle synchronization automatically. If atomic types and operations are not possible in your multithreaded design, and you need shared data, you have to use a mutual exclusion mechanism to ensure proper synchronization.
  • Use locks to protect mutable shared data: If you need mutable shared data to which multiple threads can write, and you cannot use atomic types and operations, you have to use a locking mechanism to make sure reads and writes between different threads are synchronized.
  • Release locks as soon as possible: When you need to protect your shared data with a lock, make sure you release the lock as soon as possible. While a thread is holding a lock, it is blocking other threads waiting for the same lock, possibly hurting performance.
  • Make sure to acquire multiple locks in the same order: If multiple threads need to acquire multiple locks, they must be acquired in the same order in all threads to prevent deadlocks. Alternatively, you can use the generic std::lock() function, which acquires multiple locks at once without risking deadlock, regardless of the order in which you pass them; see the sketch after this list.
  • Use a multithreading aware profiler: Use a multithreading aware profiler to find performance bottlenecks in your multithreaded applications and to find out if your multiple threads are indeed utilizing all available CPU cores in your system. An example of a multithreading aware profiler is the profiler in Visual Studio 2010 Premium or Ultimate.
  • Understand the multithreading support features of your debugger: Most debuggers have at least basic support for debugging multithreaded applications. You should be able to get a list of all running threads in your application and you should be able to switch to any of those threads to inspect their call stack. You can use this, for example, to inspect deadlocks because you can see exactly what each thread is doing.
  • Use thread pools instead of creating and destroying a lot of threads dynamically: Your performance decreases if you dynamically create and destroy a lot of threads. In that case it’s better to use a thread pool to reuse existing threads.
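
As mentioned in the lock-ordering item above, here is a minimal sketch of std::lock() in action, assuming two mutexes m1 and m2 that are shared between threads:

mutex m1, m2;
// Associate each mutex with a unique_lock without locking it yet.
unique_lock<mutex> l1(m1, defer_lock);
unique_lock<mutex> l2(m2, defer_lock);
// Acquire both locks at once in a deadlock-free way, no matter
// in which order other threads pass them to lock().
lock(l1, l2);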

SUMMARY

This chapter gave a brief overview of multithreaded programming using the C++11 threading library. It explained how you can use atomic types and atomic operations to operate on shared data without having to use explicit locks. When you cannot use these atomic types and operations, you learned how to use a mutual exclusion mechanism to ensure proper synchronization between different threads that need read/write access to shared data. You also saw how promises and futures represent a simple inter-thread communication channel, and how you can use futures to more easily get a result from a background thread. The chapter finished with a number of best practices for multithreaded application design.

As mentioned in the introduction, this chapter tried to touch on all the functionality provided by the C++11 threading library, but due to space constraints, it could not go into all the details of multithreaded programming. There are books available that discuss nothing but multithreading. See Appendix B for a few references.
