Chapter 23. Asynchronous Methods

In Chapter 21, we saw how a thread provides a parallel execution path. We took for granted that whenever you needed to run something in parallel, you could assign a new or pooled thread to the job. Although this usually holds true, there are exceptions. Suppose you were writing a TCP sockets or web server application that needed to process 1,000 concurrent client requests. If you dedicated a thread to each incoming request, you would consume (by default) a gigabyte of memory purely on thread overhead.

Asynchronous methods address this problem through a pattern by which many concurrent activities are handled by a few pooled threads. This makes it possible to write highly concurrent applications—as well as highly thread-efficient applications.

To avoid getting lost, you’ll need to be familiar with threading (Chapter 21) and streams (Chapter 14).

Why Asynchronous Methods Exist

The problem just described might be insoluble if every thread needed to be busy all of the time. But this is not the case: fetching a web page, for instance, might take up to several seconds from start to end (because of a potentially slow connection) and yet consume only a fraction of a millisecond of CPU time in total. Processing an HTTP request and response is not computationally intensive.

This means that a server thread dedicated to processing a single client request might spend 99% of its time blocked, which represents a huge potential for economy. The asynchronous method pattern (also called the asynchronous programming model, or APM) exploits just this potential, allowing a handful of fully utilized threads to take on thousands of concurrent jobs.

Note

If you don’t need such concurrency, avoid the APM; it will unnecessarily complicate your program. Further, asynchronous methods are not guaranteed to execute in parallel with the caller. If you need parallel execution, consider using a TPL Task or BackgroundWorker—or simply starting a new thread.

An asynchronous method aims never to block any thread, instead using a pattern of returning with a callback. (Blocking means entering a WaitSleepJoin state—or causing another thread to do the same—“wasting” a precious thread resource.) To achieve this, an asynchronous method must abstain from calling any blocking method.

The end goal of the APM is thread economy. This means that blocking briefly is OK—such as when locking around reading/writing fields.

A method that takes a while to execute because it performs computationally intensive work does not violate the APM. The purpose of asynchronous methods isn’t to provide a convenient mechanism for executing a method in parallel with the caller; it’s to optimize thread resources. Here’s the golden rule of the APM:

Make good use of the CPU, or exit with a callback!

This means an asynchronous method such as BeginRead may not return immediately to the caller. It can make the caller wait as long as it likes—while making good use of the processor or another constrained resource. It can even finish the entire task synchronously—providing it never blocked and never caused another thread to do the same.

Note

There is an exception to the don’t-block rule. It’s generally OK to block while calling a database server—if other threads are competing for the same server. This is because in a highly concurrent system, the database must be designed such that the majority of queries execute extremely quickly. If you end up with thousands of concurrent queries, it means that requests are hitting the database server faster than it can process them. Thread economy is then the least of your worries!

Similarly, reading or writing to a local hard drive synchronously is usually fine: concurrent I/O will choke the local filesystem way before exhausting the thread pool.

The primary use for asynchronous methods is handling many potentially long-running concurrent requests—typically over slow network connections.

Asynchronous Method Signatures

Asynchronous methods, by convention, all start with Begin, have a pairing method starting with End, and have signatures like those of asynchronous delegates:

IAsyncResult BeginXXX (in/ref-args, AsyncCallback callback, object state);

return-type EndXXX (out/ref-args, IAsyncResult asyncResult);

Here’s an example from the Stream class:

public IAsyncResult BeginRead (byte[] buffer, int offset, int size,
                               AsyncCallback callback, object state);
public int EndRead (IAsyncResult asyncResult);

The Begin method returns an IAsyncResult object, which helps you to manage the asynchronous operation (as we’ll see shortly). That same object is then passed to the completion callback. Here’s its delegate:

public delegate void AsyncCallback (IAsyncResult ar);

As with asynchronous delegates, the EndXXX method allows the return value to be retrieved, as well as any out/ref arguments. This is also where exceptions are rethrown.

Note

If you fail to call the EndXXX method, exceptions won’t get rethrown (meaning silent failure) and resources may not be cleaned up.

To avoid blocking, you will nearly always call the EndXXX method from inside the callback method. Callbacks always run on pooled threads.
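As a minimal sketch of calling EndXXX from inside the callback, the following assumes a loopback TCP connection on an OS-assigned port. The names CallbackDemo and ReadOnce are hypothetical and exist only for illustration. The demo waits on an event at the end purely so that it can return a result; a real server would carry on from within the callback instead.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public static class CallbackDemo
{
  // Hypothetical helper: sends 'payload' over a loopback connection and
  // returns how many bytes the first BeginRead/EndRead pair delivered
  // (TCP may deliver fewer bytes than were sent in a single read).
  public static int ReadOnce (byte[] payload)
  {
    var listener = new TcpListener (IPAddress.Loopback, 0);  // Port 0: OS picks a port
    listener.Start();
    try
    {
      int port = ((IPEndPoint) listener.LocalEndpoint).Port;

      using (var sender = new TcpClient())
      {
        sender.Connect (IPAddress.Loopback, port);

        using (TcpClient receiver = listener.AcceptTcpClient())
        using (NetworkStream stream = receiver.GetStream())
        using (var done = new ManualResetEvent (false))
        {
          var buffer = new byte [payload.Length];
          int bytesRead = -1;
          Exception error = null;

          stream.BeginRead (buffer, 0, buffer.Length, ar =>
          {
            // EndRead yields the result and rethrows any exception:
            try { bytesRead = stream.EndRead (ar); }
            catch (Exception ex) { error = ex; }
            finally { done.Set(); }
          }, null);

          sender.GetStream().Write (payload, 0, payload.Length);

          done.WaitOne();    // Demo only: wait so that we can return the count
          if (error != null) throw error;
          return bytesRead;
        }
      }
    }
    finally { listener.Stop(); }
  }
}
```

Calling CallbackDemo.ReadOnce (new byte[] { 1, 2, 3, 4, 5 }) returns a count between 1 and 5, depending on how the data arrives.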

IAsyncResult

The IAsyncResult interface is defined as follows:

public interface IAsyncResult
{
  object AsyncState { get; }            // "state" object passed to Begin.
  WaitHandle AsyncWaitHandle { get; }   // Signaled when complete.
  bool CompletedSynchronously { get; }  // Did it complete on BeginX?
  bool IsCompleted { get; }             // Has it completed yet?
}

AsyncState lets you access the state argument passed to the BeginXXX method.

The wait handle is signaled when the operation is complete. To wait on this without blocking, call ThreadPool.RegisterWaitForSingleObject:

ThreadPool.RegisterWaitForSingleObject (
  result.AsyncWaitHandle,
  (data, timedOut) => { /* Callback */ },
  null, -1, true);
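The same call can be tried in isolation. The following sketch assumes a ManualResetEvent standing in for result.AsyncWaitHandle; the names WaitDemo and WaitWithoutBlocking are hypothetical. The second event exists only so that the demo can observe the callback, and is not part of the technique.

```csharp
using System;
using System.Threading;

public static class WaitDemo
{
  // Returns true if the callback fired because the handle was signaled
  // (rather than because of a timeout).
  public static bool WaitWithoutBlocking()
  {
    using (var operationDone = new ManualResetEvent (false))  // Stand-in for AsyncWaitHandle
    using (var callbackRan = new ManualResetEvent (false))    // Demo only
    {
      bool timedOutFlag = true;

      RegisteredWaitHandle registration = ThreadPool.RegisterWaitForSingleObject (
        operationDone,
        (state, timedOut) => { timedOutFlag = timedOut; callbackRan.Set(); },
        null,    // State object passed to the callback
        -1,      // No timeout
        true);   // Execute the callback only once

      operationDone.Set();          // Simulate the operation completing
      callbackRan.WaitOne();        // Demo only: wait until the callback has run
      registration.Unregister (null);
      return !timedOutFlag;
    }
  }
}
```

No thread is tied up while the handle is unsignaled; the pool runs the callback on a worker thread once the handle is set.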

The CompletedSynchronously property indicates that the operation finished during the call to the BeginXXX method, before that method returned. This may be true for one of three reasons:

  • The operation could be completed very quickly—and so was executed synchronously to avoid the overhead of managing an asynchronous operation.

  • The underlying implementation—or operating system—doesn’t support APM in that scenario.

  • The operation was CPU-bound and could be completed without blocking.

When this property returns true, the callback will still be fired—but possibly on the same thread as that which called BeginXXX. Failing to consider this possibility can sometimes result in unintended recursion, leading to a stack overflow.

Asynchronous Methods Versus Asynchronous Delegates

Asynchronous delegate invocations implement the same pattern with their method signatures—but without the goal of thread economy:

Asynchronous methods                                    | Asynchronous delegate invocations
Rarely or never blocks any thread                       | May block threads for any length of time (although not the calling thread)
Begin method may not return immediately to the caller   | BeginInvoke returns immediately to the caller

The purpose of asynchronous methods is to allow many tasks to run on few threads; the purpose of asynchronous delegates is to execute a task in parallel with the caller.

You can use an asynchronous delegate to call an asynchronous method—so that execution is guaranteed to return immediately to the caller, while still following the APM. Or better: you can use Framework 4.0’s new Task class to wrap an asynchronous method call to simplify management (we’ll explain how to do this later in the chapter).

If you use an asynchronous delegate to call a blocking method, however, you’re back to square one: the server will either suffer limited concurrency or need thousands of threads to do its job.

Using Asynchronous Methods

Let’s write a simple TCP sockets server that behaves as follows:

  1. It waits for a client request.

  2. It reads a 5,000-byte fixed-length message.

  3. It reverses the bytes in the message, and then returns them to the client.

Let’s first write this using a standard multithreaded blocking pattern. Here is the code, exception handling aside:

Note

You can download a Visual Studio project containing all the code in this chapter—along with a client test harness—at http://www.albahari.com/nutshell/async40.zip.

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);    // Refer to Chapter 21
    ThreadPool.SetMaxThreads (50, 50);    // Refer to Chapter 21
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }

  void Accept (object clientObject)
  {
    using (TcpClient client = (TcpClient) clientObject)
    using (NetworkStream n = client.GetStream())
    {
      byte[] data = new byte [5000];

      int bytesRead = 0; int chunkSize = 1;
      while (bytesRead < data.Length && chunkSize > 0)
        bytesRead +=
          chunkSize = n.Read
            (data, bytesRead, data.Length - bytesRead);    // BLOCKS

      Array.Reverse (data);
      n.Write (data, 0, data.Length);                      // BLOCKS
    }
  }
}

Note

From Framework 4.0, you can replace ThreadPool.QueueUserWorkItem with Task.Factory.StartNew. The end result is the same.

Our use of the thread pool prevents an arbitrarily large number of threads from being created (possibly taking down the server) and eliminates the time wasted in creating a new thread per request. Our program is simple and fast, but it is limited to 50 concurrent requests.
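A server like this can be exercised with a small blocking client. The following is a minimal sketch and not the downloadable test harness; the names ReverseEchoClient, Exchange, and IsReverseOf are hypothetical, and the address and port are whatever you passed to Serve.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

public static class ReverseEchoClient
{
  public const int MessageSize = 5000;   // Matches the server's fixed message length

  // Sends one fixed-length message and returns the server's reply.
  public static byte[] Exchange (IPAddress address, int port, byte[] message)
  {
    if (message.Length != MessageSize)
      throw new ArgumentException ("Message must be " + MessageSize + " bytes");

    using (var client = new TcpClient())
    {
      client.Connect (address, port);
      NetworkStream n = client.GetStream();
      n.Write (message, 0, message.Length);

      // Read until the whole reply has arrived or the server closes the connection:
      var reply = new byte [MessageSize];
      int bytesRead = 0, chunkSize = 1;
      while (bytesRead < reply.Length && chunkSize > 0)
        bytesRead += chunkSize = n.Read (reply, bytesRead, reply.Length - bytesRead);

      if (bytesRead < reply.Length)
        throw new EndOfStreamException ("Connection closed after " + bytesRead + " bytes");
      return reply;
    }
  }

  // True if 'reply' is 'message' with its bytes in reverse order.
  public static bool IsReverseOf (byte[] reply, byte[] message)
  {
    if (reply.Length != message.Length) return false;
    for (int i = 0; i < reply.Length; i++)
      if (reply [i] != message [message.Length - 1 - i]) return false;
    return true;
  }
}
```

Running many such clients at once, each on its own thread, is one way to observe the server's concurrency limit.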

In order to scale to 1,000 concurrent requests—without increasing the thread count—we must employ the asynchronous method pattern. This means avoiding the blocking I/O methods altogether and instead calling their asynchronous counterparts. Here’s how to do it:

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (ReverseEcho, c);
    }
  }

  void ReverseEcho (object client)
  {
    new ReverseEcho().Begin ((TcpClient)client);
  }
}

class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;

  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void Read()            // Read in a nonblocking fashion.
  {
    while (true)
    {
      IAsyncResult r = _stream.BeginRead
       (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);

      // This will nearly always return in the next line:
      if (!r.CompletedSynchronously) return;   // Handled by callback
      if (!EndRead (r)) break;
    }
    Write();
  }

  void ReadCallback (IAsyncResult r)
  {
    try
    {
      if (r.CompletedSynchronously) return;
      if (EndRead (r))
      {
        Read();       // More data to read!
        return;
      }
      Write();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  bool EndRead (IAsyncResult r)   // Returns false if there's no more data
  {
    int chunkSize = _stream.EndRead (r);
    _bytesRead += chunkSize;
    return chunkSize > 0 && _bytesRead < _data.Length;   // More to read
  }

  void Write()
  {
    Array.Reverse (_data);
    _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
  }

  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); }
    Cleanup();
  }

  void ProcessException (Exception ex)
  {
    Cleanup();
    Console.WriteLine ("Error: " + ex.Message);
  }

  void Cleanup()
  {
    if (_stream != null) _stream.Close();
    if (_client != null) _client.Close();
  }
}

This program will handle 1,000 concurrent requests on fewer than 10 pooled threads.

Each client request is processed without calling any blocking methods (aside from the call to listener.AcceptTcpClient in the Serve method, which blocks just one thread).

In the Read method, we start by calling BeginRead on the stream, specifying a completion callback. We could simplify the entire method as follows and get the same result:

void Read()
{
  _stream.BeginRead
    (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);
}

However, there’s a small chance that BeginRead could finish synchronously and then call ReadCallback on the same thread. Because ReadCallback calls Read again, this might lead to some fairly deep recursion and a stack overflow. To protect against this, we must check CompletedSynchronously after calling BeginRead, and if it returns true, use a loop to call Read until completion rather than relying on the potentially recursive call in ReadCallback.

This leads to why we called AcceptTcpClient in the Serve method—instead of its asynchronous version, BeginAcceptTcpClient. For the benefit of saving one thread, the latter would require the same pattern of use as with BeginRead to avoid a possible stack overflow.

The ReverseEcho class encapsulates a request’s state for its lifetime. We can no longer use local variables for this job, because the execution stack disappears each time we exit (after each call to an asynchronous method). This also complicates cleanup and means that a simple using statement is no longer suitable for closing our TcpClient and stream.

Another complicating factor is that we can’t use types such as BinaryReader and BinaryWriter, because they don’t offer asynchronous versions of their methods. The asynchronous pattern often forces you to work at a lower level than you might otherwise.

Asynchronous Methods and Tasks

We saw in the preceding chapter how Framework 4.0’s Task class can manage a unit of work that runs on a pooled thread. You can also use a Task to wrap asynchronous method calls—via the FromAsync method on TaskFactory.

The task that you get from calling FromAsync is merely a lightweight wrapper over calling a BeginXXX and EndXXX method—it does not get scheduled like an ordinary task. The reason to use FromAsync is to leverage features such as continuations and child tasks. FromAsync is internally implemented using TaskCompletionSource.
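To illustrate the idea, here is a minimal sketch of wrapping a BeginXXX/EndXXX pair with TaskCompletionSource. It is not the Framework's actual FromAsync implementation; ApmTaskExtensions and ReadAsTask are hypothetical names chosen for this example.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ApmTaskExtensions
{
  // Hypothetical helper: exposes Stream.BeginRead/EndRead as a Task<int>.
  public static Task<int> ReadAsTask (this Stream stream,
                                      byte[] buffer, int offset, int count)
  {
    var tcs = new TaskCompletionSource<int>();

    stream.BeginRead (buffer, offset, count, ar =>
    {
      // The callback completes the task with EndRead's result or its exception:
      try { tcs.SetResult (stream.EndRead (ar)); }
      catch (Exception ex) { tcs.SetException (ex); }
    }, null);

    return tcs.Task;   // Nothing is scheduled; the task simply reflects the outcome
  }
}
```

A caller could then write stream.ReadAsTask (buffer, 0, buffer.Length) and attach continuations to the returned task, much as with a task obtained from FromAsync.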

The FromAsync method requires the following parameters:

  • A delegate specifying a BeginXXX method

  • A delegate specifying an EndXXX method

  • Additional arguments that will get passed to these methods

FromAsync is overloaded to accept delegate types and arguments that match nearly all the asynchronous method signatures found in the .NET Framework. For instance, assuming stream is a Stream, instead of doing this:

var buffer = new byte[1000];
stream.BeginRead (buffer, 0, 1000, MyCallback, null);
...
void MyCallback (IAsyncResult r)
{
  int bytesRead;
  try { bytesRead = stream.EndRead (r); }
  catch (Exception ex) { Console.Write (ex.Message); return; }
  Console.Write (bytesRead + " bytes read");
}

we can do this:

var buffer = new byte[1000];

Task<int> readChunk = Task<int>.Factory.FromAsync (
  stream.BeginRead, stream.EndRead, buffer, 0, 1000, null);

readChunk.ContinueWith (ant => Console.Write (ant.Result + " bytes read"),
                               TaskContinuationOptions.NotOnFaulted);

readChunk.ContinueWith (ant => Console.Write (ant.Exception.Message),
                               TaskContinuationOptions.OnlyOnFaulted);

This doesn’t deliver a huge saving in itself: the real benefit comes when you introduce continuations with child antecedents. Revisiting our earlier example, suppose that we refactor ReverseEcho’s Begin method so that it calls Read on a new task. This liberates Server.Serve from having to create a task itself, but more importantly, it creates a parent for the FromAsync-created tasks upon which we can attach continuations. This avoids having to write separate exception-handling blocks or explicit fault continuations for each child task. Cleanup can also be done easily as another continuation on the parent:

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      new ReverseEcho().BeginAsync (c);
    }
  }
}

class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;

  internal void BeginAsync (TcpClient c)
  {
    _client = c;
    _stream = c.GetStream();

    var task = Task.Factory.StartNew (Read);

    // Set up centralized error handling and cleanup:

    task.ContinueWith (ant =>
      Console.WriteLine ("Error: " + ant.Exception.Message),
      TaskContinuationOptions.OnlyOnFaulted);

    task.ContinueWith (ant =>
    {
      if (_stream != null) _stream.Close();
      if (_client != null) _client.Close();
    });
  }

  void Read()    // This will create a child task.
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);

    readChunk.ContinueWith (Write, TaskContinuationOptions.NotOnFaulted
                                 | TaskContinuationOptions.AttachedToParent);
  }


  void Write (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}

We also need not worry about Read going recursive and performing a stack dive: continuations don’t happen synchronously unless you tell them to.

Writing Asynchronous Methods

Returning to our previous example, suppose that the 5,000-byte exchange was just a small part of a more sophisticated communication protocol. It would be nice to turn what we’ve already written into a method like this:

public byte[] ReverseEcho (TcpClient client);

The problem, of course, is that this method signature is synchronous; we need to offer an asynchronous version—in other words, BeginReverseEcho. Further, if an exception is encountered, it’s no good writing it to the Console; we need to throw it back to the consumer at some point. So, to usefully partake in the pattern, we must also offer EndReverseEcho and write a class that implements IAsyncResult.

Our ReverseEcho class is an excellent candidate for IAsyncResult, since it already encapsulates the operation’s state. All we really need to add is some plumbing code to rethrow any exception upon calling EndReverseEcho, and a wait handle to signal at completion.

Here’s a real-world example, complete with exception handling and thread safety:

// This sample can be downloaded at http://www.albahari.com/nutshell/

public class MessagingServices
{
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    var re = new ReverseEcho();
    re.Begin (client, callback, userState);
    return re;
  }

  public static byte[] EndReverseEcho (IAsyncResult r)
  {
    return ((ReverseEcho)r).End();
  }
}

class ReverseEcho : IAsyncResult
{
  TcpClient _client;
  NetworkStream _stream;
  object _userState;
  ManualResetEvent _waitHandle = new ManualResetEvent (false);
  int _bytesRead = 0;
  byte[] _data = new byte [5000];
  Exception _exception;

  internal ReverseEcho() { }

  // IAsyncResult members:

  public object AsyncState { get { return _userState; } }
  public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false; } }
  public bool IsCompleted
  {
    get { return _waitHandle.WaitOne (0, false); }
  }

  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _userState = state;
    _stream = _client.GetStream();

    Task.Factory.StartNew (Read).ContinueWith (ant =>
    {
      _exception = ant.Exception;   // In case an exception occurred.

      if (_stream != null)
        try { _stream.Close(); }
        catch (Exception ex) { _exception = ex; }

      _waitHandle.Set();

      if (callback != null) callback (this);
    });
  }

  internal byte[] End()     // Wait for completion + rethrow any error.
  {
    AsyncWaitHandle.WaitOne();
    if (_exception != null) throw _exception;
    return _data;
  }

  void Read()
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);   // So the parent waits and sees faults

    readChunk.ContinueWith (ContinueRead,
                            TaskContinuationOptions.NotOnFaulted
                          | TaskContinuationOptions.AttachedToParent);
  }

  void ContinueRead (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}

In the cleanup continuation, we closed _stream but not _client, because the caller may want to continue using _client after performing the reverse echo.

Warning

When writing asynchronous methods, you must meticulously catch all exceptions, saving the exception object so that it can be rethrown when the consumer calls the EndXXX method. The easiest way to achieve this is to attach a continuation to an overarching parent task (as in our example)—this catches all exceptions thrown by its child tasks.

Fake Asynchronous Methods

In general, any Framework method that starts with Begin and that returns an IAsyncResult follows the APM. There are, however, some exceptions, based on the Stream class:

BufferedStream
CryptoStream
DeflateStream
MemoryStream

These types rely on fallback asynchronous implementations in the base Stream class, which offer no guarantees not to block. Instead, they use an asynchronous delegate to call a blocking method such as Read or Write. Although this approach is perfectly valid in the case of MemoryStream (it never blocks in the first place, so it is excused), it creates a problem with BufferedStream and CryptoStream—if wrapping anything other than a MemoryStream. In other words, if you call BeginRead or BeginWrite on a CryptoStream that wraps a NetworkStream, some thread is going to block at some point, violating the scalability of the asynchronous method pattern. This is a shame, because the CryptoStream’s decorator architecture is otherwise efficient.

A workaround with CryptoStream is to first read the underlying stream asynchronously into a MemoryStream, and then have the CryptoStream wrap the MemoryStream. This means reading the whole stream into memory, though, which, on a highly concurrent server, is not great for scalability, either. If you really need asynchronous encryption, a solution is to work at a lower level than CryptoStream, namely with ICryptoTransform. You can see exactly how CryptoStream uses ICryptoTransform to do its work with a disassembly tool such as Red Gate’s Reflector.

DeflateStream does actually follow the asynchronous pattern—or at least tries to. The problem is that it doesn’t properly handle exceptions. If the underlying stream is corrupt, for instance, BeginRead throws an exception on a pooled thread rather than marshaling it back to EndRead. This is an uncatchable exception that takes down your whole application.

The FileStream class is another violator—it touts fake asynchronous methods (i.e., it relies on Stream’s default implementation). However, it does make an attempt at true asynchronous behavior if constructed as follows:

Stream s = new FileStream ("large.bin", FileMode.Create, FileAccess.Write,
                            FileShare.None, 0x1000, true);

The boolean argument at the end instructs FileStream not to use asynchronous delegates—and instead (to attempt) a true APM approach. The problem is that asynchronous file I/O requires operating system support, which may not be forthcoming. If the OS fails to oblige, BeginRead blocks the calling thread in a WaitSleepJoin state.

Lack of asynchronous file I/O is rarely a problem, though, assuming you’re accessing a local filesystem (in fact, it can be a good idea not to use asynchronous file I/O at all). Small file requests are likely to be served from an operating system or hard-drive cache and so be brief and CPU-bound; large file I/O requests are going to seriously limit concurrency if not broken up or throttled in some way, no matter what the threading model. A more insidious blocking issue arises if you’re using a FileStream on a UNC network path: the solution is instead to use lower-level networking classes (such as those described) for communicating between computers.

Alternatives to Asynchronous Methods

Chapter 21 described three analogous techniques—all of which coalesce many tasks onto a few threads:

  • ThreadPool.RegisterWaitForSingleObject

  • The producer/consumer queue

  • The threading and system timers

ThreadPool.RegisterWaitForSingleObject can be helpful in implementing the asynchronous method pattern. A custom producer/consumer queue can provide a complete alternative—with your own pool of workers—but is of no help if you want to interoperate with the .NET Framework (e.g., to read from a NetworkStream). The threading and system timers are excellent if your work is executed in a periodic fashion, rather than in response to requests.
