18. Multithreading

Two significant trends of the past decade have had an enormous effect on the field of software development. First, the continued decrease in the cost of performing computations is no longer driven by increases in clock speed and transistor density, as illustrated by Figure 18.1. Rather, the cost of computation is now falling because it is economical to make hardware that has multiple CPUs.


FIGURE 18.1: Clock Speeds over Time
(Graph compiled by Herb Sutter. Used with permission. Original at www.gotw.ca.)

Second, computations now routinely involve enormous latency. Latency is, simply put, the amount of time required to obtain a desired result. There are two principal causes of latency. Processor-bound latency occurs when the computational task is complex; if a computation requires performing 12 billion arithmetic operations and the total processing power available is only 6 billion operations per second, at least 2 seconds of processor-bound latency will be incurred between asking for the result and obtaining it. I/O-bound latency, by contrast, is latency incurred by the need to obtain data from an external source such as a disk drive, web server, and so on. Any computation that requires fetching data from a web server physically located far from the client machine will incur latency equivalent to millions of processor cycles.

These two trends together create an enormous challenge for modern software developers. Given that machines have more computing power than ever, how are we to make effective use of that power to deliver results to the user quickly, and without compromising on the user experience? How do we avoid creating frustrating user interfaces that freeze up when a high-latency operation is triggered? Moreover, how do we go about splitting CPU-bound work among multiple processors to decrease the time required for the computation?

The standard technique for engineering software that keeps the user interface responsive and CPU utilization high is to write multithreaded programs that do multiple computations “in parallel.” Unfortunately, multithreading logic is notoriously difficult to get right; we’ll spend the next two chapters exploring what makes multithreading difficult, and learning how to use higher-level abstractions and new language features to ease that burden.

The higher-level abstractions we’ll discuss are, first, the two principal components of the Parallel Extensions library that was released with .NET 4.0¹—the Task Parallel Library (TPL) and Parallel LINQ (PLINQ)—and second, the Task-based Asynchronous Pattern (TAP) and its accompanying language support in C# 5.0. Although we strongly encourage you to use these higher-level abstractions, we will also cover some of the lower-level threading APIs from previous versions of the .NET runtime in this chapter. Additional multithreading patterns from before C# 5.0 are available for download at http://IntelliTect.com/EssentialCSharp, along with the chapters from Essential C# 3.0; if you want to fully understand multithreaded programming without the later features, you still have access to that material.

1. These libraries are available in .NET 3.5 by downloading the Reactive Extensions library for .NET 3.5, but this is not officially supported.

We’ll start this chapter with a few beginner topics in case you are new to multithreading. Then we’ll briefly discuss “traditional” thread manipulation without using the Parallel Extensions libraries to ensure that you have a basic understanding of thread manipulation; the following chapter goes into more details on that topic. We’ll then spend most of this chapter covering the TPL, TAP, and PLINQ, in that order.

Multithreading Basics

There is a lot of confusing jargon associated with multithreading, so let’s define a few terms.

A CPU (central processing unit) or core² is the unit of hardware that actually executes a given program. Every machine has at least one CPU, though today multiple-CPU machines are common. Many modern CPUs support simultaneous multithreading (which Intel trademarks as Hyper-Threading), a mode where a single CPU can appear as multiple “virtual” CPUs.

2. Technically we ought to say that “CPU” always refers to the physical chip and “core” may refer to a physical or virtual CPU. This distinction is unimportant for the purposes of this book, so we will use these terms interchangeably.

A process is a currently executing instance of a given program; the fundamental purpose of the operating system is to manage processes. Each process contains one or more threads. A process is represented by an instance of the Process class in the System.Diagnostics namespace.
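For example, the following minimal sketch (ours, not one of this chapter’s numbered listings) uses the Process class to inspect the process in which it is running:

using System;
using System.Diagnostics;

public class ProcessInfo
{
  public static void Main()
  {
      // Retrieve the process in which this code is executing.
      Process current = Process.GetCurrentProcess();
      Console.WriteLine(
          $"Process { current.Id } contains { current.Threads.Count } thread(s).");
  }
}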

C# programming at the level of statements and expressions is fundamentally about describing flow of control, and thus far in this book we’ve made the implicit assumption that a given program has only a single “point of control.” You can imagine the point of control as being a cursor that enters the text of your program at the Main method when you start it up, and then moves around the program as the various conditions, loops, method calls, and so on, are executed. A thread is this point of control. A thread is represented by an instance of the System.Threading.Thread class and the API for manipulating a Thread is in the same System.Threading namespace.

A single-threaded program is one in which there is only one thread in the process. A multithreaded program has two or more threads in the process.

A piece of code is said to be thread safe if it behaves correctly when used in a multithreaded program. The threading model of a piece of code is the set of requirements that the code places upon its caller in exchange for guaranteeing thread safety. (For example, the threading model of many classes is “static methods may be called from any thread but instance methods may be called only from the thread that allocated the instance.”)

A task is a unit of potentially high-latency work that produces a resultant value or desired side effect. The distinction between tasks and threads is as follows: A task represents a job that needs to be performed, whereas a thread represents the worker that does the job. A task that is useful only for its side effects is represented by an instance of the Task class. A task used to produce a value of a given type is represented by the Task<T> class, which derives from the nongeneric Task type. These can be found in the System.Threading.Tasks namespace.

A thread pool is a collection of threads, along with logic for determining how to assign work to those threads. When your program has a task to perform, it can obtain a worker thread from the pool, assign it the task, and then return the thread to the pool when the work completes, thereby making it available the next time additional work is requested.


Working with System.Threading

The Parallel Extensions library is extraordinarily useful because it allows you to manipulate a higher-level abstraction, the task, rather than working directly with threads. However, you might need to work with code written before the TPL and PLINQ were available (prior to .NET 4.0), or you might have a programming problem not directly addressed by them. In this section, we briefly cover some of the basic underlying APIs for directly manipulating threads.

Asynchronous Operations with System.Threading.Thread

The operating system implements threads and provides various unmanaged APIs to create and manage those threads. The CLR wraps these unmanaged threads and exposes them in managed code via the System.Threading.Thread class, an instance of which represents a “point of control” in the program. As mentioned earlier, you can think of a thread as a “worker” that independently follows the instructions that make up your program.

Listing 18.1 provides an example. The independent point of control is represented by an instance of Thread that runs concurrently. A thread needs to know which code to run when it starts up, so its constructor takes a delegate that refers to the code that is to be executed. In this case we convert a method group, DoWork, to the appropriate delegate type, ThreadStart. We then start the thread running by calling Start(). While the new thread is running, the main thread attempts to print 1,000 hyphens to the console. We then instruct the main thread to wait for the worker thread to complete its work by calling Join(). The result is shown in Output 18.1.

LISTING 18.1: Starting a Method Using System.Threading.Thread


using System;
using System.Threading;                                                

public class RunningASeparateThread
{
  public const int Repetitions = 1000;

  public static void Main()
  {
      ThreadStart threadStart = DoWork;                                
      Thread thread = new Thread(threadStart);                         
      thread.Start();                                                  
      for(int count = 0; count < Repetitions; count++)
      {
          Console.Write('-');
      }
      thread.Join();                                                   
  }

  public static void DoWork()
  {
      for(int count = 0; count < Repetitions; count++)
      {
          Console.Write('+');
      }
  }
}


OUTPUT 18.1

++++++++++++++++++++++++++++++++----------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++-------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
-------------------------------------------------------+++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++------------------------------------------------------
------------------------------------------------------------------------
-----------------------------------------------+++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++

As you can see, the threads appear to be taking turns executing, each printing out a few hundred characters before the context switches. The two loops are running “in parallel,” rather than the first one running to completion before the second one begins, as it would if the delegate had been executed synchronously.

For code to run under the context of a different thread, you need a delegate of type ThreadStart or ParameterizedThreadStart to identify the code to execute. (The latter allows for a single parameter of type object; both are found in the System.Threading namespace.) Given a Thread instance created using the thread-start delegate constructor, you can start the thread executing with a call to thread.Start(). (Listing 18.1 creates a variable of type ThreadStart explicitly to show the delegate type in the source code. The method group DoWork could have been passed directly to the thread constructor.) The call to Thread.Start() tells the operating system to begin concurrent execution of the new thread; control on the main thread immediately returns from the call and executes the for loop in the Main() method. The threads are now independent, and neither waits for the other until the call to Join().
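For example, here is a minimal sketch (again ours, rather than one of the numbered listings) that uses ParameterizedThreadStart to pass the character to be printed into the worker:

using System;
using System.Threading;

public class ParameterizedThreadExample
{
  public static void Main()
  {
      // DoWork(object) converts to ParameterizedThreadStart.
      Thread thread = new Thread(DoWork);
      thread.Start('+');  // The argument is boxed and passed to DoWork().
      thread.Join();
  }

  public static void DoWork(object state)
  {
      Console.Write((char)state);
  }
}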

Thread Management

Threads include a number of methods and properties for managing their execution. Here are some of the basic ones, followed by a short sketch that exercises several of them:

As we saw in Listing 18.1, you can cause one thread to wait for another with Join(). This tells the operating system to suspend execution of the current thread until the other thread is terminated. The Join() method is overloaded to take either an int or a TimeSpan to support a maximum time to wait for thread completion before continuing execution.

By default, a new thread is a “foreground” thread; the operating system will terminate a process when all its foreground threads are complete. You can mark a thread as a “background” thread by setting the IsBackground property to true. The operating system will then allow the process to be terminated even if the background thread is still running. However, it is still a good idea to ensure that all threads exit cleanly before the process exits, rather than being aborted; see the section on thread aborting later in this chapter for more details.

Every thread has an associated priority, which you can change by setting the Priority property to a new ThreadPriority enum value. The possible values are Lowest, BelowNormal, Normal, AboveNormal, and Highest. The operating system prefers to schedule time slices to higher-priority threads. Be careful; if you set the priorities incorrectly, you can end up with “starvation” situations where one high-priority thread prevents many low-priority threads from ever running.

If you simply want to know whether a thread is still “alive” or has finished all of its work, you can use the Boolean IsAlive property. A more informative picture of a thread’s state is accessible through the ThreadState property. The ThreadState enum values are Aborted, AbortRequested, Background, Running, Stopped, StopRequested, Suspended, SuspendRequested, Unstarted, and WaitSleepJoin. These are flags; some of these values can be combined.
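The following sketch, provided for illustration only, exercises several of these members together:

using System;
using System.Threading;

public class ThreadManagementExample
{
  public static void Main()
  {
      Thread thread = new Thread(DoWork)
      {
          // A background thread will not keep the process alive,
          // so the process may exit before DoWork() finishes.
          IsBackground = true,
          // A hint to the operating system's scheduler.
          Priority = ThreadPriority.BelowNormal
      };
      thread.Start();

      // Join() returns false if the timeout elapses before
      // the thread terminates.
      if(!thread.Join(TimeSpan.FromSeconds(1)))
      {
          Console.WriteLine($"IsAlive: { thread.IsAlive }");
          Console.WriteLine($"ThreadState: { thread.ThreadState }");
      }
  }

  public static void DoWork()
  {
      // Simulate high-latency work; as discussed in the next
      // section, Sleep() is acceptable in demonstration code.
      Thread.Sleep(5000);
  }
}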

There are two commonly used, and commonly abused, methods for controlling threads that deserve to be discussed in their own sections: Sleep() and Abort().

Do Not Put Threads to Sleep in Production Code

The static Thread.Sleep() method puts the current thread to sleep, essentially telling the operating system to not schedule any time slices to this thread until the given amount of time has passed. A single parameter—either a number of milliseconds or a TimeSpan—specifies how long the operating system will wait before continuing execution. While it is waiting, the operating system will, of course, schedule time slices for any other threads that might be waiting their turn to execute. This might sound like a sensible thing to do, but it is a “bad code smell” that indicates the design of the program could probably be better.

Threads are often put to sleep to try to synchronize a thread with some event in time. However, the operating system does not guarantee any level of precision in its timing. That is, if you say, “Put me to sleep for 123 milliseconds,” the operating system will put the thread to sleep for at least 123 milliseconds, and possibly much longer. The actual amount of time between the thread going to sleep and then waking up again is not deterministic and can be arbitrarily long. Do not attempt to use Thread.Sleep() as a high-precision timer, because it is not.

Worse, Thread.Sleep() is often used as a “poor man’s synchronization system.” That is, if you have some unit of asynchronous work, and the current thread cannot proceed until that work is done, you might be tempted to put the thread to sleep for much longer than you think the asynchronous work will take, in the hopes that it will be finished when the current thread wakes up. This is a bad idea: Asynchronous work, by its very nature, can take longer than you think. Use proper thread synchronization mechanisms, described in the next chapter, to synchronize threads. (We’ll give an example of this sort of abuse in Listing 18.2.)

Putting a thread to sleep is also a bad programming practice because it means that the sleeping thread is, obviously, unresponsive to attempts to run code on it. If you put the main thread of a Windows application to sleep, that thread will no longer be processing messages from the user interface, and will therefore appear to be hung.

More generally, putting a thread to sleep is a bad programming practice because the whole point of allocating an expensive resource like a thread is to get work out of that resource. You wouldn’t pay an employee to sleep, so do not pay the price of allocating an expensive thread only to put it to sleep for millions or billions of processor cycles.

That said, there are some valid uses of Thread.Sleep(). First, putting a thread to sleep with a time delay of zero tells the operating system “the current thread is politely giving up the rest of its quantum to another thread if there is one that can use it.” The polite thread will then be scheduled normally, without any further delay. Second, Thread.Sleep() is commonly used in test code to simulate a thread that is working on some high-latency operation without actually having to burn a processor doing some pointless arithmetic. Other uses in production code should be reviewed carefully to ensure that there is not a better way to obtain the desired effect.
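For example (illustrative only):

Thread.Sleep(0);    // Politely yield the rest of this thread's quantum.
Thread.Sleep(5000); // In test code only: simulate a high-latency operation.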

In task-based asynchronous programming in C# 5, you can use the await operator on the result of the Task.Delay() method to introduce an asynchronous delay without blocking the current thread. See the “Timers” section in Chapter 19 for further detail.
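As a minimal sketch (assuming .NET 4.5 and C# 5.0), the following method pauses for one second without blocking the calling thread:

using System;
using System.Threading.Tasks;

public class DelayExample
{
  public static async Task PauseAsync()
  {
      Console.WriteLine("Before the delay...");
      // Unlike Thread.Sleep(), this does not block the thread;
      // the thread is released while the delay elapses.
      await Task.Delay(TimeSpan.FromSeconds(1));
      Console.WriteLine("...after the delay.");
  }
}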


Guidelines

AVOID calling Thread.Sleep() in production code.


Do Not Abort Threads in Production Code

The Thread object has an Abort() method that, when executed, attempts to destroy the thread. It does so by causing the runtime to throw a ThreadAbortException in the thread; this exception can be caught, but even if it is caught and ignored, it is automatically rethrown to try to ensure that the thread is, in fact, destroyed. There are many reasons why it is a very bad idea to attempt to abort a thread. Here are some of them:

The method promises only to try to abort the thread; there is no guarantee that it will succeed. For example, the runtime will not attempt to cause a ThreadAbortException if the point of control of the thread is currently inside a finally block (because critical cleanup code could be running right now and should not be interrupted) or is in unmanaged code (because doing so could corrupt the CLR itself). Rather, the CLR defers throwing the exception until control leaves the finally block or returns to managed code. But there is no guarantee that this ever happens. The thread being aborted might contain an infinite loop inside a finally block. (Ironically, the fact that the thread has an infinite loop might be the reason you are attempting to abort it in the first place.)

The aborted thread might be in critical code protected by a lock statement. (See Chapter 19 for details.) Unlike a finally block, a lock will not prevent the exception. The critical code will be interrupted halfway through by the exception, and the lock object will be automatically released, allowing other code that is waiting on the lock object to enter the critical section and observe the state of the halfway-executed code. The whole point of locking is to prevent that scenario, so aborting a thread can transform what looks like thread-safe code into dangerously incorrect code.

The CLR guarantees that its internal data structures will never be corrupted if a thread is aborted, but the BCL does not make this guarantee. Aborting a thread can leave any of your data structures or the BCL’s data structures in an arbitrarily bad state if the exception is thrown at the wrong time. Code running on other threads, or in the finally blocks of the aborted thread, can see this corrupted state and crash or behave badly.

In short, you should abort a thread only as a last resort; ideally, do so only as part of a larger emergency shutdown in which the entire AppDomain or the entire process is being destroyed. Fortunately, task-based asynchrony uses a more robust and safe cooperative cancellation pattern to terminate a “thread” whose results are no longer needed, as discussed in the next major section, “Asynchronous Tasks.”


Guidelines

AVOID aborting a thread in production code; doing so will yield unpredictable results and can destabilize a program.


Thread Pooling

As we discussed earlier, in the Beginner Topic titled “Performance Considerations,” it is possible for an excess of threads to negatively impact performance. Threads are expensive resources, thread context switching is not free, and running two jobs in simulated parallelism via time slicing can be hugely slower than running them one after the other.

To mitigate these problems, the BCL provides a thread pool. Instead of allocating threads directly, you can tell the thread pool what work you want performed. When the work is finished, rather than the thread terminating and being destroyed, it is returned to the pool, saving the cost of allocating a new thread when more work comes along. Listing 18.2 shows how to do the same thing as Listing 18.1, but this time with a pooled thread.

LISTING 18.2: Using ThreadPool Instead of Instantiating Threads Explicitly


using System;
using System.Threading;

public class Program
{
  public const int Repetitions = 1000;
  public static void Main()
  {
      ThreadPool.QueueUserWorkItem(DoWork, '+');                        

      for(int count = 0; count < Repetitions; count++)
      {
          Console.Write('-');
      }

      // Pause until the thread completes.
      // This is for illustrative purposes; do not
      // use Thread.Sleep for synchronization in
      // production code.
      Thread.Sleep(1000);                                               
  }
  public static void DoWork(object state)                               
  {
      for(int count = 0; count < Repetitions; count++)
      {
          Console.Write(state);
      }
  }
}


The output of Listing 18.2 is similar to Output 18.1—that is, an intermingling of plus signs and hyphens. If we had a lot of different jobs to perform asynchronously, this pooling technique would provide more efficient execution on single-processor and multiprocessor computers. The efficiency is achieved by reusing threads over and over, rather than reconstructing them for every asynchronous call. Unfortunately, thread pool use is not without its pitfalls: There are still performance and synchronization problems to consider when using a thread pool.

To make efficient use of processors, the thread pool assumes that all the work you schedule on the thread pool will finish in a timely manner so that the thread can be returned to the thread pool and reused by another task. The thread pool also assumes that all the work will be of a relatively short duration (that is, consuming milliseconds or seconds of processor time, not hours or days). By making this assumption, it can ensure that each processor is working full out on a task, and not inefficiently time-slicing multiple tasks, as described in the Beginner Topic on performance. The thread pool attempts to prevent excessive time slicing by ensuring that thread creation is “throttled” so that no one processor is “oversubscribed” with too many threads. The downside is that if all the threads in the pool are consumed by long-running or I/O-bound work, execution of any queued-up work will be delayed.

Unlike Thread and Task, which are objects that you can manipulate directly, the thread pool does not provide a reference to the thread used to execute a given piece of work. This prevents the calling thread from synchronizing with, or controlling, the worker thread via the thread management functions described earlier in the chapter. In Listing 18.2 we use the “poor man’s synchronization” that we earlier discouraged; this would be a bad idea in production code because we do not actually know how long the work will take to complete.

In short, the thread pool does its job well, but that job does not include providing services to deal with long-running jobs or jobs that need to be synchronized with the main thread or with one another. What we really need to do is build a higher-level abstraction that can use threads and thread pools as an implementation detail; that abstraction is implemented by the Task Parallel Library, which is the topic of most of the rest of this chapter.

For more details on other techniques for managing worker threads that were commonly used prior to .NET 4, see the Essential C# 3.0 multithreading chapters at IntelliTect.com/EssentialCSharp.


Guidelines

DO use the thread pool to efficiently assign processor time to processor-bound tasks.

AVOID allocating a pooled worker thread to a task that is I/O bound or long-running; use TPL instead.



Asynchronous Tasks

Multithreaded programming includes the following complexities:

1. Monitoring the state of an asynchronous operation for completion: This includes determining when an asynchronous operation has completed, preferably not by polling the thread’s state or by blocking and waiting.

2. Thread pooling: This avoids the significant cost of starting and tearing down threads. In addition, thread pooling avoids the creation of too many threads, such that the system spends more time switching threads than running them.

3. Avoiding deadlocks: This involves preventing the occurrence of deadlocks while attempting to protect the data from simultaneous access by two different threads.

4. Providing atomicity across operations and synchronizing data access: Adding synchronization around groups of operations ensures that operations execute as a single unit and that they are not inappropriately interrupted partway through by another thread. Locking is provided so that two different threads do not access the data simultaneously.

Furthermore, anytime a method is long-running, multithreaded programming will probably be required—that is, invoking the long-running method asynchronously. As developers write more multithreaded code, a common set of scenarios and programming patterns for handling those scenarios emerges.

C# 5.0 enhanced the programmability of one such pattern—TAP—by leveraging the TPL from .NET 4.0 and enhancing the C# language with new constructs to support it. This and the following section delve into the details of the TPL on its own and then the TPL with the async/await contextual keywords that simplify TAP programming. In the second half of Chapter 19, we consider several additional multithreading patterns that are important to be familiar with if the TPL and C# 5.0 are not available or you are programming against a non–TPL-based API.

From Thread to Task

Creating a thread is a relatively expensive operation, and each thread consumes a large amount (1 megabyte, by default) of virtual memory. We saw earlier in this chapter that it is potentially more efficient to use a thread pool to allocate threads when needed, assign asynchronous work to the thread, run the work to completion, and then reuse the thread for subsequent asynchronous work, rather than destroying the thread when the work is complete and creating a new one later.

In .NET Framework 4, instead of creating an operating system thread each time asynchronous work is started, the TPL creates a Task and tells the task scheduler that there is asynchronous work to perform. A task scheduler might use many different strategies to fulfill this purpose, but by default it requests a worker thread from the thread pool. The thread pool, as we’ve seen already, might decide that it is more efficient to run the task later, after some currently executing tasks have completed, or might decide to schedule the task’s worker thread to a particular processor. The thread pool determines whether it is more efficient to create an entirely new thread or to reuse an existing thread that previously finished executing.

By abstracting the concept of asynchronous work into the Task object, the TPL provides an object that represents asynchronous work and provides an object-oriented API for interacting with that work. Moreover, by providing an object that represents the unit of work, the TPL enables programmatically building up workflows by composing small tasks into larger ones, as we’ll see.

A task is an object that encapsulates work that executes asynchronously. This should sound familiar: A delegate is also an object that represents code. The difference between a task and a delegate is that delegates are synchronous and tasks are asynchronous. Executing a delegate, say, an Action, immediately transfers the point of control of the current thread to the delegate’s code; control does not return to the caller until the delegate is finished. By contrast, starting a task almost immediately returns control to the caller, no matter how much work the task has to perform. The task executes asynchronously, typically on another thread (though, as we will see later in this chapter, it is possible and even beneficial to execute tasks asynchronously with only one thread). A task essentially transforms a delegate from a synchronous to an asynchronous execution pattern.

Introducing Asynchronous Tasks

You know when a delegate is done executing on the current thread because the caller cannot do anything until the delegate is done. But how do you know when a task is done, and how do you get the result, if there is one? Consider the example of turning a synchronous delegate into an asynchronous task. We’ll do the same thing we did with threads in Listing 18.1 and thread pools in Listing 18.2, but this time with tasks: The worker thread will write hyphens to the console, while the main thread writes plus signs.

Starting the task obtains a new thread from the thread pool, creating a second “point of control,” and executes the delegate on that thread. The point of control on the main thread continues normally after the call to start the task (Task.Run()). The results of Listing 18.3 are almost identical to Output 18.1.

LISTING 18.3: Invoking an Asynchronous Task


using System;
using System.Threading.Tasks;                                        

public class Program
{
  public static void Main()
  {
      const int Repetitions = 10000;
      // Use Task.Factory.StartNew() for
      // TPL prior to .NET 4.5
      Task task = Task.Run(() =>
          {
              for(int count = 0;
                  count < Repetitions; count++)
              {
                  Console.Write('-');
              }
          });
      for(int count = 0; count < Repetitions; count++)
      {
          Console.Write('+');
      }

      // Wait until the Task completes
      task.Wait();
  }
}


The code that is to run in a new thread is defined in the delegate (of type Action in this case) passed to the Task.Run() method. This delegate (in the form of a lambda expression) prints out dashes to the console repeatedly. The loop that follows the starting of the task is almost identical, except that it displays plus signs.

Notice that, following the call to Task.Run(), the Action passed as the argument almost immediately starts executing. The Task is said to be “hot,” meaning that it has already been triggered to start executing—as opposed to a “cold” task, which needs to be explicitly started before the asynchronous work begins.

Although a Task can also be instantiated in a “cold” state via the Task constructor, doing so is generally appropriate only as an implementation detail internal to an API that returns an already running (“hot”) Task, one triggered by a call to Task.Start().
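For example (a sketch, not a pattern we recommend for application code):

// The delegate does not begin executing when the Task is constructed.
Task cold = new Task(() => Console.WriteLine("Now running"));
// Only the call to Start() makes the task "hot."
cold.Start();
cold.Wait();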

Notice that the exact state of a “hot” task is indeterminate immediately following the call to Run(). The state is instead determined by the operating system and whether it chooses to run the task’s worker thread immediately or delay it until additional resources are available. In fact, it is possible that the hot task is already finished by the time the code on the calling thread gets its turn to execute again. The call to Wait() forces the main thread to wait until all the work assigned to the task has completed executing. This is analogous to calling Join() on the worker thread, as we did in Listing 18.1.

In this scenario we have a single task, but it is also possible for many tasks to be running asynchronously. It is common to have a set of tasks where you want to wait for all of them to complete, or for any one of them to complete, before continuing execution of the current thread. The Task.WaitAll() and Task.WaitAny() methods do so.
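For example (a sketch):

using System;
using System.Threading.Tasks;

public class WaitExample
{
  public static void Main()
  {
      Task[] tasks =
      {
          Task.Run(() => Console.WriteLine("First task")),
          Task.Run(() => Console.WriteLine("Second task"))
      };

      // Block until any one of the tasks completes; the return
      // value is the index of the completed task in the array.
      int index = Task.WaitAny(tasks);
      Console.WriteLine($"Task { index } finished first.");

      // Block until all of the tasks complete.
      Task.WaitAll(tasks);
  }
}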

So far, we’ve seen how a task can take an Action and run it asynchronously. But what if the work executed in the task returns a result? We can use the Task<T> type to run a Func<T> asynchronously. When executing a delegate synchronously, we know that control will not return until the result is available. When executing a Task<T> asynchronously, we can poll it from one thread to see if it is done, and fetch the result when it is.³ Listing 18.4 demonstrates how to do so in a console application. Note that this sample uses a PiCalculator.Calculate() method that we will delve into further in the section “Executing Loop Iterations in Parallel.”

3. Exercise caution when using this polling technique. When creating a task from a delegate, as we have here, the task will be scheduled to run on a worker thread from the thread pool. As a consequence, the current thread will loop until the work is complete on the worker thread. This technique works, but it might consume CPU resources unnecessarily. Such a polling technique is dangerously broken if, instead of scheduling the task to run on a worker thread, you schedule the task to execute in the future on the current thread. Since the current thread is in a loop polling the task, it will loop forever because the task will not complete until the current thread exits the loop.

LISTING 18.4: Polling a Task<T>


using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      // Use Task.Factory.StartNew<string>() for
      // TPL prior to .NET 4.5
      Task<string> task =
          Task.Run<string>(
              () => PiCalculator.Calculate(100));

      foreach(
          char busySymbol in Utility.BusySymbols())
      {
          if(task.IsCompleted)
          {
              Console.Write('\b');
              break;
          }
          Console.Write(busySymbol);
      }

      Console.WriteLine();

      Console.WriteLine(task.Result);
      System.Diagnostics.Trace.Assert(
          task.IsCompleted);
  }
}


public class PiCalculator
{
  public static string Calculate(int digits = 100)
  {
      // ...
  }
}


public class Utility
{
  public static IEnumerable<char> BusySymbols()
  {
      string busySymbols = @"-\|/-\|/";
      int next = 0;
      while(true)
      {
          yield return busySymbols[next];
          next = (next + 1) % busySymbols.Length;
          yield return '\b'; // Backspace to erase the previous symbol
      }
  }
}


This listing shows that the data type of the task is Task<string>. The generic type includes a Result property from which to retrieve the value returned by the Func<string> that the Task<string> executes.

Note that Listing 18.4 does not make a call to Wait(). Instead, reading from the Result property automatically causes the current thread to block until the result is available, if it isn’t already; in this case we know that it will already be complete when the result is fetched.

In addition to the IsCompleted and Result properties on Task<T>, there are several others worth noting (a short sketch follows this list):

The IsCompleted property is set to true when a task completes, whether it completed normally or faulted (that is, ended because it threw an exception). More detailed information on the status of a task can be obtained by reading the Status property, which returns a value of type TaskStatus. Possible values are Created, WaitingForActivation, WaitingToRun, Running, WaitingForChildrenToComplete, RanToCompletion, Canceled, and Faulted. IsCompleted is true whenever the Status is RanToCompletion, Canceled, or Faulted. Of course, if the task is running on another thread and you read the status as “Running,” the status could change to “Completed” at any time, including immediately after you read the value of the property. The same is true of many other states—even Created could potentially change if a different thread starts it. Only RanToCompletion, Canceled, and Faulted are final states from which no further transitions occur.

A task can be uniquely identified by the value of the Id property. The static Task.CurrentId property provides the identifier for the currently executing Task (that is, the task that is executing the Task.CurrentId call). These properties are especially useful when debugging.

You can use the AsyncState property to associate additional data with a task. For example, imagine a List<T> whose values will be computed by various tasks. Each task could contain the index of the value in the AsyncState property. This way, when the task completes, the code can index into the list using the AsyncState (first casting it to an int).⁴

4. Be careful when using tasks to asynchronously mutate collections. The tasks might be running on worker threads, and the collection might not be thread safe. It is safer to fill in the collection from the main thread after the tasks are completed.
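As a sketch of the last two members (note that Task.Run() does not accept a state argument, so here the AsyncState value is supplied via Task.Factory.StartNew()):

using System;
using System.Diagnostics;
using System.Threading.Tasks;

public class TaskPropertiesExample
{
  public static void Main()
  {
      // The state object (42 here) becomes the task's AsyncState.
      Task task = Task.Factory.StartNew(
          state => Console.WriteLine($"Computing item { state }..."),
          42);
      task.Wait();

      // RanToCompletion, Canceled, and Faulted are final states,
      // and all three imply IsCompleted.
      Trace.Assert(task.Status == TaskStatus.RanToCompletion);
      Trace.Assert(task.IsCompleted);
      Console.WriteLine($"AsyncState: { task.AsyncState }");  // 42
  }
}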

There are other useful properties that we will discuss later in this chapter, in the section on task cancellation.

Task Continuation

We’ve talked several times about the “control flow” of a program without ever saying what the most fundamental nature of control flow is: Control flow determines what happens next. When you have a simple control flow like Console.WriteLine(x.ToString());, the control flow tells you that when ToString completes normally, the next thing that will happen is a call to WriteLine with the value returned as the argument. The concept of “what happens next” is called continuation; each point in a control flow has a continuation. In our example, the continuation of ToString is WriteLine (and the continuation of WriteLine is whatever code runs in the next statement). The idea of continuation is so elementary to C# programming that most programmers don’t even think about it; it’s part of the invisible air that they breathe. The act of C# programming is the act of constructing continuation upon continuation until the control flow of the entire program is complete.

Notice that the continuation of a given piece of code in a normal C# program will be executed immediately upon the completion of that code. When ToString() returns, the point of control on the current thread immediately does a synchronous call to WriteLine. Notice also that there are actually two possible continuations of a given piece of code: the “normal” continuation and the “exceptional” continuation that will be executed if the current piece of code throws an exception.

Asynchronous method calls, such as starting a Task, add an additional dimension to the control flow. With an asynchronous Task invocation, the control flow goes immediately to the statement after the Task.Start(), while at the same time the body of the Task’s delegate begins executing. In other words, “what happens next” when asynchrony is involved is multidimensional. Unlike with exceptions, where the continuation is just a different path, with asynchrony the continuation is an additional, parallel path.

Asynchronous tasks also allow composition of larger tasks out of smaller tasks by describing asynchronous continuations. Just as with regular control flow, a task can have different continuations to handle error situations, and tasks can be melded together by manipulating their continuations. There are several techniques for doing so, the most explicit of which is the ContinueWith() method (see Listing 18.5 and its corresponding output, Output 18.2).

LISTING 18.5: Calling Task.ContinueWith()


using System;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      Console.WriteLine("Before");
      // Use Task.Factory.StartNew() for
      // TPL prior to .NET 4.5
      Task taskA =
          Task.Run( () =>
               Console.WriteLine("Starting..."))
          .ContinueWith(antecedent =>
               Console.WriteLine("Continuing A..."));
      Task taskB = taskA.ContinueWith( antecedent =>
          Console.WriteLine("Continuing B..."));
      Task taskC = taskA.ContinueWith( antecedent =>
          Console.WriteLine("Continuing C..."));
      Task.WaitAll(taskB, taskC);
      Console.WriteLine("Finished!");
  }
}


OUTPUT 18.2

Before
Starting...
Continuing A...
Continuing C...
Continuing B...
Finished!

The ContinueWith() method enables “chaining” two tasks together, such that when the predecessor task—the antecedent task—completes, the second task—the continuation task—is automatically started asynchronously. In Listing 18.5, for example, Console.WriteLine("Starting...") is the antecedent task body and Console.WriteLine("Continuing A...") is its continuation task body. The continuation task takes a Task as its argument (antecedent), thereby allowing the continuation task’s code to access the antecedent task’s completion state. When the antecedent task is completed, the continuation task starts automatically, asynchronously executing the second delegate, and passing the just-completed antecedent task as an argument to that delegate. Furthermore, since the ContinueWith() method returns a Task as well, that Task can be used as the antecedent of yet another Task, and so on, forming a continuation chain of Tasks that can be arbitrarily long.

If you call ContinueWith() twice on the same antecedent task (as Listing 18.5 shows with taskB and taskC representing continuation tasks for taskA), the antecedent task (taskA) has two continuation tasks and when the antecedent task completes, both continuation tasks will be executed asynchronously. Notice that the order of execution of the continuation tasks from a single antecedent is indeterminate at compile time. Output 18.2 happens to show taskC executing before taskB, but in a second execution of the program, the order might be reversed. However, taskA will always execute before taskB and taskC because the latter are continuation tasks of taskA and, therefore, can’t start before taskA completes. Similarly, the Console.WriteLine("Starting...") delegate will always execute to completion before taskA (Console.WriteLine("Continuing A...")) because the latter is a continuation task of the former. Furthermore, “Finished!” will always appear last because of the call to Task.WaitAll(taskB, taskC) that blocks the control flow from continuing until both taskB and taskC complete.

Many different overloads of ContinueWith() are possible, and some of them take a TaskContinuationOptions value to tweak the behavior of the continuation chain. These values are flags, so they can be combined using the logical OR operator (|). A brief description of some of the possible flag values appears in Table 18.1; see the online MSDN documentation⁵ for more details.

5. MSDN, .NET Framework Developer Center, http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcontinuationoptions(v=vs.110).aspx.


TABLE 18.1: List of Available TaskContinuationOptions Enums

The items denoted with a star (*) indicate under which conditions the continuation task will be executed; thus they are particularly useful for creating continuations that act like event handlers for the antecedent task’s behavior. Listing 18.6 demonstrates how an antecedent task can be given multiple continuations that execute conditionally, depending on how the antecedent task completed.

LISTING 18.6: Registering for Notifications of Task Behavior with ContinueWith()


using System;
using System.Threading.Tasks;
using System.Diagnostics;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

public class Program
{
  public static void Main()
  {
      // Use Task.Factory.StartNew<string>() for
      // TPL prior to .NET 4.5
      Task<string> task =
          Task.Run<string>(
              () => PiCalculator.Calculate(10));

      Task faultedTask = task.ContinueWith(
          (antecedentTask) =>
          {
              Trace.Assert(antecedentTask.IsFaulted);
              Console.WriteLine(
                  "Task State: Faulted");
          },
          TaskContinuationOptions.OnlyOnFaulted);

      Task canceledTask = task.ContinueWith(
          (antecedentTask) =>
          {
              Trace.Assert(antecedentTask.IsCanceled);
              Console.WriteLine(
                  "Task State: Canceled");
          },
          TaskContinuationOptions.OnlyOnCanceled);

      Task completedTask = task.ContinueWith(
          (antecedentTask) =>
          {
              Trace.Assert(antecedentTask.IsCompleted);
              Console.WriteLine(
                  "Task State: Completed");
          },  TaskContinuationOptions.
                  OnlyOnRanToCompletion);

      completedTask.Wait();
  }
}


In this listing, we effectively register “listeners” for “events” on the antecedent’s task so that when the task completes normally or abnormally, the particular “listening” task will begin executing. This is a powerful capability, particularly if the original task is a “fire and forget” task—that is, a task that we start, hook up to continuation tasks, and then never refer to again.

In Listing 18.6, notice that the final Wait() call is on completedTask, not on task—the original antecedent task created with Task.Run(). Although each delegate’s antecedentTask is a reference to the parent (antecedent) task (task), from outside the delegate listeners we can effectively discard the reference to the original task. We can then rely solely on the continuation tasks that begin executing asynchronously without any need for follow-up code that checks the status of the original task.

In this case, we call completedTask.Wait() so that the main thread does not exit the program before the completed output appears (see Output 18.3).

OUTPUT 18.3

Task State: Completed

In this case, invoking completedTask.Wait() is somewhat contrived because we know that the original task will complete successfully. However, invoking Wait() on canceledTask or faultedTask will result in an exception. Those continuation tasks run only if the antecedent task is canceled or throws an exception; given that will not happen in this program, those tasks will never be scheduled to run, and waiting for them to complete would throw an exception. The continuation options in Listing 18.6 happen to be mutually exclusive, so when the antecedent task runs to completion and the task associated with completedTask executes, the task scheduler automatically cancels the tasks associated with canceledTask and faultedTask. The canceled tasks end with their state set to Canceled. Therefore, calling Wait() (or any other invocation that would cause the current thread to wait for a task completion) on either of these tasks will throw an exception indicating that they are canceled.

A less contrived approach might be to call Task.WaitAny(completedTask, canceledTask, faultedTask), which will throw an AggregateException that then needs to be handled.

Unhandled Exception Handling on Task with AggregateException

When calling a method synchronously, we can wrap it in a try block with a catch clause to identify to the compiler which code we want to execute when an exception occurs. This does not work with an asynchronous call, however. We cannot simply wrap a try block around a call to Start() to catch an exception, because control immediately returns from the call, and control will then leave the try block, possibly long before the exception occurs on the worker thread. One solution is to wrap the body of the task delegate with a try/catch block. Exceptions thrown on and subsequently caught by the worker thread will consequently not present problems, as a try block will work normally on the worker thread. This is not the case, however, for unhandled exceptions—those that the worker thread does not catch.

Generally (starting with version 2.0⁶ of the CLR), unhandled exceptions on any thread are treated as fatal, trigger the Windows Error Reporting dialog, and cause the application to terminate abnormally. All exceptions on all threads must be caught, and if they are not, the application is not allowed to continue to run. (For some advanced techniques for dealing with unhandled exceptions, see the upcoming Advanced Topic titled “Dealing with Unhandled Exceptions on a Thread.”) This is not the case, however, for unhandled exceptions in an asynchronously running task. Rather, the task scheduler inserts a “catchall” exception handler around the delegate so that if the task throws an otherwise unhandled exception, the catchall handler will catch it and record the details of the exception in the task, thereby preventing the CLR from automatically terminating the process.

6. In version 1.0 of the CLR, an unhandled exception on a worker thread terminated the thread but not the application. As a result, it was possible for a buggy program to have all its worker threads die, but the main thread would continue to run, even though the program was no longer doing any work. This is a confusing situation for users to be in; it is better to signal to the user that the application is in a bad state and terminate it before it can do any more harm.

As we saw in Listing 18.6, one technique for dealing with a faulted task is to explicitly create a continuation task that is the “fault handler” for that task; the task scheduler will automatically schedule the continuation when it detects that the antecedent task threw an unhandled exception. If no such handler is present, however, and Wait() (or an attempt to get the Result) executes on a faulted task, an AggregateException will be thrown (see Listing 18.7 and Output 18.4).

LISTING 18.7: Handling a Task’s Unhandled Exception


using System;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      // Use Task.Factory.StartNew() for
      // TPL prior to .NET 4.5
      Task task = Task.Run(() =>
      {
          throw new InvalidOperationException();
      });

      try
      {
          task.Wait();
      }
      catch(AggregateException exception)
      {
          exception.Handle(eachException =>
            {
                Console.WriteLine(
                    $"ERROR: { eachException.Message }");
                return true;
            });
      }
  }
}


OUTPUT 18.4

ERROR: Operation is not valid due to the current state of the object.

The aggregate exception is so named because it may contain many exceptions collected from one or more faulted tasks. Imagine, for example, asynchronously executing ten tasks in parallel and five of them throwing exceptions. To report all five exceptions and have them handled in a single catch block, the framework uses the AggregateException as a means of collecting the exceptions and reporting them as a single exception. Furthermore, since it is unknown at compile time whether a worker task will throw one or more exceptions, an unhandled faulted task will always throw an AggregateException. Listing 18.7 and Output 18.4 demonstrate this behavior. Even though the unhandled exception thrown on the worker thread was of type InvalidOperationException, the type of the exception caught on the main thread is still an AggregateException. As a result, catching the exception requires an AggregateException catch block.

A list of the exceptions contained within an AggregateException is available from the InnerExceptions property. As a result, you can iterate over this property to examine each exception and determine the appropriate course of action. Alternatively, and as shown in Listing 18.7, you can use the AggregateException.Handle() method, specifying an expression to execute against each individual exception contained within the AggregateException. One important characteristic of the Handle() method to consider, however, is that it is a predicate. As such, the predicate should return true for any exceptions that the Handle() delegate successfully addresses. If any exception handling invocation returns false for an exception, the Handle() method will throw a new AggregateException that contains the composite list of such corresponding exceptions.
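As a sketch of the first approach, the catch block in Listing 18.7 could instead iterate over InnerExceptions directly:

      catch(AggregateException exception)
      {
          foreach(Exception item in exception.InnerExceptions)
          {
              if(item is InvalidOperationException)
              {
                  Console.WriteLine($"ERROR: { item.Message }");
              }
              else
              {
                  throw;  // Rethrow exceptions we do not know how to handle
              }
          }
      }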

You can also observe the state of a faulted task without causing the exception to be rethrown on the current thread by simply looking at the Exception property of the task. Listing 18.8 demonstrates this approach by waiting for the completion of a fault continuation of a task⁷ that we know will throw an exception.

7. As we discussed earlier, waiting for a fault continuation to complete is a strange thing to do because most of the time it will never be scheduled to run in the first place. This code is provided for illustrative purposes only.

LISTING 18.8: Observing Unhandled Exceptions on a Task Using ContinueWith()


using System;
using System.Diagnostics;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      bool parentTaskFaulted = false;

      Task task = new Task(() =>
          {
              throw new InvalidOperationException();
          });
      Task continuationTask = task.ContinueWith(
          (antecedentTask) =>
          {
              parentTaskFaulted =
                  antecedentTask.IsFaulted;
          }, TaskContinuationOptions.OnlyOnFaulted);
      task.Start();
      continuationTask.Wait();
      Trace.Assert(parentTaskFaulted);
      Trace.Assert(task.IsFaulted);
      task.Exception.Handle(eachException =>
      {
          Console.WriteLine(
              $"ERROR: { eachException.Message }");
          return true;
      });
  }
}


Notice that to retrieve the unhandled exception on the original task, we use the Exception property. The result is output identical to Output 18.4.

If an exception that occurs within a task goes entirely unobserved—that is, (1) it isn’t caught from within the task; (2) the completion of the task is never observed, via Wait(), Result, or accessing the Exception property, for example; and (3) the faulted ContinueWith() is never observed—then the exception is likely to go unhandled entirely, resulting in a process-wide unhandled exception. In .NET 4.0, such a faulted task’s exception would be rethrown on the finalizer thread and would likely crash the process. In contrast, in .NET 4.5, the crashing has been suppressed (although the CLR can be configured for the crashing behavior if preferred).

In either case, you can register for an unhandled task exception via the TaskScheduler.UnobservedTaskException event.
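Registration is an ordinary event subscription; in this sketch, the handler simply logs the exception and marks it as observed:

using System;
using System.Threading.Tasks;

public class UnobservedExceptionExample
{
  public static void Main()
  {
      TaskScheduler.UnobservedTaskException +=
          (sender, eventArgs) =>
          {
              // eventArgs.Exception is an AggregateException;
              // SetObserved() prevents any further escalation.
              Console.WriteLine(
                  $"Unobserved: { eventArgs.Exception.Message }");
              eventArgs.SetObserved();
          };
      // ... application code that runs tasks ...
  }
}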

Canceling a Task

Earlier in this chapter, we described why it’s a bad idea to rudely abort a thread so as to cancel a task being performed by that thread. The TPL uses cooperative cancellation, a far more polite, robust, and reliable technique for safely canceling a task that is no longer needed. A task that supports cancellation monitors a CancellationToken object (found in the System.Threading namespace) by periodically polling it to see if a cancellation request has been issued. Listing 18.10 demonstrates both the cancellation request and the response to the request. Output 18.6 shows the results.

LISTING 18.10: Canceling a Task Using CancellationToken


using System;
using System.Threading;
using System.Threading.Tasks;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

public class Program
{
  public static void Main()
  {
      string stars =
          "*".PadRight(Console.WindowWidth-1, '*');
      Console.WriteLine("Push ENTER to exit.");

      CancellationTokenSource cancellationTokenSource=                     
          new CancellationTokenSource();                                   
      // Use Task.Factory.StartNew() for
      // TPL prior to .NET 4.5
       Task task = Task.Run(
           () => WritePi(cancellationTokenSource.Token),
           cancellationTokenSource.Token);

      // Wait for the user's input
      Console.ReadLine();

      cancellationTokenSource.Cancel();
      Console.WriteLine(stars);
      task.Wait();
      Console.WriteLine();
  }

  private static void WritePi(
      CancellationToken cancellationToken)
  {
      const int batchSize = 1;
      string piSection = string.Empty;
      int i = 0;

       while(!cancellationToken.IsCancellationRequested
           && i < int.MaxValue)
      {
          piSection = PiCalculator.Calculate(
              batchSize, (i++) * batchSize);
          Console.Write(piSection);
      }
  }
}


OUTPUT 18.6

Push ENTER to exit.
3.141592653589793238462643383279502884197169399375105820974944592307816
40628620899862803482534211706798214808651328230664709384460955058223172
5359408128481117450
***********************************************************************
2

After starting the task, a Console.ReadLine() blocks the main thread. At the same time, the task continues to execute, calculating the next digit of pi and printing it out. Once the user presses Enter, the execution encounters a call to CancellationTokenSource.Cancel(). In Listing 18.10, we split the call to cancellationTokenSource.Cancel() from the call to task.Wait() and print a line of asterisks in between. The purpose of this step is to show that quite possibly an additional iteration will occur before the cancellation token is observed—hence the additional 2 in Output 18.6 following the stars. The 2 appears because CancellationTokenSource.Cancel() doesn’t rudely stop the task from executing. The task keeps on running until it checks the token, and politely shuts down when it sees that the owner of the token is requesting its cancellation.

The Cancel() call effectively sets the IsCancellationRequested property on all cancellation tokens copied from CancellationTokenSource.Token. There are a few things to note, however:

A CancellationToken, not a CancellationTokenSource, is given to the asynchronous task. A CancellationToken enables polling for a cancellation request; the CancellationTokenSource provides the token and signals it when it is canceled (see Figure 18.2). By passing the CancellationToken rather than the CancellationTokenSource, we don’t have to worry about thread synchronization issues on the CancellationTokenSource because the latter remains accessible to only the original thread.


FIGURE 18.2: CancellationTokenSource and CancellationToken Class Diagrams

A CancellationToken is a struct, so it is copied by value; the value returned by CancellationTokenSource.Token is a copy of the token. Because of this, and because CancellationToken’s members are safe to access from multiple threads, the token copy can be freely read from within the WritePi() method without additional synchronization.

To monitor the IsCancellationRequested property, a copy of the CancellationToken (retrieved from CancellationTokenSource.Token) is passed to the task. In Listing 18.10, we then occasionally check the IsCancellationRequested property on the CancellationToken parameter—in this case, after each digit calculation. If IsCancellationRequested returns true, the while loop exits. Unlike a thread abort, which would throw an exception at essentially a random point, exiting the loop uses normal control flow. We guarantee that the code is responsive to cancellation requests by polling frequently.

One other point to note about CancellationToken is the overloaded Register() method. Via this method, you can register an action that will be invoked whenever the token is canceled. In other words, calling Register() effectively subscribes a listener delegate to the corresponding CancellationTokenSource’s Cancel() invocation.
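In this sketch (our own construction), the first delegate runs when Cancel() is invoked, and a delegate registered after cancellation runs immediately:


using System;
using System.Threading;

public class Program
{
  public static void Main()
  {
      CancellationTokenSource cancellationTokenSource =
          new CancellationTokenSource();

      // Invoked when Cancel() is called
      cancellationTokenSource.Token.Register(
          () => Console.WriteLine("Cancellation requested."));

      cancellationTokenSource.Cancel(); // Triggers the callback

      // Registering on an already-canceled token invokes the
      // delegate immediately
      cancellationTokenSource.Token.Register(
          () => Console.WriteLine("Registered after the fact."));
  }
}
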

Given that canceling before completing is the expected behavior in this program, the code in Listing 18.10 does not throw a System.Threading.Tasks.TaskCanceledException. As a consequence, task.Status will return TaskStatus.RanToCompletion—providing no indication that the work of the task was, in fact, canceled. In this example, there is no need for such an indication; however, the TPL does include the capability to provide it. If the cancel call were disruptive in some way—preventing a valid result from returning, for example—throwing a TaskCanceledException (which derives from System.OperationCanceledException) would be the TPL pattern for reporting it. Instead of throwing the exception explicitly, CancellationToken includes a ThrowIfCancellationRequested() method to report the exception more easily, assuming an instance of CancellationToken is available.
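The following sketch (our own; cancellation is requested from within the task body purely to keep the timing deterministic) shows the pattern. Because the thrown OperationCanceledException carries the same token that was passed to Task.Run(), the task ends in the Canceled state rather than Faulted:


using System;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      CancellationTokenSource cancellationTokenSource =
          new CancellationTokenSource();
      CancellationToken token = cancellationTokenSource.Token;

      Task task = Task.Run(() =>
      {
          // Request cancellation mid-work (sketch only)
          cancellationTokenSource.Cancel();

          // Throws OperationCanceledException now that
          // cancellation has been requested
          token.ThrowIfCancellationRequested();
      }, token);

      try
      {
          task.Wait();
      }
      catch(AggregateException exception)
      {
          exception.Handle(
              innerException =>
                  innerException is TaskCanceledException);

          // The token matched, so the task reports cancellation
          Console.WriteLine(task.Status); // Canceled
      }
  }
}
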

If you attempt to call Wait() (or obtain the Result) on a task that threw TaskCanceledException, the behavior is the same as if any other exception had been thrown in the task: The call will throw an AggregateException. The exception is a means of communicating that the state of execution following the task is potentially incomplete. Unlike a successfully completed task, in which all expected work executed, a canceled task potentially has only partially completed work—so the state of that work cannot be trusted.

This example demonstrates how a long-running processor-bound operation (calculating pi almost indefinitely) can monitor for a cancellation request and respond if one occurs. There are some cases, however, when cancellation can occur without explicitly coding for it within the target task. For example, the Parallel class discussed later in the chapter offers such a behavior by default.

Begin 5.0

Task.Run(): A Shortcut and Simplification to Task.Factory.StartNew()

In .NET 4.0, the general practice for obtaining a task was to call Task.Factory.StartNew(). In .NET 4.5, a simpler calling structure was provided in Task.Run(). Like Task.Run(), Task.Factory.StartNew() could be used in C# 4.0 scenarios to invoke CPU-intensive methods that require an additional thread to be created.

Given .NET 4.5, Task.Run() should be used by default unless it proves insufficient. For example, if you need to control the task with TaskCreationOptions, if you need to specify an alternative scheduler, or if, for performance reasons, you want to pass in object state, you should consider using Task.Factory.StartNew(). Only in rare cases, where you need to separate creation from scheduling, should constructor instantiation followed by a call to Start() be considered.
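For reference, a long-hand sketch of what Task.Run(action) amounts to—based on the defaults documented for .NET 4.5—is as follows:


using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskRunEquivalent
{
  public static Task Run(Action action)
  {
      // The documented .NET 4.5 defaults behind Task.Run(action)
      return Task.Factory.StartNew(
          action,
          CancellationToken.None,
          TaskCreationOptions.DenyChildAttach,
          TaskScheduler.Default);
  }
}
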

Listing 18.11 provides an example of using Task.Factory.StartNew().

LISTING 18.11: Using Task.Factory.StartNew()


public Task<string> CalculatePiAsync(int digits)
{
  return Task.Factory.StartNew<string>(
      () => CalculatePi(digits));
}

private string CalculatePi(int digits)
{
    // ...
}


End 5.0

Long-Running Tasks

As we discussed earlier in the commentary on Listing 18.2, the thread pool assumes that work items will be processor-bound and relatively short-lived; it makes these assumptions to effectively throttle the number of threads created. This prevents both overallocation of expensive thread resources and oversubscription of processors that would lead to excessive context switching and time slicing.

But what if the developer knows that a task will be long-running and, therefore, will hold on to an underlying thread resource for a long time? In this case, the developer can notify the scheduler that the task is unlikely to complete its work anytime soon. This has two effects. First, it hints to the scheduler that perhaps a dedicated thread ought to be created specifically for this task, rather than attempting to use a thread from the thread pool. Second, it hints to the scheduler that perhaps this would be a good time to allow more tasks to be scheduled than there are processors to handle them. This will cause more time slicing to happen, which is a good thing. We do not want one long-running task to hog an entire processor and prevent shorter-running tasks from using it. The short-running tasks will be able to use their time slice to finish a large percentage of their work, and the long-running task is unlikely to notice the relatively slight delays caused by sharing a processor with other tasks. To accomplish this, use the TaskCreationOptions.LongRunning option when calling StartNew(), as shown in Listing 18.12. (Task.Run() does not support a TaskCreationOptions parameter.)

LISTING 18.12: Cooperatively Executing Long-Running Tasks


using System.Threading.Tasks;

// ...

       Task task = Task.Factory.StartNew(
           () => WritePi(cancellationTokenSource.Token),
           TaskCreationOptions.LongRunning);
// ...



Guidelines

DO inform the task factory that a newly created task is likely to be long-running so that it can manage it appropriately.

DO use TaskCreationOptions.LongRunning sparingly.


Tasks Are Disposable

Note that Task also supports IDisposable. This is necessary because Task may allocate a WaitHandle when waiting for it to complete; since WaitHandle supports IDisposable, Task also supports IDisposable in accordance with best practices. However, readers will note that the preceding code samples do not include a Dispose() call, nor do they rely on such a call implicitly via the using statement. The listings instead rely on an automatic WaitHandle finalizer invocation when the program exits.

This approach leads to two notable results. First, the handles live longer and hence consume more resources than they ought to. Second, the garbage collector is slightly less efficient because finalized objects survive into the next generation. However, both of these concerns are inconsequential in the Task case unless an extraordinarily large number of tasks are being finalized. Therefore, even though technically speaking all code should be disposing of tasks, you needn’t bother to do so unless performance metrics require it and it’s easy—that is, if you’re certain that Tasks have completed and no other code is using them.
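If performance metrics ever do justify it, the disposal itself is trivial, as this sketch shows; it is safe only because the task has completed and nothing else references it:


using System;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      Task task = Task.Run(() => Console.WriteLine("Working..."));
      task.Wait();

      // Safe only because the task has completed and no other
      // code retains a reference to it
      task.Dispose();
  }
}
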

The Task-Based Asynchronous Pattern

As we’ve seen so far, tasks provide a better abstraction for the manipulation of asynchronous work than threads do. Tasks are automatically scheduled to the right number of threads and large tasks can be composed by chaining together small tasks, just as large programs can be composed from multiple small methods.

However, there are some drawbacks to tasks. The principal difficulty with tasks is that they turn your program logic “inside out.” To illustrate this, we first consider a synchronous method that is blocked on an I/O-bound, high-latency operation—a web request. Next, we compare it to an asynchronous version prior to C# 5.0 and the Task-based Asynchronous Pattern (TAP). Lastly, we revise the same example by using C# 5.0 (and higher) and the async/await contextual keywords.

Synchronously Invoking a High-Latency Operation

In Listing 18.13, the code uses a WebRequest to download a web page and display its size. If the operation fails, an exception is thrown.

LISTING 18.13: A Synchronous Web Request


using System;
using System.IO;
using System.Net;
using System.Linq;

public class Program
{
  public static void Main(string[] args)
  {
      string url = "http://www.IntelliTect.com";
      if(args.Length > 0)
      {
          url = args[0];
      }

      try
      {
          Console.Write(url);
          WebRequest webRequest =
              WebRequest.Create(url);

          WebResponse response =
              webRequest.GetResponse();

          Console.Write(".....");

          using(StreamReader reader =
              new StreamReader(
                  response.GetResponseStream()))
          {
              string text =
                  reader.ReadToEnd();
              Console.WriteLine(
                  FormatBytes(text.Length));
          }
      }
      catch(WebException)
      {
          // ...
      }
      catch(IOException)
      {
          // ...
      }
      catch(NotSupportedException)
      {
          // ...
      }
  }

  static public string FormatBytes(long bytes)
  {
      string[] magnitudes =
          new string[] { "GB", "MB", "KB", "Bytes" };
      long max =
          (long)Math.Pow(1024, magnitudes.Length);

      return string.Format("{1:##.##} {0}",
          magnitudes.FirstOrDefault(
              magnitude =>
                  bytes > (max /= 1024)) ?? "0 Bytes",
              (decimal)bytes / (decimal)max);
  }
}


The logic in Listing 18.13 is relatively straightforward—using common C# idioms like try/catch blocks and return statements to describe the control flow. Given a WebRequest, this code calls GetResponse() to download the page. To gain stream access to the page, it calls GetResponseStream() and assigns the result to a StreamReader. Finally, it reads to the end of the stream with ReadToEnd() to determine the size of the page, and then prints that size to the screen.

The problem with this approach is, of course, that the calling thread is blocked until the I/O operation completes; this wastes a thread that could be doing useful work while the high-latency operation executes. As a result, we cannot execute any other code in the meantime—such as code that indicates progress.

Asynchronously Invoking a High-Latency Operation Using the TPL

To address this problem, Listing 18.14 takes a similar approach but instead uses task-based asynchrony with the TPL.

LISTING 18.14: An Asynchronous Web Request


using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;

public class Program
{
  public static void Main(string[] args)
  {
      string url = "http://www.IntelliTect.com";
      if(args.Length > 0)
      {
          url = args[0];
      }

      Console.Write(url);

      Task task = WriteWebRequestSizeAsync(url);

      try
      {
          while(!task.Wait(100))
          {
              Console.Write(".");
          }
      }
      catch(AggregateException exception)
      {
          exception = exception.Flatten();
          try
          {
              exception.Handle(innerException =>
              {
                  // Rethrowing rather than using
                  // an if condition on the type
                  ExceptionDispatchInfo.Capture(
                      innerException)
                      .Throw();
                  return true;
              });
          }
          catch(WebException)
          {
              // ...
          }
          catch(IOException)
          {
              // ...
          }
          catch(NotSupportedException)
          {
              // ...
          }
      }
  }


  private static Task WriteWebRequestSizeAsync(
      string url)
  {
      StreamReader reader = null;
      WebRequest webRequest =
           WebRequest.Create(url);

      Task task =
          webRequest.GetResponseAsync()
      .ContinueWith( antecedent =>
      {
          WebResponse response =
             antecedent.Result;

          reader =
              new StreamReader(
                  response.GetResponseStream());
          return reader.ReadToEndAsync();
      })
      .Unwrap()
      .ContinueWith(antecedent =>
      {
          if(reader != null) reader.Dispose();
          string text = antecedent.Result;
          Console.WriteLine(
              FormatBytes(text.Length));
      });

      return task;
  }

  // ...
}


Unlike Listing 18.13, when Listing 18.14 executes, it prints periods to the console while the page is downloading. The result is that instead of simply printing five periods (“.....”) to the console, Listing 18.14 is able to continuously print periods for as long as it takes to download the file, read it from the stream, and determine its size.

Unfortunately, this asynchrony comes at the cost of complexity. Interspersed throughout the code is TPL-related code that interrupts the flow. Rather than simply following the WebRequest.GetResponseAsync() call with steps to retrieve the StreamReader and call ReadToEndAsync(), the asynchronous version of the code requires ContinueWith() statements. The first ContinueWith() statement identifies what to execute after WebRequest.GetResponseAsync() completes. Notice that the return statement in the first ContinueWith() expression returns the result of StreamReader.ReadToEndAsync(), which is itself another Task—specifically, a Task<string>.

Without the Unwrap() call, therefore, the antecedent in the second ContinueWith() statement would be a Task<Task<string>>—a type whose very shape indicates the complexity. It would then be necessary to call Result twice: once on the antecedent directly and a second time on the Task<string> that antecedent.Result returned, with the latter call blocking until the ReadToEndAsync() operation completes. To avoid the Task<Task<TResult>> structure, we preface the call to ContinueWith() with a call to Unwrap(), thereby shedding the outer Task and appropriately propagating any errors or cancellation requests.

The complexity doesn’t stop with Tasks and ContinueWith(), however: The exception handling adds an entirely new dimension. As mentioned earlier, the TPL generally throws an AggregateException because of the possibility that an asynchronous operation could encounter multiple exceptions. However, because we are calling the Result property from within ContinueWith() blocks, it is possible that inside the worker thread we might also throw an AggregateException.

As you learned earlier in the chapter, there are multiple ways to handle these exceptions:

1. We can add continuation tasks to all *Async methods that return a task along with each ContinueWith() method call. However, doing so would prevent us from using the fluid API in which the ContinueWith() statements are chained together one after the other. Furthermore, this would force us to deeply embed error-handling logic into the control flow rather than simply relying on exception handling.

2. We can surround each delegate body with a try/catch block so that no exceptions go unhandled from the task. Unfortunately, this approach is less than ideal as well. First, some exceptions (like those triggered when calling antecedent.Result) will throw an AggregateException from which we will need to unwrap the InnerException(s) to handle them individually. Upon unwrapping them, we either rethrow them so as to catch a specific type or conditionally check for the type of the exception separately from any other catch blocks (even catch blocks for the same type). Second, each delegate body will require its own separate try/catch handler, even if some of the exception types between blocks are the same. Third, Main’s call to task.Wait() could still throw an exception because WebRequest.GetResponseAsync() could potentially throw an exception, and there is no way to surround it with a try/catch block. Therefore, there is no way to eliminate the try/catch block in Main that surrounds task.Wait().

3. We can ignore all exception handling from within WriteWebRequestSizeAsync() and instead rely solely on the try/catch block that surrounds Main’s task.Wait(). Given that we know the exception will be an AggregateException, we can have a catch for only that exception. Within the catch block, we can handle the exception by calling AggregateException.Handle() and throwing each exception using the ExceptionDispatchInfo object so as not to lose the original stack trace. These exceptions are then caught by the expected exception handlers and addressed accordingly. Notice, however, that before handling the AggregateException’s InnerExceptions, we first call AggregateException.Flatten(). This step addresses the issue of an AggregateException wrapping inner exceptions that are also of type AggregateException (and so on). By calling Flatten(), we ensure that all exceptions are moved to the first level and all contained AggregateExceptions are removed.

As shown in Listing 18.14, option 3 is probably the preferred approach because it keeps the exception handling outside the control flow for the most part. This doesn’t eliminate the error-handling complexity entirely; rather, it simply minimizes the occasions on which it is interspersed within the regular control flow.

Although the asynchronous version in Listing 18.14 has almost the same logical control flow as the synchronous version in Listing 18.13, both versions attempt to download a resource from a server and, if the download succeeds, the result is returned. (If the download fails, the exception’s type is interrogated to determine the right course of action.) However, it is clear that the asynchronous version of Listing 18.14 is significantly more difficult to read, understand, and change than the corresponding synchronous version in Listing 18.13. Unlike the synchronous version, which uses standard control flow statements, the asynchronous version is forced to create multiple lambda expressions to express the continuation logic in the form of delegates.

And this is a fairly simple example! Imagine what the asynchronous code would look like if, for example, the synchronous code contained a loop that retried the operation three times if it failed, if it tried to contact multiple different servers, if it took a collection of resources rather than a single one, or if all of these possible features occurred together. Adding those features to the synchronous version would be straightforward, but it is not at all clear how to do so in the asynchronous version. Rewriting synchronous methods into asynchronous methods by explicitly specifying the continuation of each task gets very complicated very quickly even if the synchronous continuations are what appear to be very simple control flows.

The Task-Based Asynchronous Pattern with async and await

Fortunately, it turns out that it is actually not too difficult to write a computer program that does these complex code transformations for you. The designers of the C# language realized this need would crop up, and they have added such a capability to the C# 5.0 compiler. Starting with C# 5.0, you can rewrite the synchronous program given earlier into an asynchronous program much more easily using the Task-based Asynchronous Pattern (TAP); the C# compiler then does the tedious work of transforming your method into a series of task continuations. Listing 18.15 shows how to rewrite Listing 18.13 into an asynchronous method without the major structural changes of Listing 18.14.

LISTING 18.15: An Asynchronous Web Request Using the Task-Based Asynchronous Pattern


using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;

public class Program
{
  private static async Task WriteWebRequestSizeAsync(
      string url)
  {
      try
      {
          WebRequest webRequest =
              WebRequest.Create(url);
          WebResponse response =
              await webRequest.GetResponseAsync();
          using(StreamReader reader =
              new StreamReader(
                  response.GetResponseStream()))
          {
              string text =
                  await reader.ReadToEndAsync();
              Console.WriteLine(
                  FormatBytes(text.Length));
          }
      }
      catch(WebException)
      {
          // ...
      }
      catch(IOException)
      {
          // ...
      }
      catch(NotSupportedException)
      {
          // ...
      }
  }

  public static void Main(string[] args)
  {
      string url = "http://www.IntelliTect.com";
      if(args.Length > 0)
      {
          url = args[0];
      }

      Console.Write(url);

      Task task = WriteWebRequestSizeAsync(url);                         

      while(!task.Wait(100))
      {
          Console.Write(".");
      }
  }

  // ...

}


Notice the small differences between Listing 18.13 and Listing 18.15. First, we refactor the body of the web request functionality into a new method (WriteWebRequestSizeAsync()) and add the new contextual keyword async to the method’s declaration. A method decorated with this keyword must return Task, Task<T>, or void. In this case, since there is no data returned by the body of the method but we still want the capability of returning information about the asynchronous activity to the caller, WriteWebRequestSizeAsync() returns Task. Notice the method name suffix is Async; this is not necessary, but it is conventional to mark asynchronous methods this way so as to identify their asynchronous behavior. Finally, everywhere there is an asynchronous equivalent for the synchronous method, we insert the new contextual keyword await before invoking the asynchronous version.

Notice that nothing else changes between Listings 18.13 and 18.15. The asynchronous method versions seemingly still return the same data types as before—despite the fact that each actually returns a Task<T>. This is not via some magical implicit cast, either. GetResponseAsync() is declared as follows:

public virtual Task<WebResponse> GetResponseAsync() { ... }

At the call site, we assign the return value to WebResponse:

WebResponse response = await webRequest.GetResponseAsync();

The await contextual keyword plays the critical role here: It signals to the compiler that it should rewrite the method into a state machine that represents all the control flow we saw in Listing 18.14 (and more).

Also notice the try/catch logic improvements over Listing 18.14 that appear in Listing 18.15. In Listing 18.15, there is no need to catch an AggregateException. The catch clauses continue to catch the exact types of exception expected, with no unwrapping of the inner exceptions required. Rather, the compiler’s rewrite ensures that an exception stored in the task is processed just as if it were a normal, synchronously thrown exception: When you await the task, the rewrite pulls the first exception from the AggregateException’s collection and throws it. The aim is to make the asynchronous code look as much as possible like the synchronous code.
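The difference is easy to demonstrate. In the following sketch (our own), Wait() surfaces an AggregateException, whereas await surfaces the original InvalidOperationException directly:


using System;
using System.Threading.Tasks;

public class Program
{
  // An Action variable avoids any overload ambiguity with
  // Task.Run()
  private static readonly Action ThrowAction =
      () => { throw new InvalidOperationException(); };

  public static void Main()
  {
      try
      {
          Task.Run(ThrowAction).Wait();
      }
      catch(AggregateException exception)
      {
          Console.WriteLine(
              $"Wait(): { exception.GetType().Name }");
      }

      AwaitFaultedTaskAsync().Wait();
  }

  private static async Task AwaitFaultedTaskAsync()
  {
      try
      {
          // await rethrows the first inner exception rather
          // than the AggregateException itself
          await Task.Run(ThrowAction);
      }
      catch(InvalidOperationException exception)
      {
          Console.WriteLine(
              $"await: { exception.GetType().Name }");
      }
  }
}
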

To better understand the control flow, Table 18.2 shows each task in a separate column along with the execution that occurs on each task.


TABLE 18.2: Control Flow within Each Task

There are a couple of important misconceptions that the table helps to dismiss:

Misconception #1: A method decorated with the async keyword is automatically executed on a worker thread when called. This is absolutely not true; the method is executed normally, on the calling thread, and if the implementation doesn’t await any incomplete awaitable tasks, it will complete synchronously on the same thread. It’s the method’s implementation that is responsible for starting any asynchronous work. Just using the async keyword does not change where the method’s code executes. Also, there is nothing unusual about a call to an async method from the caller’s perspective; it is a method typed as returning a Task, it is called normally, and it returns an object of its return type normally.

Misconception #2: The await keyword causes the current thread to block until the awaited task is completed. That is also absolutely not true. If you want the current thread to block until the task completes, call the Wait() method, as we have already described. In fact, the Main thread does so repeatedly while waiting for the other tasks to complete. However, the while(!task.Wait(100)) { } call executes concurrently with the other tasks—not synchronously. The await keyword evaluates the expression that follows it, which is usually of type Task or Task<T>, adds a continuation to the resultant task, and then immediately returns control to the caller. The creation of the task has started asynchronous work; the await keyword means that the developer wishes the caller of this method to continue executing its work on this thread while the asynchronous work is processed. At some point after that asynchronous work is complete, execution will resume at the point of control following the await expression.

In fact, the principal reasons why the async keyword exists in the first place are twofold. First, it makes it crystal clear to the reader of the code that the method that follows will be automatically rewritten by the compiler. Second, it informs the compiler that usages of the await contextual keyword in the method are to be treated as asynchronous control flow, and not as an ordinary identifier.

Asynchronous Lambdas

Just as a lambda expression converted to a delegate can be used as a concise syntax for declaring a normal method, so C# 5.0 (and later) also allows lambdas containing await expressions to be converted to delegates. To do so, just precede the lambda expression with the async keyword. In Listing 18.16, we rewrite the WriteWebRequestSizeAsync() method from Listing 18.15 from an async method to an async lambda.

LISTING 18.16: An Asynchronous Client-Server Interaction As a Lambda Expression


using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;

public class Program
{

  public static void Main(string[] args)
  {
      string url = "http://www.IntelliTect.com";
      if(args.Length > 0)
      {
          url = args[0];
      }

      Console.Write(url);

      Func<string, Task> writeWebRequestSizeAsync =
          async (string webRequestUrl) =>
          {
              // Error handling omitted for
              // elucidation
              WebRequest webRequest =
                 WebRequest.Create(webRequestUrl);

              WebResponse response =
                  await webRequest.GetResponseAsync();
              using(StreamReader reader =
                  new StreamReader(
                      response.GetResponseStream()))
              {
                  string text =
                      (await reader.ReadToEndAsync());
                  Console.WriteLine(
                      FormatBytes(text.Length));
              }
          };

      Task task = writeWebRequestSizeAsync(url);                        

      while (!task.Wait(100))
      {
          Console.Write(".");
      }
  }

  // ...

}


Note that an async lambda expression has the exact same restrictions as the named async method:

An async lambda expression must be converted to a delegate whose return type is void, Task, or Task<T>.

The lambda is rewritten so that return statements become signals that the task returned by the lambda has completed with the given result.

Execution within the lambda expression occurs synchronously until the first await on an incomplete awaitable is executed.

All instructions following the await will execute as continuations on the return from the invoked asynchronous method (or, if the awaitable is already complete, will be simply executed synchronously rather than as continuations).

The task returned from invoking an async lambda expression can itself be awaited (not shown in Listing 18.16).

Wrapping your head around precisely what is happening in an async method can be difficult, but it is far less difficult than trying to figure out what asynchronous code written with explicit continuations in lambdas is doing. The key points to remember are as follows:

When control reaches an await keyword, the expression that follows it produces a task.9 Control then returns to the caller so that it can continue to do work while the task completes asynchronously.

9. Technically, it is an awaitable type as described in the Advanced Topic titled “Awaiting Non-Task<T> Values.”

Some time after the task completes, control resumes at the point following the await. If the awaited task produces a result, that result is then obtained. If it faulted, the exception is thrown.

A return statement in an async method causes the task associated with the method invocation to become completed; if the return statement has a value, the value returned becomes the result of the task.
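These points can be seen together in a minimal sketch (ours, not from the listings): the method returns control at the await, and its return statement later becomes the result of the task:


using System;
using System.Threading.Tasks;

public class Program
{
  private static async Task<int> GetAnswerAsync()
  {
      // Control returns to the caller at this await; the rest
      // of the method runs as a continuation
      await Task.Delay(100);

      // The return value becomes the result of the task
      // associated with this method invocation
      return 42;
  }

  public static void Main()
  {
      Task<int> task = GetAnswerAsync();
      Console.WriteLine("Answer pending...");
      Console.WriteLine($"Answer: { task.Result }"); // 42
  }
}
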

Task Schedulers and the Synchronization Context

On occasion, this chapter has mentioned the task scheduler and its role in determining how to assign work to threads efficiently. Programmatically, the task scheduler is an instance of System.Threading.Tasks.TaskScheduler. This class, by default, uses the thread pool to schedule tasks appropriately, determining when to reuse threads, when to dispose of them, and when to create additional ones so that tasks execute safely and efficiently.

It is possible to create your own task scheduler that makes different choices about how to schedule tasks by deriving a new type from the TaskScheduler class. You can obtain a TaskScheduler that will schedule a task to the current thread (or, more precisely, to the synchronization context associated with the current thread), rather than to a different worker thread, by using the static FromCurrentSynchronizationContext() method.10

10. For an example, see Listing C.8 in Multithreading Patterns Prior to C# 5.0, available at IntelliTect.com/EssentialCSharp.

The synchronization context under which a task executes and, in turn, the continuation task(s) execute(s), is important because the awaiting task consults the synchronization context (assuming there is one) so that a task can execute efficiently and safely. Listing 18.20 (along with Output 18.7) is similar to Listing 18.5 except that it also prints out the thread ID when it displays the message.

LISTING 18.20: Calling Task.ContinueWith()


using System;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      DisplayStatus("Before");
      Task taskA =
          Task.Run(() =>
               DisplayStatus("Starting..."))
          .ContinueWith( antecedent =>
               DisplayStatus("Continuing A..."));
      Task taskB = taskA.ContinueWith( antecedent =>
          DisplayStatus("Continuing B..."));
      Task taskC = taskA.ContinueWith( antecedent =>
          DisplayStatus("Continuing C..."));

      Task.WaitAll(taskB, taskC);
      DisplayStatus("Finished!");
  }

  private static void DisplayStatus(string message)
  {
      // string.Format() is unnecessary here (and unsafe if
      // message contains braces); interpolation suffices
      string text =
          $@"{ Thread.CurrentThread.ManagedThreadId
              }: { message }";
      Console.WriteLine(text);
  }
}


OUTPUT 18.7

1: Before
3: Starting...
4: Continuing A...
3: Continuing C...
4: Continuing B...
1: Finished!

What is noteworthy about this output is that the thread ID changes sometimes and gets repeated at other times. In this kind of plain console application, the synchronization context (accessible from SynchronizationContext.Current) is null, so continuations are simply scheduled to the thread pool. This explains why the thread ID changes between tasks: Sometimes the thread pool determines that it is more efficient to use a new thread, and sometimes it decides that the best course of action is to reuse an existing thread.

Fortunately, the synchronization context gets set automatically for types of applications where that is critical. For example, if the code creating tasks is running in a thread created by ASP.NET, the thread will have a synchronization context of type AspNetSynchronizationContext associated with it. In contrast, if your code is running in a thread created in a Windows UI application (WPF or Windows Forms), the thread will have an instance of DispatcherSynchronizationContext associated with it. (For console applications, there is no synchronization context by default.) Since the TPL consults the synchronization context and the synchronization context varies depending on the circumstances of the execution, the TPL is able to schedule continuations executing in contexts that are both efficient and safe.
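You can verify which synchronization context (if any) is present with a simple check. In a plain console application, the following sketch prints “<null>”, whereas on a WPF UI thread it would report DispatcherSynchronizationContext:


using System;
using System.Threading;

public class Program
{
  public static void Main()
  {
      SynchronizationContext context =
          SynchronizationContext.Current;

      Console.WriteLine(
          context == null
              ? "<null>"
              : context.GetType().ToString());
  }
}
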

To modify the code so that the synchronization context is leveraged instead, you must (1) set the synchronization context and (2) use async/await so that the synchronization context is consulted.11

11. For a simple example of how to set the synchronization context of a thread, and how to use a task scheduler to schedule a task to that thread, see Listing C.8 in Multithreading Patterns Prior to C# 5.0, available at IntelliTect.com/EssentialCSharp.

It is possible to define custom synchronization contexts, and to work with existing synchronization contexts to improve their performance in some specific scenarios. However, describing how to do so is beyond the scope of this text.

async/await with the Windows UI

One place where synchronization is especially important is in the context of UI and Web programming. With the Windows UI, for example, a message pump processes messages such as mouse click and move events. Furthermore, the UI is single-threaded, so interaction with any UI component (a text box, for example) must always occur from the single UI thread. One of the key advantages of the async/await pattern is that it leverages the synchronization context to ensure that continuation work—work that appears after the await expression—will always execute on the same synchronization context from which the await was issued. This approach is of significant value because it eliminates the need to explicitly switch back to the UI thread to update a control.

To better appreciate this benefit, consider the example of a UI event for a button click in WPF, as shown in Listing 18.21.

LISTING 18.21: Synchronous High-Latency Invocation in WPF


using System;
using System.Net.NetworkInformation;
using System.Windows;

private void PingButton_Click(
  object sender, RoutedEventArgs e)
{
  StatusLabel.Content = "Pinging...";
  UpdateLayout();
  Ping ping = new Ping();
  PingReply pingReply =
      ping.Send("www.IntelliTect.com");
  StatusLabel.Text = pingReply.Status.ToString();
}


Given that StatusLabel is a WPF System.Windows.Controls.TextBlock control and we have updated its Text property twice within the PingButton_Click() event subscriber, it would be a reasonable assumption that first “Pinging...” would be displayed until Ping.Send() returned, and then the label would be updated with the status of the Send() reply. As those experienced with Windows UI frameworks well know, this is not, in fact, what happens. Rather, a message is posted to the Windows message pump to update the text with “Pinging...” but, because the UI thread is busy executing the PingButton_Click() method, the Windows message pump is not processed. By the time the UI thread frees up to look at the Windows message pump, a second Text property update request has been queued, and the only message that the user is able to observe is the final status.

To fix this problem using TAP, we change the code highlighted in Listing 18.22.

LISTING 18.22: Synchronous High-Latency Invocation in WPF Using await


using System;
using System.Net.NetworkInformation;
using System.Windows;

async private void PingButton_Click(
  object sender, RoutedEventArgs e)
{
  StatusLabel.Text = "Pinging...";
  UpdateLayout();
  Ping ping = new Ping();
  PingReply pingReply =
      await ping.SendPingAsync("www.IntelliTect.com");                  
  StatusLabel.Text = pingReply.Status.ToString();
}


This change offers two advantages. First, the asynchronous nature of the ping call frees the caller thread to return to the Windows message pump, so the update to StatusLabel.Text is processed and “Pinging...” appears to the user. Second, when the await of ping.SendPingAsync() completes, the continuation will always execute on the same synchronization context as the caller. Because the synchronization context is specifically appropriate for a Windows UI, it is single-threaded and, therefore, the return will always be to the same thread—the UI thread. In other words, rather than immediately executing the continuation task, the TPL consults the synchronization context, which instead posts a message regarding the continuation work to the message pump. Next, because the UI thread monitors the message pump, upon picking up the continuation work message, it invokes the code following the await call. (As a result, the invocation of the continuation code is on the same thread as the caller that processed the message pump.)

There is a key code readability feature built into the TAP language pattern. Notice in Listing 18.22 that the assignment of pingReply.Status to StatusLabel.Text appears to flow naturally after the await, providing a clear indication that it will execute immediately following the previous line. However, writing what really happens from scratch would be far less understandable, for multiple reasons.

await Operators

There is no limit to the number of await expressions a single method may contain, nor are they restricted to appearing one after another in a sequence of statements. Rather, await expressions can be placed into loops and processed consecutively, following the natural control flow of the code as it appears. Consider the example in Listing 18.23.

LISTING 18.23: Iterating over an Await Operation


async private void PingButton_Click(
  object sender, RoutedEventArgs e)
{
  List<string> urls = new List<string>()
      {
          "www.habitat-spokane.org",
          "www.partnersintl.org",
          "www.iassist.org",
          "www.fh.org",
          "www.worldvision.org"
      };
  IPStatus status;

  Func<string, Task<IPStatus>> func =
      async (localUrl) =>
      {
          Ping ping = new Ping();
          PingReply pingReply =
              await ping.SendPingAsync(localUrl);
          return pingReply.Status;
      };

  StatusLabel.Content = "Pinging...";

  foreach(string url in urls)                                             
  {                                                                       
      status = await func(url);                                           
      StatusLabel.Text =                                                  
          $@"{ url }: { status.ToString() } ({                            
              Thread.CurrentThread.ManagedThreadId })";                   
  }                                                                       
}


Regardless of whether the await statements occur within an iteration or as separate entries, they will execute serially, one after the other and in the same order they were invoked from the calling thread. The underlying implementation is to string them together in the semantic equivalent of Task.ContinueWith(), except that all of the code between the await operators will execute in the caller’s synchronization context.
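Incidentally, when the operations need not complete one at a time, an alternative is to start them all first and then await the .NET 4.5 combinator Task.WhenAll(). The following sketch (our own, with hypothetical URLs) pings all of the sites concurrently; the results arrive in the same order as the inputs:


using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.NetworkInformation;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      PingAllAsync().Wait();
  }

  private static async Task PingAllAsync()
  {
      List<string> urls = new List<string>()
          {
              "www.intellitect.com",
              "www.microsoft.com"
          };

      // Start every ping before awaiting any of them
      PingReply[] replies = await Task.WhenAll(
          urls.Select(url => new Ping().SendPingAsync(url)));

      foreach(PingReply reply in replies)
      {
          Console.WriteLine(reply.Status);
      }
  }
}
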

Support for TAP from the UI is one of the key scenarios that led to TAP’s creation. A second scenario takes place on the server, when a request comes in from a client to query an entire table’s worth of data from the database. As querying the data could be time-consuming, you might be tempted to create a new thread rather than consume one from the limited number allocated to the thread pool. The problem with this approach is that the work of querying the database executes entirely on another machine; there is no reason to block an entire thread, given that the thread is generally not active anyway.

To summarize, TAP was created to address these key problems:

There is a need to allow long-running activities to occur without blocking the UI thread.

Creating a new thread (or Task) for non–CPU-intensive work is relatively expensive when you consider that all the thread is doing is waiting for the activity to complete.

When the activity completes (either by using a new thread or via a callback), it is frequently necessary to make a thread synchronization context switch back to the original caller that initiated the activity.

TAP provides a new pattern that works for both CPU-intensive and non–CPU-intensive asynchronous invocations—one that all .NET languages support explicitly.

Executing Loop Iterations in Parallel

Consider the for loop statement and associated code shown in Listing 18.24 (with the corresponding results in Output 18.8). The listing calls a method for calculating a section of the decimal expansion of pi, where the parameters are the number of digits and the digit to start with. The actual calculation is not germane to the discussion. What is interesting about this calculation is that it is embarrassingly parallelizable; that is, it is almost embarrassing how easy it is to split up a large task—say, computing 1 million decimal digits of pi—into any desired number of smaller tasks that can all be run in parallel. These types of computations are the easiest ones to speed up by adding parallelism.

LISTING 18.24: For Loop Synchronously Calculating Pi in Sections


using System;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

class Program
{
  const int TotalDigits = 100;
  const int BatchSize = 10;

  static void Main()
  {
      string pi = null;
      const int iterations = TotalDigits / BatchSize;
      for(int i = 0; i < iterations; i++)
      {
          pi += PiCalculator.Calculate(
              BatchSize, i * BatchSize);
      }

      Console.WriteLine(pi);
  }
}


using System;

class PiCalculator
{
  public static string Calculate(
      int digits, int startingAt)
  {
     // ...
  }

   // ...
}


OUTPUT 18.8

3.14159265358979323846264338327950288419716939937510582097494459230781
64062862089986280348253421170679821480865132823066470938446095505822317
25359408128481117450284102701938521105559644622948954930381964428810975
66593344612847564823378678316527120190914564856692346034861045432664821
33936072602491412737245870066063155881748815209209628292540917153643678
92590360011330530548820466521384146951941511609433057270365759591953092
18611738193261179310511854807446237996274956735188575272489122793818301
194912


The for loop executes each iteration synchronously and sequentially. However, because the pi calculation algorithm splits the pi calculation into independent pieces, it is not necessary to compute the pieces sequentially as long as the results are appended in the right order. Imagine what would happen if you could have all the iterations of this loop run concurrently: Each processor could take a single iteration and execute it in parallel with other processors executing other iterations. Given the simultaneous execution of iterations, we could decrease the execution time roughly in proportion to the number of available processors.

The TPL provides a convenient method, Parallel.For(), that does precisely that. Listing 18.25 shows how to modify the sequential, single-threaded program in Listing 18.24 to use the helper method.

LISTING 18.25: For Loop Calculating Pi in Sections in Parallel


using System;
using System.Threading.Tasks;                                          
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

// ...

class Program
{
  // ...

  static void Main()
  {
      string pi = null;
      const int iterations = TotalDigits / BatchSize;
      string[] sections = new string[iterations];
      Parallel.For(0, iterations, (i) =>
      {
          sections[i] = PiCalculator.Calculate(
              BatchSize, i * BatchSize);
      });
      pi = string.Join("", sections);
      Console.WriteLine(pi);
  }
}


The output for Listing 18.25 is identical to Output 18.8; however, the execution time is significantly faster if you have multiple CPUs (and possibly slower if you do not). The Parallel.For() API is designed to look similar to a standard for loop. The first parameter is the fromInclusive value, the second is the toExclusive value, and the last is the Action<int> to perform as the loop body. When using an expression lambda for the action, the code looks similar to a for loop statement except that now each iteration may execute in parallel. As with the for loop, the call to Parallel.For() will not complete until all iterations are complete. In other words, by the time execution reaches the string.Join() statement, all sections of pi will have been calculated.

Note that the code for combining the various sections of pi no longer occurs inside the iteration (action) in Listing 18.25. As sections of the pi calculation will very likely not complete sequentially, appending a section whenever an iteration completes would likely append them out of order. Even if sequence were not a problem, there would still be a potential race condition, because the += operator is not atomic. To address both of these problems, each section of pi is stored into its own element of an array, and no two iterations access the same element of the array simultaneously. Only once all sections of pi are calculated does string.Join() combine them. In other words, we postpone concatenating the sections until after the Parallel.For() loop has completed. This avoids any race condition caused by sections not yet calculated or sections concatenating out of order.

The TPL uses the same sorts of thread pooling techniques that it uses for task scheduling to ensure good performance of the parallel loop: It will try to ensure that CPUs are not overscheduled, and so on.


Guidelines

DO use parallel loops when the computations performed can be easily split up into many mutually independent processor-bound computations that can be executed in any order on any thread.


The TPL also provides a similar parallel version of the foreach statement, as shown in Listing 18.26.

LISTING 18.26: Parallel Execution of a foreach Loop


using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;                                        

class Program
{
  // ...
  static void EncryptFiles(
      string directoryPath, string searchPattern)
  {
      IEnumerable<string> files = Directory.EnumerateFiles(
          directoryPath, searchPattern,
          SearchOption.AllDirectories);

      Parallel.ForEach(files, (fileName) =>                          
      {                                                              
          Encrypt(fileName);                                         
      });                                                            
  }
  // ...
}


In this example, we call a method that encrypts each file within the files collection. It does so in parallel, executing as many threads as the TPL determines is efficient.

Canceling a Parallel Loop

Unlike a task, which requires an explicit call if it is to block until it completes, a parallel loop executes iterations in parallel but does not itself return until the entire parallel loop completes. Canceling a parallel loop, therefore, generally involves invoking the cancellation request from a thread other than the one executing the parallel loop. In Listing 18.28, we invoke Parallel.ForEach<T>() using Task.Run(). In this manner, not only does the loop execute in parallel, but it also executes asynchronously, allowing the code to prompt the user to “Push ENTER to exit.”

LISTING 18.28: Canceling a Parallel Loop


using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
      // ...

  static void EncryptFiles(
      string directoryPath, string searchPattern)
  {

      string stars =
          "*".PadRight(Console.WindowWidth-1, '*');

      IEnumerable<string> files = Directory.GetFiles(
         directoryPath, searchPattern,
         SearchOption.AllDirectories);

      CancellationTokenSource cts =                                        
          new CancellationTokenSource();                                   
      ParallelOptions parallelOptions =                                    
          new ParallelOptions                                              
              { CancellationToken = cts.Token };                           
      cts.Token.Register(                                                  
          () => Console.WriteLine("Cancelling..."));                       

      Console.WriteLine("Push ENTER to exit.");

      // Use Task.Factory.StartNew<string>() for
      // TPL prior to .NET 4.5
      Task task = Task.Run(() =>
          {
              try
              {
                  Parallel.ForEach(
                      files, parallelOptions,
                      (fileName, loopState) =>
                          {
                              Encrypt(fileName);
                          });
              }
              catch(OperationCanceledException){}
          });

      // Wait for the user's input
      Console.Read();

      // Cancel the query
      cts.Cancel();                                                         
      Console.Write(stars);
      task.Wait();
  }
}


The parallel loops use the same cancellation token pattern that tasks use. The token obtained from a CancellationTokenSource is associated with the parallel loop by calling an overload of the ForEach() method that has a parameter of type ParallelOptions. This object contains the cancellation token.

Note that if you cancel a parallel loop operation, any iterations that have not started yet are prevented from starting by checking the IsCancellationRequested property. Existing executing iterations will run to their respective termination points. Furthermore, calling Cancel() even after all iterations have completed will still cause the registered cancel event (via cts.Token.Register()) to execute.

The only means by which the ForEach() method is able to acknowledge that the loop has been canceled is via the OperationCanceledException. Given that cancellation in this example is expected, the exception is caught and ignored, allowing the application to display “Canceling...”, followed by a line of stars before exiting.

Running LINQ Queries in Parallel

Just as it is possible to execute a loop in parallel using Parallel.For(), so it is also possible to execute LINQ queries in parallel using the Parallel LINQ API (PLINQ, for short). An example of a simple nonparallel LINQ expression is shown in Listing 18.29; in Listing 18.30, we modify it to run in parallel.

LISTING 18.29: LINQ Select()


using System.Collections.Generic;
using System.Linq;

class Cryptographer
{
  // ...
  public List<string>
    Encrypt(IEnumerable<string> data)
  {
      return data.Select(
          item => Encrypt(item)).ToList();
  }
  // ...
}


In Listing 18.29, a LINQ query uses the Select() standard query operator to encrypt each string within a sequence of strings, and convert the resultant sequence to a list. This seems like an “embarrassingly parallel” operation; each encryption is likely to be a high-latency processor-bound operation that could be farmed out to a worker thread on another CPU.

Listing 18.30 shows how to modify Listing 18.29 so that the code that encrypts the strings is executed in parallel.

LISTING 18.30: Parallel LINQ Select()


using System.Linq;

class Cryptographer
{
  // ...
  public List<string> Encrypt(IEnumerable<string> data)
  {
      return data.AsParallel().Select(
          item => Encrypt(item)).ToList();
  }
  // ...
}


As Listing 18.30 shows, the change to enable parallel support is extremely small! All that it uses is a .NET Framework 4.0–introduced standard query operator, AsParallel(), which can be found on the static class System.Linq.ParallelEnumerable. This simple extension method tells the runtime that it can execute the query in parallel. The result is that on machines with multiple available CPUs, the total time taken to execute the query can be significantly shorter.
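One caveat: by default, a parallel query is free to yield its results out of order. When the output order must match the input order, the AsOrdered() operator can be added, at some cost to throughput. The following sketch assumes a hypothetical EncryptOrdered() variation (with a placeholder Encrypt() so that it compiles standalone):


using System.Collections.Generic;
using System.Linq;

class Cryptographer
{
  // ...
  public List<string> EncryptOrdered(IEnumerable<string> data)
  {
      // AsOrdered() preserves the source ordering in the
      // output, trading away some parallel throughput
      return data.AsParallel().AsOrdered().Select(
          item => Encrypt(item)).ToList();
  }

  private string Encrypt(string item)
  {
      // Placeholder for the real encryption logic
      return item;
  }
  // ...
}
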

System.Linq.ParallelEnumerable includes a superset of the query operators available on System.Linq.Enumerable, resulting in possible performance improvements for all of the common query operators, including those used for sorting, filtering (Where()), projecting (Select()), joining, grouping, and aggregating. Listing 18.31 shows how to do a parallel sort.

LISTING 18.31: Parallel LINQ with Standard Query Operators


// ...
      OrderedParallelQuery<string> parallelSorted =
          data.AsParallel().OrderBy(item => item);

      // Show that the total count of items still
      // matches the original count
      System.Diagnostics.Trace.Assert(
          data.Count == parallelSorted.Count());
// ...


As Listing 18.31 shows, invoking the parallel version simply involves a call to the AsParallel() extension method. Notice that the type of the result returned by the parallel standard query operators is either ParallelQuery<T> or OrderedParallelQuery<T>; both inform the compiler that it should continue to use the parallel versions of the standard query operations that are available.

Given that query expressions are simply syntactic sugar for the method call form of the query used in Listings 18.30 and 18.31, you can just as easily use AsParallel() with the expression form. Listing 18.32 shows an example of executing a grouping operation in parallel using query expression syntax.

LISTING 18.32: Parallel LINQ with Query Expressions


// ...
      ParallelQuery<IGrouping<char, string>> parallelGroups;
      parallelGroups =
          from text in data.AsParallel()                                 
            orderby text                                                 
            group text by text[0];                                       

      // Show the total count of items still
      // matches the original count
      System.Diagnostics.Trace.Assert(
          data.Count == parallelGroups.Sum(
              item => item.Count()));
// ...


As you saw in the previous examples, converting a query or iteration loop to execute in parallel is simple. There is one significant caveat, however: As we will discuss in depth in Chapter 19, you must take care not to allow multiple threads to inappropriately access and modify the same memory simultaneously. Doing so will cause a race condition.
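
To see why, consider a deliberately broken sketch: incrementing a shared counter from a parallel loop without synchronization frequently loses updates, because increments from different threads overlap:

using System;
using System.Threading.Tasks;

public class Program
{
  public static void Main()
  {
      int count = 0;

      // BUG: count++ is a nonatomic read-modify-write, so
      // parallel iterations can overwrite one another's updates
      Parallel.For(0, 1000000, i => { count++; });

      // Frequently prints a value less than 1000000
      Console.WriteLine(count);
  }
}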

As we saw earlier in this chapter, the Parallel.For() and Parallel.ForEach<T>() methods will gather up any exceptions thrown during the parallel iterations and then throw one aggregating exception containing all of the original exceptions. PLINQ operations are no different. That is, they also have the potential of returning multiple exceptions for the exact same reason: When the query logic is run on each element in parallel, the code executing on each element can independently throw an exception. Unsurprisingly, PLINQ deals with this situation in exactly the same way as do parallel loops and the TPL: Exceptions thrown during parallel queries are accessible via the InnerExceptions property of the AggregateException. Therefore, wrapping a PLINQ query in a try/catch block that catches System.AggregateException will handle any exceptions that went unhandled within the individual iterations.
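
A minimal sketch of that pattern follows; the selector that fails on one particular element is contrived for illustration:

using System;
using System.Linq;

public class Program
{
  public static void Main()
  {
      try
      {
          int[] results = Enumerable.Range(0, 100)
              .AsParallel()
              .Select(n =>
              {
                  if (n == 42)
                  {
                      throw new InvalidOperationException(
                          "Element 42 failed");
                  }
                  return n;
              })
              .ToArray();
      }
      catch (AggregateException exception)
      {
          foreach (Exception inner in
              exception.Flatten().InnerExceptions)
          {
              Console.WriteLine(inner.Message);
          }
      }
  }
}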

Canceling a PLINQ Query

As expected, the cancellation request pattern is also available on PLINQ queries. Listing 18.33 (with Output 18.10) provides an example. Like the parallel loops, canceled PLINQ queries will throw a System.OperationCanceledException. Also like the parallel loops, executing a PLINQ query is a synchronous operation on the invoking thread. Thus, a common technique is to wrap the parallel query in a task that runs on another thread so that the current thread can cancel it if necessary—the same solution used in Listing 18.28.

LISTING 18.33: Canceling a PLINQ Query


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Program
{

  public static List<string> ParallelEncrypt(
      List<string> data,
      CancellationToken cancellationToken)
  {
      return data.AsParallel().WithCancellation(
          cancellationToken).Select(
              (item) => Encrypt(item)).ToList();
  }

  public static void Main()
  {
      ConsoleColor originalColor = Console.ForegroundColor;
      List<string> data = Utility.GetData(100000).ToList();

      CancellationTokenSource cts =
          new CancellationTokenSource();

      Console.WriteLine("Push ENTER to Exit.");

      // Use Task.Factory.StartNew<string>() for
      // TPL prior to .NET 4.5
      Task task = Task.Run(() =>
      {
          data = ParallelEncrypt(data, cts.Token);
      }, cts.Token);

      // Wait for the user's input
      Console.Read();

      if (!task.IsCompleted)
      {
          cts.Cancel();
          try { task.Wait(); }
          catch (AggregateException exception)
          {
              Console.ForegroundColor = ConsoleColor.Red;
              TaskCanceledException taskCanceledException =
                  (TaskCanceledException)exception.Flatten()
                      .InnerExceptions
                      .FirstOrDefault(
                          innerException =>
                              innerException.GetType() ==
                              typeof(TaskCanceledException));
              if (taskCanceledException != null)
              {
                  Console.WriteLine($@"Cancelled: {
                      taskCanceledException.Message }");
              }
              else
              {
                  // ...
              }
          }
      }
      else
      {
          task.Wait();
          Console.ForegroundColor = ConsoleColor.Green;
          Console.Write("Completed successfully");
      }
      Console.ForegroundColor = originalColor;
  }
}


OUTPUT 18.10

Cancelled: A task was canceled.

As with a parallel loop or task, canceling a PLINQ query requires a CancellationToken, which is available from a CancellationTokenSource. However, rather than overloading every PLINQ query operator to support the cancellation token, the ParallelQuery<T> object returned by the AsParallel() extension method includes a WithCancellation() extension method that simply takes a CancellationToken. As a result, calling Cancel() on the CancellationTokenSource object will request cancellation of the parallel query, because the query checks the IsCancellationRequested property on the CancellationToken.

As mentioned, canceling a PLINQ query will throw an exception in place of returning the complete result. One common technique for dealing with a possibly canceled PLINQ query is to wrap the query in a try block and catch the OperationCanceledException. A second common technique, used in Listing 18.33, is to pass the CancellationToken both to ParallelEncrypt() and as a second parameter on Run(). This will cause task.Wait() to throw an AggregateException whose InnerExceptions collection will contain a TaskCanceledException. The aggregating exception can then be caught, just as you would catch any other exception from a parallel operation.
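
A minimal sketch of the first technique, with the query executing synchronously on the calling thread (the cancellation delay is arbitrary, and CancelAfter() assumes .NET 4.5):

using System;
using System.Linq;
using System.Threading;

public class Program
{
  public static void Main()
  {
      CancellationTokenSource cts =
          new CancellationTokenSource();

      // Request cancellation shortly after the query starts
      cts.CancelAfter(50);

      try
      {
          int[] results = ParallelEnumerable.Range(0, int.MaxValue)
              .WithCancellation(cts.Token)
              .Select(n => n * n)
              .ToArray();
      }
      catch (OperationCanceledException)
      {
          // The canceled query acknowledges the request by throwing
          Console.WriteLine("Query canceled.");
      }
  }
}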

Summary

In this chapter, we started by examining the basic parts of multithreaded programs: the Thread class, which represents an independent “point of control” in a program, and the ThreadPool, which encourages efficient allocation and scheduling of threads to multiple CPUs. However, these APIs are low-level entities that are difficult to work with directly. Starting with Version 4.0, the .NET Framework provides the Parallel Extensions library, which includes the Task Parallel Library (TPL) and Parallel LINQ (PLINQ). Both provide new APIs for creating and scheduling units of work represented by Task objects, executing loops in parallel using Parallel.For() and Parallel.ForEach(), and automatically parallelizing LINQ queries with AsParallel().

We also discussed how C# 5.0 makes programming complex workflows with Task objects much easier by automatically rewriting your programs to manage the continuation “wiring” that composes larger tasks out of smaller tasks.

At the beginning of this chapter, we briefly glossed over some of the difficult problems that developers often face when writing multithreaded programs: atomicity problems, deadlocks, and other “race conditions” that introduce uncertainty and bad behavior into multithreaded programs. The standard way to avoid these problems is to carefully write code that uses “locks” to synchronize access to shared resources; this is the topic of the next chapter.

