19
CONCURRENCY AND PARALLELISM

The Senior Watchdog had her own watchwords: “Show me a completely smooth operation and I’ll show you someone who’s covering mistakes. Real boats rock.”
—Frank Herbert,
Chapterhouse: Dune

Image

In programming, concurrency means two or more tasks running in a given time period. Parallelism means two or more tasks running at the same instant. Often, these terms are used interchangeably without negative consequence, because they’re so closely related. This chapter introduces the very basics of both concepts. Because concurrent and parallel programming are huge and complicated topics, thorough treatment requires an entire book. You’ll find such books in the “Further Reading” section at the end of this chapter.

In this chapter, you’ll learn about concurrent and parallel programming with futures. Next, you’ll learn how to share data safely with mutexes, condition variables, and atomics. Then the chapter illustrates how execution policies help to speed up your code but also contain hidden dangers.

Concurrent Programming

Concurrent programs have multiple threads of execution (or simply threads), which are sequences of instructions. In most runtime environments, the operating system acts as a scheduler to determine when a thread executes its next instruction. Each process can have one or more threads, which typically share resources, such as memory, with each other. Because the scheduler determines when threads execute, the programmer can’t generally rely on their ordering. In exchange, programs can execute multiple tasks in the same time period (or at the same time), which often results in serious speedups. To observe any speedup from the serial to the concurrent version, your system will need concurrent hardware, for example, a multicore processor.

This section begins with asynchronous tasks, a high-level method for making your programs concurrent. Next, you’ll learn some basic methods for coordinating between these tasks when they’re handling shared mutable state. Then you’ll survey some low-level facilities available to you in the stdlib for unique situations in which the higher-level tools don’t have the performance characteristics you require.

Asynchronous Tasks

One way to introduce concurrency into your program is by creating asynchronous tasks. An asynchronous task doesn’t immediately need a result. To launch an asynchronous task, you use the std::async function template in the <future> header.

async

When you invoke std::async, the first argument is the launch policy std::launch, which takes one of two values: std::launch::async or std::launch::deferred. If you pass launch::async, the runtime creates a new thread to launch your task. If you pass deferred, the runtime waits until you need the task’s result before executing (a pattern sometimes called lazy evaluation). This first argument is optional and defaults to async|deferred, meaning it’s up to the implementation which strategy to employ. The second argument to std::async is a function object representing the task you want to execute. There are no restrictions on the number or type of arguments the function object accepts, and it might return any type. The std::async function is a variadic template with a function parameter pack. Any additional arguments you pass beyond the function object will be used to invoke the function object when the asynchronous task launches. Also, std::async returns an object called a std::future.

The following simplified async declaration helps to summarize:

std::future<FuncReturnType> std::async([policy], func, Args&&... args);

Now that you know how to invoke async, let’s look at how to interact with its return value.

Back to the future

A future is a class template that holds the value of an asynchronous task. It has a single template parameter that corresponds with the type of the asynchronous task’s return value. For example, if you pass a function object that returns a string, async will return a future<string>. Given a future, you can interact with an asynchronous task in three ways.

First, you can query the future about its validity using the valid method. A valid future has a shared state associated with it. Asynchronous tasks have a shared state so they can communicate the results. Any future returned by async will be valid until you retrieve the asynchronous task’s return value, at which point the shared state’s lifetime ends, as Listing 19-1 illustrates.

#include <future>
#include <string>

using namespace std;

TEST_CASE("async returns valid future") {
  using namespace literals::string_literals;
  auto the_future = async([] { return "female"s; }); 
  REQUIRE(the_future.valid()); 
}

Listing 19-1: The async function returns a valid future.

You launch an asynchronous task that simply returns a string . Because async always returns a valid future, valid returns true .

If you default construct a future, it’s not associated with a shared state, so valid will return false, as Listing 19-2 illustrates.

TEST_CASE("future invalid by default") {
  future<bool> default_future; 
  REQUIRE_FALSE(default_future.valid()); 
}

Listing 19-2: A default constructed future is invalid.

You default construct a future , and valid returns false .

Second, you can obtain the value from a valid future with its get method. If the asynchronous task hasn’t yet completed, the call to get will block the currently executed thread until the result is available. Listing 19-3 illustrates how to employ get to obtain return values.

TEST_CASE("async returns the return value of the function object") {
  using namespace literals::string_literals;
  auto the_future = async([] { return "female"s; }); 
  REQUIRE(the_future.get() == "female"); 
}

Listing 19-3: The async function returns a valid future.

You use async to launch an asynchronous task and then invoke the get method on the resulting future. As expected, the result is the return value of the function object you passed into async .

If an asynchronous task throws an exception, the future will collect that exception and throw it when you invoke get, as Listing 19-4 illustrates.

TEST_CASE("get may throw ") {
  auto ghostrider = async(
                      [] { throw runtime_error{ "The pattern is full." }; }); 
  REQUIRE_THROWS_AS(ghostrider.get(), runtime_error); 
}

Listing 19-4: The get method will throw the exception thrown by an asynchronous task.

You pass a lambda to async that throws a runtime_error . When you invoke get, it throws the exception .

Third, you can check whether an asynchronous task has completed using either std::wait_for or std::wait_until. Which you choose depends on the sort of chrono object you want to pass. If you have a duration object, you’ll use wait_for. If you have a time_point object, you’ll use wait_until. Both return a std::future_status, which takes one of three values:

  • future_status::deferred signals that the asynchronous task will be evaluated lazily, so the task will execute once you call get.
  • future_status::ready indicates that the task has completed and the result is ready.
  • future_status::timeout indicates that the task isn’t ready.

If the task completes before the specified waiting period, async will return early.

Listing 19-5 illustrates how to use wait_for to check an asynchronous task’s status.

TEST_CASE("wait_for indicates whether a task is ready") {
  using namespace literals::chrono_literals;
  auto sleepy = async(launch::async, [] { this_thread::sleep_for(100ms); }); 
  const auto not_ready_yet = sleepy.wait_for(25ms); 
  REQUIRE(not_ready_yet == future_status::timeout); 
  const auto totally_ready = sleepy.wait_for(100ms); 
  REQUIRE(totally_ready == future_status::ready); 
}

Listing 19-5: Checking an asynchronous task’s status using wait_for

You first launch an asynchronous task with async, which simply waits for up to 100 milliseconds before returning . Next, you call wait_for with 25 milliseconds . Because the task is still sleeping (25 < 100), wait_for returns future_status::timeout . You call wait_for again and wait for up to another 100 milliseconds . Because the second wait_for will finish after the async task finishes, the final wait_for will return a future_status::ready .

NOTE

Technically, the assertions in Listing 19-5 aren’t guaranteed to pass. “Waiting” on page 389 introduced this_thread::sleep_for, which isn’t exact. The operating environment is responsible for scheduling threads, and it might schedule the sleeping thread later than the specified duration.

An Example with Asynchronous Tasks

Listing 19-6 contains the factorize function, which finds all of an integer’s factors.

NOTE

The factorization algorithm in Listing 19-6 is woefully inefficient but is good enough for this example. For efficient integer factorization algorithms, refer to Dixon’s algorithm, the continued fraction factorization algorithm, or the quadratic sieve.

#include <set>

template <typename T>
std::set<T> factorize(T x) {
  std::set<T> result{ 1 }; 
  for(T candidate{ 2 }; candidate <= x; candidate++) { 
    if (x % candidate == 0) { 
      result.insert(candidate); 
      x /= candidate; 
      candidate = 1; 
    }
  }
  return result;
}

Listing 19-6: A very simple integer factorization algorithm

The algorithm accepts a single argument x and begins by initializing a set containing 1 . Next, it iterates from 2 to x , checking whether modulo division with the candidate results in 0 . If it does, candidate is a factor, and you add it to the factor set . You divide x by the factor you just discovered and then restart your search by resetting the candidate to 1 .

Because integer factorization is a hard problem (and because Listing 19-6 is so inefficient), calls to factorize can take a long time relative to most of the functions you’ve encountered so far in the book. This makes it a prime candidate for asynchronous tasking. The factor_task function in Listing 19-7 uses the trusty Stopwatch from Listing 12-25 in Chapter 12 to wrap factorize and returns a nicely formatted message.

#include <set>
#include <chrono>
#include <sstream>
#include <string>

using namespace std;

struct Stopwatch {
--snip--
};

template <typename T>
set<T> factorize(T x) {
--snip--
}

string factor_task(unsigned long x) { 
  chrono::nanoseconds elapsed_ns;
  set<unsigned long long> factors;
  {
    Stopwatch stopwatch{ elapsed_ns }; 
    factors = factorize(x); 
  }
  const auto elapsed_ms =
             chrono::duration_cast<chrono::milliseconds>(elapsed_ns).count(); 
  stringstream ss;
  ss << elapsed_ms << " ms: Factoring " << x << " ( "; 
  for(auto factor : factors) ss << factor << " "; 
  ss << ")
";
  return ss.str(); 
}

Listing 19-7: A factor_task function that wraps a call to factorize and returns a nicely formatted message

Like factorize, factor_task accepts a single argument x to factorize . (For simplicity, factor_task takes an unsigned long rather than a templated argument). Next, you initialize a Stopwatch within a nested scope and then invoke factorize with x . The result is that elapsed_ns contains the number of nanoseconds elapsed while factorize executed, and factors contains all the factors of x.

Next, you construct a nicely formatted string by first converting elapsed_ns to a count in milliseconds . You write this information into a stringstream object called ss followed by the factors of x . Then you return the resulting string .

Listing 19-8 employs factor_task to factor six different numbers and record the total elapsed program time.

#include <set>
#include <array>
#include <vector>
#include <iostream>
#include <limits>
#include <chrono>
#include <sstream>
#include <string>

using namespace std;

struct Stopwatch {
--snip--
};

template <typename T>
set<T> factorize(T x) {
--snip--
}

string factor_task(unsigned long long x) {
--snip--
}

array<unsigned long long, 6> numbers{ 
        9'699'690,
        179'426'549,
        1'000'000'007,
        4'294'967'291,
        4'294'967'296,
        1'307'674'368'000
};

int main() {
  chrono::nanoseconds elapsed_ns;
  {
    Stopwatch stopwatch{ elapsed_ns }; 
    for(auto number : numbers) 
      cout << factor_task(number); 
  }
  const auto elapsed_ms =
             chrono::duration_cast<chrono::milliseconds>(elapsed_ns).count(); 
  cout << elapsed_ms << "ms: total program time
"; 
}
-----------------------------------------------------------------------
0 ms: Factoring 9699690 ( 1 2 3 5 7 11 13 17 19 )
1274 ms: Factoring 179426549 ( 1 179426549 )
6804 ms: Factoring 1000000007 ( 1 1000000007 )
29035 ms: Factoring 4294967291 ( 1 4294967291 )
0 ms: Factoring 4294967296 ( 1 2 )
0 ms: Factoring 1307674368000 ( 1 2 3 5 7 11 13 )
37115ms: total program time

Listing 19-8: A program using factor_task to factorize six different numbers

You construct an array containing six numbers of varied size and primality . Next, you initialize a Stopwatch , iterate over each element in numbers , and invoke factor_task with them . You then determine the program’s runtime in milliseconds and print it .

The output shows that some numbers, such as 9,699,690, 4,294,967,296, and 1,307,674,368,000, factor almost immediately because they contain small factors. However, the prime numbers take quite a while. Note that because the program is single threaded, the runtime for the entire program roughly equals the sum of the times taken to factorize each number.

What if you treat each factor_task as an asynchronous task? Listing 19-9 illustrates how to do this with async.

#include <set>
#include <vector>
#include <array>
#include <iostream>
#include <limits>
#include <chrono>
#include <future>
#include <sstream>
#include <string>

using namespace std;

struct Stopwatch {
--snip--
};

template <typename T>
set<T> factorize(T x) {
--snip--
}

string factor_task(unsigned long long x) {
--snip--
}

array<unsigned long long, 6> numbers{
--snip--
};

int main() {
  chrono::nanoseconds elapsed_ns;
  {
    Stopwatch stopwatch{ elapsed_ns }; 
    vector<future<string>> factor_tasks; 
    for(auto number : numbers) 
      factor_tasks.emplace_back(async(launch::async, factor_task, number)); 
    for(auto& task : factor_tasks) 
      cout << task.get(); 
  }
  const auto elapsed_ms =
             chrono::duration_cast<chrono::milliseconds>(elapsed_ns).count(); 
  cout << elapsed_ms << " ms: total program time
"; 
}
-----------------------------------------------------------------------
0 ms: Factoring 9699690 ( 1 2 3 5 7 11 13 17 19 )
1252 ms: Factoring 179426549 ( 1 179426549 )
6816 ms: Factoring 1000000007 ( 1 1000000007 )
28988 ms: Factoring 4294967291 ( 1 4294967291 )
0 ms: Factoring 4294967296 ( 1 2 )
0 ms: Factoring 1307674368000 ( 1 2 3 5 7 11 13 )
28989 ms: total program time

Listing 19-9: A program using factor_task to factorize six different numbers asynchronously

As in Listing 19-8, you initialize a Stopwatch to keep track of how long the program executes . Next, you initialize a vector called factor_tasks that contains objects of type future<string> . You iterate over numbers , invoking async with the launch::async strategy, specifying factor_task as the function object, and passing a number as the task’s argument. You invoke emplace_back on each resulting future into factor_tasks . Now that async has launched each task, you iterate over each element of factor_tasks , invoke get on each task, and write it to cout . Once you’ve received values from all the futures, you determine the number of milliseconds it took to run all tasks and write it to cout .

Thanks to concurrency, the total program time of Listing 19-9 roughly equals the maximum task execution time (28,988 ms) rather than the sum of task execution times, as in Listing 19-8 (37,115 ms).

NOTE

The times in Listing 19-8 and Listing 19-9 will vary from run to run.

Sharing and Coordinating

Concurrent programming with asynchronous tasks is simple as long as the tasks don’t require synchronization and don’t involve sharing mutable data. For example, consider a simple situation in which two threads access the same integer. One thread will increment the integer while the other decrements it. To modify a variable, each thread must read the variable’s current value, perform an addition or subtraction operation, and then write the variable to memory. Without synchronization, the two threads will perform these operations in an undefined, interleaved order. Such situations are sometimes called race conditions because the result depends on which thread executes first. Listing 19-10 illustrates just how disastrous this situation is.

#include <future>
#include <iostream>

using namespace std;

void goat_rodeo() {
  const size_t iterations{ 1'000'000 };
  int tin_cans_available{}; 

  auto eat_cans = async(launch::async, [&] { 
    for(size_t i{}; i<iterations; i++)
      tin_cans_available--; 
  });
  auto deposit_cans = async(launch::async, [&] { 
    for(size_t i{}; i<iterations; i++)
      tin_cans_available++; 
  });
  eat_cans.get(); 
  deposit_cans.get(); 
  cout << "Tin cans: " << tin_cans_available << "
"; 
}
int main() {
  goat_rodeo();
  goat_rodeo();
  goat_rodeo();
}
-----------------------------------------------------------------------
Tin cans: -609780
Tin cans: 185380
Tin cans: 993137

Listing 19-10: An illustration of how disastrous unsynchronized, mutable, shared data access can be

NOTE

You’ll get different results on each run of the program in Listing 19-10 because the program has undefined behavior.

Listing 19-10 involves defining a function called goat_rodeo, which involves a catastrophic race condition, and a main that invokes goat_rodeo three times. Within goat_rodeo, you initialize the shared data tin_cans_available . Next, you launch an asynchronous task called eat_cans in which a trip of goats decrements the shared variable tin_cans_available one million times . Next, you launch another asynchronous task called deposit_cans in which you increment tin_cans_available . After launching the two tasks, you wait for them to complete by calling get (the order doesn’t matter) . Once the tasks complete, you print the tin_cans_available variable .

Intuitively, you might expect tin_cans_available to equal zero after each task completes. After all, no matter how you order increments and decrements, if you perform them in equal number, they’ll cancel. You invoke goat_rodeo three times, and each invocation produces a wildly different result.

Table 19-1 illustrates one of the many ways the unsynchronized access in Listing 19-10 goes awry.

Table 19-1: One Possible Schedule for eat_cans and deposit_cans

eat_cans

deposit_cans

cans_available

Read cans_available (0)

0

Read cans_available (0)

0

Compute cans_available+1 (1)

0

Compute cans_available-1 (-1)

0

Write cans_available+1 (1)

1

Write cans_available-1 (-1)

-1

Table 19-1 shows how interleaving reads and writes invites disaster. In this particular incarnation, the read by deposit_cans precedes the write from eat_cans , so deposit_cans computes a stale result . If this weren’t bad enough, it clobbers the write from eat_cans when it writes .

The fundamental problem with this data race is unsynchronized access to mutable shared data. You might wonder why cans_available doesn’t update immediately whenever a thread computes cans_available+1 or cans_available-1. The answer lies in the fact that each of the rows in Table 19-1 represents a moment in time when some instruction completes execution, and the instructions for adding, subtracting, reading, and writing memory are all separate. Because the cans_available variable is shared and both threads write to it without synchronizing their actions, the instructions get interleaved in an undefined way at runtime (with catastrophic results). In the following subsections, you’ll learn three tools for dealing with such situations: mutexes, condition variables, and atomics.

Mutexes

A mutual exclusion algorithm (mutex) is a mechanism for preventing multiple threads from accessing resources simultaneously. Mutexes are synchronization primitives that support two operations: lock and unlock. When a thread needs to access shared data, it locks the mutex. This operation can block depending on the nature of the mutex and whether another thread has the lock. When a thread no longer needs access, it unlocks the mutex.

The <mutex> header exposes several mutex options:

  • std::mutex provides basic mutual exclusion.
  • std::timed_mutex provides mutual exclusion with a timeout.
  • std::recursive_mutex provides mutual exclusion that allows recursive locking by the same thread.
  • std::recursive_timed_mutex provides mutual exclusion that allows recursive locking by the same thread and a timeout.

The <shared_mutex> header provides two additional options:

  • std::shared_mutex provides shared mutual exclusion facility, which means that several threads can own the mutex at once. This option is typically used in scenarios when multiple readers can access shared data but a writer needs exclusive access.
  • std::shared_timed_mutex provides shared mutual exclusion facility and implements locking with a timeout.

NOTE

For simplicity, this chapter only covers mutex. See [thread.mutex] for more information about the other options.

The mutex class defines only a single, default constructor. When you want to obtain mutual exclusion, you call one of two methods on a mutex object: lock or try_lock. If you call lock, which accepts no arguments and returns void, the calling thread blocks until the mutex becomes available. If you call try_lock, which accepts no arguments and returns a bool, it returns immediately. If the try_lock successfully obtained mutual exclusion, it returns true and the calling thread now owns the lock. If try_lock was unsuccessful, it returns false and the calling thread doesn’t own the lock. To release a mutual exclusion lock, you simply call the method unlock, which accepts no arguments and returns void.

Listing 19-11 shows a lock-based way to solve the race condition in Listing 19-10.

#include <future>
#include <iostream>
#include <mutex>

using namespace std;

void goat_rodeo() {
  const size_t iterations{ 1'000'000 };
  int tin_cans_available{};
  mutex tin_can_mutex; 

  auto eat_cans = async(launch::async, [&] {
    for(size_t i{}; i<iterations; i++) {
      tin_can_mutex.lock(); 
      tin_cans_available--;
      tin_can_mutex.unlock(); 
    }
  });
  auto deposit_cans = async(launch::async, [&] {
    for(size_t i{}; i<iterations; i++) {
      tin_can_mutex.lock(); 
      tin_cans_available++;
      tin_can_mutex.unlock(); 
    }
  });
  eat_cans.get();
  deposit_cans.get();
  cout << "Tin cans: " << tin_cans_available << "
";
}

int main() {
  goat_rodeo(); 
  goat_rodeo(); 
  goat_rodeo(); 
}
-----------------------------------------------------------------------
Tin cans: 0 
Tin cans: 0 
Tin cans: 0 

Listing 19-11: Using a mutex to resolve the race condition in Listing 19-10

You add a mutex into goat_rodeo called tin_can_mutex, which provides mutual exclusion on the tin_cans_available. Inside each asynchronous task, a thread acquires a lock before modifying tin_cans_available. Once the thread is done modifying, it unlocks . Notice that the resulting number of available tin cans at the end of each run is zero , reflecting that you’ve fixed your race condition.

MUTEX IMPLEMENTATIONS

In practice, mutexes are implemented in a number of ways. Perhaps the simplest mutex is a spin lock in which a thread will execute a loop until the lock is released. This kind of lock usually minimizes the amount of time between a lock getting released by one thread and acquired by another. But it’s computationally expensive because a CPU is spending all of its time checking for lock availability when some other thread could be doing productive work. Typically, mutexes require atomic instructions, such as compare-and-swap, fetch-and-add, or test-and-set, so they can check for and acquire a lock in one operation.

Modern operating systems, like Windows, offer more efficient alternatives to spin locks. For example, mutexes based on asynchronous procedure calls allow threads to wait on a mutex and go into a wait state. Once the mutex becomes available, the operating system awakens the waiting thread and hands off ownership of the mutex. This allows other threads to do productive work on a CPU that would otherwise be occupied in a spin lock.

In general, you won’t need to worry about the details of how mutexes are implemented by your operating system unless they become a bottleneck in your program.

If you’re thinking that handling mutex locking is a perfect job for an RAII object, you’re right. Suppose you forgot to invoke unlock on a mutex, say because it threw an exception. When the next thread comes along and attempts to acquire the mutex with lock, your program will come to a screeching halt. For this reason, the stdlib provides RAII classes for handling mutexes in the <mutex> header. There you’ll find several class templates, all of which accept mutexes as constructor parameters and a template parameter corresponding to the class of the mutexes:

  • std::lock_guard is a non-copyable, non-moveable RAII wrapper that accepts a mutex object in its constructor, where it calls lock. It then calls unlock in the destructor.
  • std::scoped_lock is a deadlock avoiding RAII wrapper for multiple mutexes.
  • std::unique_lock implements a movable mutex ownership wrapper.
  • std::shared_lock implements a movable shared mutex ownership wrapper.

For brevity, this section focuses on lock_guard. Listing 19-12 shows how to refactor Listing 19-11 to use lock_guard instead of manual mutex manipulation.

#include <future>
#include <iostream>
#include <mutex>

using namespace std;

void goat_rodeo() {
  const size_t iterations{ 1'000'000 };
  int tin_cans_available{};
  mutex tin_can_mutex;
  auto eat_cans = async(launch::async, [&] {
    for(size_t i{}; i<iterations; i++) {
      lock_guard<mutex> guard{ tin_can_mutex }; 
      tin_cans_available--;
    }
  });
  auto deposit_cans = async(launch::async, [&] {
    for(size_t i{}; i<iterations; i++) {
      lock_guard<mutex> guard{ tin_can_mutex }; 
      tin_cans_available++;
    }
  });
  eat_cans.get();
  deposit_cans.get();
  cout << "Tin cans: " << tin_cans_available << "
";
}

int main() {
  goat_rodeo();
  goat_rodeo();
  goat_rodeo();
}
-----------------------------------------------------------------------
Tin cans: 0
Tin cans: 0
Tin cans: 0

Listing 19-12: Refactoring Listing 19-11 to use lock_guard

Rather than using lock and unlock to manage mutual exclusion, you construct a lock_guard at the beginning of each scope where you need synchronization . Because your mutual exclusion mechanism is a mutex, you specify it as your lock_guard template parameter. Listing 19-11 and Listing 19-12 have equivalent runtime behavior, including how long it takes the programs to execute. RAII objects don’t involve any additional runtime costs over programming releases and acquisitions by hand.

Unfortunately, mutual exclusion locks involve runtime costs. You might also have noticed that executing Listings 19-11 and 19-12 took substantially longer than executing Listing 19-10. The reason is that acquiring and releasing locks is a relatively expensive operation. In Listings 19-11 and 19-12, the tin_can_mutex gets acquired and then released two million times. Relative to incrementing or decrementing an integer, acquiring or releasing a lock takes substantially more time, so using a mutex to synchronize the asynchronous tasks is suboptimal. In certain situations, you can take a potentially more efficient approach by using atomics.

NOTE

For more information about asynchronous tasks and futures, refer to [futures.async].

Atomics

The word atomic comes from the Greek átomos, meaning “indivisible.” An operation is atomic if it occurs in an indivisible unit. Another thread cannot observe the operation halfway through. When you introduced locks into Listing 19-10 to produce Listing 19-11, you made the increment and decrement operations atomic because the asynchronous tasks could no longer interleave read and write operations on tin_cans_available. As you experienced running this lock-based solution, this approach is very slow because acquiring locks is expensive.

Another approach is to use the std::atomic class template in the <atomic> header, which provides primitives often used in lock-free concurrent programming. Lock-free concurrent programming solves data race issues without involving locks. On many modern architectures, CPUs support atomic instructions. Using atomics, you might be able to avoid locks by leaning on atomic hardware instructions.

This chapter doesn’t discuss std::atomic or how to devise your own lock-free solutions in detail, because it’s incredibly difficult to do correctly and is best left to experts. However, in simple situations, such as in Listing 19-10, you can employ a std::atomic to make sure that the increment or decrement operations cannot be divided. This neatly solves your data race problem.

The std::atomic template offers specializations for all fundamental types, as shown in Table 19-2.

Table 19-2: std::atomic Template Specializations for the Fundamental Types

Template specialization

Alias

std::atomic<bool>

std::atomic_bool

std::atomic<char>

std::atomic_char

std::atomic<unsigned char>

std::atomic_uchar

std::atomic<short>

std::atomic_short

std::atomic<unsigned short>

std::atomic_ushort

std::atomic<int>

std::atomic_int

std::atomic<unsigned int>

std::atomic_uint

std::atomic<long>

std::atomic_long

std::atomic<unsigned long>

std::atomic_ulong

std::atomic<long long>

std::atomic_llong

std::atomic<unsigned long long>

std::atomic_ullong

std::atomic<char16_t>

std::atomic_char16_t

std::atomic<char32_t>

std::atomic_char32_t

std::atomic<wchar_t>

std::atomic_wchar_t

Table 19-3 lists some of the supported operations for std::atomic. The std::atomic template has no copy constructor.

Table 19-3: Supported Operations for std::atomic

Operation

Description

a{}

a{ 123 }

Default constructor.

Initializes value to 123.

a.is_lock_free()

Returns true if a is lock-free. (Depends on the CPU.)

a.store(123)

Stores the value 123 into a.

a.load()

a()

Returns the stored value.

a.exchange(123)

Replaces the current value with 123 and returns the old value. This is a “read-modify-write” operation.

a.compare_exchange_weak(10, 20)

a.compare_exchange_strong(10, 20)

If the current value is 10, replaces with 20. Returns true if the value was replaced. See [atomic] for details on weak versus strong.

NOTE

Specializations for the types in <cstdint> are also available. See [atomics.syn] for details.

For the numeric types, the specializations offer additional operations, as listed in Table 19-4.

Table 19-4: Supported Operations for Numeric Specializations of a std::atomic a

Operation

Description

a.fetch_add(123)

a+=123

Replaces the current value with the result of adding the argument to the current value. Returns the value before modification. This is a “read-modify-write” operation.

a.fetch_sub(123)

a-=123

Replaces the current value with the result of subtracting the argument from the current value. Returns the value before modification. This is a “read-modify-write” operation.

a.fetch_and(123)

a&=123

Replaces the current value with the result of bitwise ANDing the argument with the current value. Returns the value before modification. This is a “read-modify-write” operation.

a.fetch_or(123)

a|=123

Replaces the current value with the result of bitwise ORing the argument with the current value. Returns the value before modification. This is a “read-modify-write” operation.

a.fetch_xor(123)

a^=123

Replaces the current value with the result of bitwise XORing the argument with the current value. Returns the value before modification. This is a “read-modify-write” operation.

a++

a--

Increments or decrements a.

Because Listing 19-12 is a prime candidate for a lock-free solution, you can replace the type of tin_cans_available with atomic_int and remove the mutex. This prevents race conditions like the one illustrated in Table 19-1. Listing 19-13 implements this refactor.

#include <future>
#include <iostream>
#include <atomic>

using namespace std;

void goat_rodeo() {
  const size_t iterations{ 1'000'000 };
  atomic_int tin_cans_available{};
  auto eat_cans = async(launch::async, [&] {
    for(size_t i{}; i<iterations; i++)
      tin_cans_available--; 
  });
  auto deposit_cans = async(launch::async, [&] {
    for(size_t i{}; i<iterations; i++)
      tin_cans_available++; 
  });
  eat_cans.get();
  deposit_cans.get();
  cout << "Tin cans: " << tin_cans_available << "
";
}

int main() {
  goat_rodeo();
  goat_rodeo();
  goat_rodeo();
}
-----------------------------------------------------------------------
Tin cans: 0
Tin cans: 0
Tin cans: 0

Listing 19-13: Resolving the race condition using atomic_int rather than mutex

You replace int with atomic_int and remove the mutex. Because the decrement and increment operators are atomic, the race condition remains solved.

NOTE

For more information about atomics, refer to [atomics].

You also probably noticed a considerable performance boost from Listing 19-12 to 19-13. In general, using atomic operations will be much faster than acquiring a mutex.

WARNING

Unless you have a very simple concurrent access problem, such as the one in this section, you really shouldn’t try to implement lock-free solutions on your own. Refer to the Boost Lockfree library for high-quality, thoroughly tested lock-free containers. As always, you must decide whether a lock-based or lock-free implementation is optimal.

Condition Variables

A condition variable is a synchronization primitive that blocks one or more threads until notified. Another thread can notify the condition variable. After notification, the condition variable can unblock one or more threads so they can make progress. A very popular condition variable pattern involves a thread performing the following actions:

  1. Acquire some mutex shared with awaiting threads.
  2. Modify the shared state.
  3. Notify the condition variable.
  4. Release the mutex.

Any threads waiting on the condition variable then perform the following actions:

  1. Acquire the mutex.
  2. Wait on the condition variable (this releases the mutex).
  3. When another thread notifies the condition variable, this thread wakes up and can perform some work (this reacquires the mutex automatically).
  4. Release the mutex.

Due to complications arising from the complexity of modern operating systems, sometimes threads can wake up spuriously. Therefore, it’s important to verify that a condition variable was in fact signaled once a waiting thread awakens.

The stdlib provides std::condition_variable in the <condition_variable> header, which supports several operations, including those in Table 19-5. The condition_variable supports only default construction, and the copy constructor is deleted.

Table 19-5: Supported Operations of a std::condition_variable cv

Operation

Description

cv.notify_one()

If any threads are waiting on cv, this operation notifies one of them.

cv.notify_all()

If any threads are waiting on cv, this operation notifies all of them.

cv.wait(lock, [pred])

Given a lock on the mutex owned by the notifier, returns when awakened. If supplied, pred determines whether the notification is spurious (returns false) or real (returns true).

cv.wait_for(lock, [durn], [pred])

Same as cv.wait except wait_for only waits for durn. If timeout occurs and no pred is supplied, returns std::cv_status::timeout; otherwise, returns std::cv_status::no_timeout.

cv.wait_until(lock, [time], [pred])

Same as wait_for except uses a std::chrono::time_point instead of a std::chrono::duration.

For example, you can refactor Listing 19-12 so the deposit cans task completes before the eat cans task using a condition variable, as Listing 19-14 illustrates.

#include <future>
#include <iostream>
#include <mutex>
#include <condition_variable>

using namespace std;

void goat_rodeo() {
  mutex m; 
  condition_variable cv; 
  const size_t iterations{ 1'000'000 };
  int tin_cans_available{};
  auto eat_cans = async(launch::async, [&] {
    unique_lock<mutex> lock{ m }; 
    cv.wait(lock, [&] { return tin_cans_available == 1'000'000; }); 
    for(size_t i{}; i<iterations; i++)
      tin_cans_available--;
  });

  auto deposit_cans = async(launch::async, [&] {
    scoped_lock<mutex> lock{ m }; 
    for(size_t i{}; i<iterations; i++)
      tin_cans_available++;
    cv.notify_all(); 
  });
  eat_cans.get();
  deposit_cans.get();
  cout << "Tin cans: " << tin_cans_available << "
";
}

int main() {
  goat_rodeo();
  goat_rodeo();
  goat_rodeo();
}
-----------------------------------------------------------------------
Tin cans: 0
Tin cans: 0
Tin cans: 0

Listing 19-14: Using condition variables to ensure all cans are deposited before they’re eaten

You declare a mutex and a condition_variable that you’ll use to coordinate the asynchronous tasks. Within the eat cans task, you acquire a unique_lock to the mutex, which you pass into wait along with a predicate that returns true if there are cans available . This method will release the mutex and then block until two conditions are met: the condition_variable awakens this thread and one million tin cans are available (recall that you must check that all the cans are available because of spurious wakeups). Within the deposit cans task, you acquire a lock on the mutex , deposit the cans, and then notify all threads blocked on the condition_variable .

Note that, unlike with all the previous approaches, it’s impossible for tin_cans_available to be negative because the ordering of deposit cans and eat cans is guaranteed.

NOTE

For more information about condition variables, refer to [thread.condition].

Low-Level Concurrency Facilities

The stdlib’s <thread> library contains low-level facilities for concurrent programming. The std::thread class, for example, models an operating system thread. However, it’s best not to use thread directly and instead design concurrency into your programs with higher-level abstractions, like tasks. Should you require low-level thread access, [thread] offers more information.

But the <thread> library does include several useful functions for manipulating the current thread:

  • The std::this_thread::yield function accepts no arguments and returns void. The exact behavior of yield depends on the environment, but in general it provides a hint that the operating system should give other threads a chance to run. This is useful when, for example, there’s high lock contention over a particular resource and you want to help all threads get a chance at access.
  • The std::this_thread::get_id function accepts no arguments and returns an object of type std::thread::id, which is a lightweight thread that supports comparison operators and operator<<. Typically, it’s used as a key in associative containers.
  • The std::this_thread::sleep_for function accepts a std::chrono::duration argument, blocks execution on the current thread until at least the specified duration passes, and returns void.
  • The std::this_thread::sleep_until accepts a std::chrono::time_point and returns void. It is entirely analogous to sleep_for except it blocks the thread until at least the specified time_point.

When you need these functions, they’re indispensable. Otherwise, you really shouldn’t need to interact with the <thread> header.

Parallel Algorithms

Chapter 18 introduced the stdlib’s algorithms, many of which take an optional first argument called its execution policy encoded by a std::execution value. In supported environments, there are three possible values: seq, par, and par_unseq. The latter two options indicate that you want to execute the algorithm in parallel.

An Example: Parallel sort

Listing 19-15 illustrates how changing a single argument from seq to par can have a massive impact on a program’s runtime by sorting a billion numbers both ways.

#include <algorithm>
#include <vector>
#include <numeric>
#include <random>
#include <chrono>
#include <iostream>
#include <execution>

using namespace std;

// From Listing 12-25:
struct Stopwatch {
--snip--
};

vector<long> make_random_vector() { 
  vector<long> numbers(1'000'000'000);
  iota(numbers.begin(), numbers.end(), 0);
  mt19937_64 urng{ 121216 };
  shuffle(numbers.begin(), numbers.end(), urng);
  return numbers;
}

int main() {
  cout << "Constructing random vectors...";
  auto numbers_a = make_random_vector(); 
  auto numbers_b{ numbers_a }; 
  chrono::nanoseconds time_to_sort;
  cout << " " << numbers_a.size() << " elements.
";
  cout << "Sorting with execution::seq...";
  {
    Stopwatch stopwatch{ time_to_sort };
    sort(execution::seq, numbers_a.begin(), numbers_a.end()); 
  }
  cout << " took " << time_to_sort.count() / 1.0E9 << " sec.
";

  cout << "Sorting with execution::par...";
  {
    Stopwatch stopwatch{ time_to_sort };
    sort(execution::par, numbers_b.begin(), numbers_b.end()); 
  }
  cout << " took " << time_to_sort.count() / 1.0E9 << " sec.
";
}
-----------------------------------------------------------------------
Constructing random vectors... 1000000000 elements.
Sorting with execution::seq... took 150.489 sec.
Sorting with execution::par... took 17.7305 sec.

Listing 19-15: Sorting a billion numbers using std::sort with std::execution::seq versus std::execution::par. (Results are from a Windows 10 x64 machine with two Intel Xeon E5-2620 v3 processors.)

The make_random_vector function produces a vector containing a billion unique numbers. You build two copies, numbers_a and numbers_b . You sort each vector separately. In the first case, you sort with a sequential execution policy , and Stopwatch indicates that the operation took about two and a half minutes (about 150 seconds). In the second case, you sort with a parallel execution policy . In contrast, Stopwatch indicates that the operation took about 18 seconds. The sequential execution took roughly 8.5 times as long.

Parallel Algorithms Are Not Magic

Unfortunately, parallel algorithms aren’t magic. Although they work brilliantly in simple situations, such as with sort in Listing 19-15, you must be careful when using them. Any time an algorithm produces side effects beyond the target sequence, you have to think hard about race conditions. A red flag is any algorithm that passes a function object to the algorithm. If the function object has mutable state, the executing threads will have shared access and you might have a race condition. For example, consider the parallel transform invocation in Listing 19-16.

#include <algorithm>
#include <vector>
#include <iostream>
#include <numeric>
#include <execution>

int main() {
  std::vector<long> numbers{ 1'000'000 }, squares{ 1'000'000 }; 
  std::iota(numbers.begin(), numbers.end(), 0); 
  size_t n_transformed{}; 
  std::transform(std::execution::par, numbers.begin(), numbers.end(), 
                 squares.begin(), [&n_transformed] (const auto x) {
                  ++n_transformed; 
                  return x * x; 
                });
  std::cout << "n_transformed: " << n_transformed << std::endl; 
}
-----------------------------------------------------------------------
n_transformed: 187215 

Listing 19-16: A program containing a race condition due to non-atomic access to n_transformed

You begin by initializing two vector objects, numbers and squares, which contain a million elements . Next, you fill one of them with numbers using iota and initialize the variable n_transformed to 0 . You then invoke transform with a parallel execution policy, numbers as your target sequence, squares as your result sequence, and a simple lambda . The lambda increments n_transformed and returns the square of the argument x . Because multiple threads execute this lambda, access to n_transformed must be synchronized .

The previous section introduced two ways to solve this problem, locks and atomics. In this scenario, it’s probably best to just use a std::atomic_size_t as a drop-in replacement for size_t.

Summary

This chapter surveyed concurrency and parallelism at a very high level. In addition, you learned how to launch asynchronous tasks, which allow you to easily introduce multithreaded programming concepts into your code. Although introducing parallel and concurrent concepts into your programs can provide a significant performance boost, you must carefully avoid introducing race conditions that invite undefined behavior. You also learned several mechanisms for synchronizing access to mutable shared state: mutexes, condition variables, and atomics.

EXERCISES

19-1. Write your own spin lock-based mutex called SpinLock. Expose a lock, a try_lock, and an unlock method. Your class should delete the copy constructor. Try using a std::lock_guard<SpinLock> with an instance of your class.

19-2. Read about the infamous double-checked locking pattern (DCLP) and why you shouldn’t use it. (See the article by Scott Meyers and Andrei Alexandrescu mentioned in the following “Further Reading” section.) Then read about the appropriate way to ensure that a callable gets invoked exactly once using std::call_once in [thread.once.callonce].

19-3. Create a thread-safe queue class. This class must expose an interface like std::queue (see [queue.defn]). Use a std::queue internally to store elements. Use a std::mutex to synchronize access to this internal std::queue.

19-4. Add a wait_and_pop method and a std::condition_variable member to your thread-safe queue. When a user invokes wait_and_pop and the queue contains an element, it should pop the element off the queue and return it. If the queue is empty, the thread should block until an element becomes available and then proceed to pop an element.

19-5. (Optional) Read the Boost Coroutine2 documentation, especially the “Overview,” “Introduction,” and “Motivation” sections.

FURTHER READING

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset