In Chapter 21, we saw how a thread provides a parallel execution path. We took for granted that whenever you needed to run something in parallel, you could assign a new or pooled thread to the job. Although this usually holds true, there are exceptions. Suppose you were writing a TCP sockets or web server application that needed to process 1,000 concurrent client requests. If you dedicated a thread to each incoming request, you would consume (by default) a gigabyte of memory purely on thread overhead.
Asynchronous methods address this problem through a pattern by which many concurrent activities are handled by a few pooled threads. This makes it possible to write highly concurrent applications—as well as highly thread-efficient applications.
To avoid getting lost, you’ll need to be familiar with threading (Chapter 21) and streams (Chapter 14).
The problem just described might be insoluble if every thread needed to be busy all of the time. But this is not the case: fetching a web page, for instance, might take up to several seconds from start to end (because of a potentially slow connection) and yet consume only a fraction of a millisecond of CPU time in total. Processing an HTTP request and response is not computationally intensive.
This means that a server thread dedicated to processing a single client request might spend 99% of its time blocked—representing huge economy potential. The asynchronous method pattern (also called the asynchronous programming model or APM) exploits just this potential, allowing a handful of fully utilized threads to take on thousands of concurrent jobs.
If you don’t need such concurrency, avoid the APM; it will unnecessarily complicate your program. Further, asynchronous methods are not guaranteed to execute in parallel with the caller. If you need parallel execution, consider using a TPL Task or BackgroundWorker, or simply starting a new thread.
An asynchronous method aims never to block any thread, instead using a pattern of returning with a callback. (Blocking means entering a WaitSleepJoin state, or causing another thread to do the same, “wasting” a precious thread resource.) To achieve this, an asynchronous method must abstain from calling any blocking method.
The end goal of the APM is thread economy. This means that blocking briefly is OK—such as when locking around reading/writing fields.
A method that takes a while to execute because it performs computationally intensive work does not violate the APM. The purpose of asynchronous methods isn’t to provide a convenient mechanism for executing a method in parallel with the caller; it’s to optimize thread resources. Here’s the golden rule of the APM:
Make good use of the CPU, or exit with a callback!
This means an asynchronous method such as BeginRead may not return immediately to the caller. It can make the caller wait as long as it likes, while making good use of the processor or another constrained resource. It can even finish the entire task synchronously, provided it never blocked and never caused another thread to do the same.
There is an exception to the don’t-block rule. It’s generally OK to block while calling a database server—if other threads are competing for the same server. This is because in a highly concurrent system, the database must be designed such that the majority of queries execute extremely quickly. If you end up with thousands of concurrent queries, it means that requests are hitting the database server faster than it can process them. Thread economy is then the least of your worries!
Similarly, reading or writing to a local hard drive synchronously is usually fine: concurrent I/O will choke the local filesystem way before exhausting the thread pool.
The primary use for asynchronous methods is handling many potentially long-running concurrent requests—typically over slow network connections.
Asynchronous methods, by convention, all start with Begin, have a pairing method starting with End, and have signatures like those of asynchronous delegates:
IAsyncResult BeginXXX (in/ref-args, AsyncCallback callback, object state);
return-type EndXXX (out/ref-args, IAsyncResult asyncResult);
Here’s an example from the Stream class:
public IAsyncResult BeginRead (byte[] buffer, int offset, int size,
                               AsyncCallback callback, object state);
public int EndRead (IAsyncResult asyncResult);
The Begin method returns an IAsyncResult object, which helps you to manage the asynchronous operation (as we’ll see shortly). That same object is then passed to the completion callback. Here’s its delegate:
public delegate void AsyncCallback (IAsyncResult ar);
As with asynchronous delegates, the EndXXX method allows the return value to be retrieved, as well as any out/ref arguments. This is also where exceptions are rethrown.
If you fail to call the EndXXX method, exceptions won’t get rethrown (meaning silent failure) and resources may escape cleanup.
To avoid blocking, you will nearly always call the EndXXX method from inside the callback method. Callbacks always run on pooled threads.
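As a rough illustration of the Begin/End pairing, the following sketch reads one chunk of a file with FileStream.BeginRead and calls EndRead inside the callback. The names ApmReadDemo and ReadFirstChunk are hypothetical, and the ManualResetEvent is an assumption made purely so the demo can hand its result back to a waiting caller; a real server would carry on with its work inside the callback instead of waiting.

```csharp
using System;
using System.IO;
using System.Threading;

public static class ApmReadDemo
{
  // Hypothetical helper: reads up to buffer.Length bytes from the start of
  // a file via BeginRead/EndRead and returns the number of bytes read.
  public static int ReadFirstChunk (string path, byte[] buffer)
  {
    int bytesRead = 0;
    Exception error = null;

    using (var done = new ManualResetEvent (false))
    using (var fs = new FileStream (path, FileMode.Open, FileAccess.Read,
                                    FileShare.Read, 4096, true))
    {
      fs.BeginRead (buffer, 0, buffer.Length, ar =>
      {
        // ar.AsyncState is the "state" argument passed to BeginRead.
        var stream = (FileStream) ar.AsyncState;
        try { bytesRead = stream.EndRead (ar); }   // Result or exception surfaces here.
        catch (Exception ex) { error = ex; }       // Save it for the caller.
        finally { done.Set(); }
      }, fs);

      done.WaitOne();   // Demo only: waiting here defeats the purpose on a server.
    }

    if (error != null) throw error;
    return bytesRead;
  }
}
```

Saving the exception in the callback and rethrowing it later mirrors what EndXXX itself does for its caller.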
The IAsyncResult interface is defined as follows:
public interface IAsyncResult
{
  object AsyncState { get; }            // "state" object passed to Begin.
  WaitHandle AsyncWaitHandle { get; }   // Signaled when complete.
  bool CompletedSynchronously { get; }  // Did it complete on BeginX?
  bool IsCompleted { get; }             // Has it completed yet?
}
AsyncState lets you access the state argument passed to the BeginXXX method.
The wait handle is signaled when the operation is complete. To wait on this without blocking, call ThreadPool.RegisterWaitForSingleObject:
ThreadPool.RegisterWaitForSingleObject (
  result.AsyncWaitHandle,
  (data, timedOut) => { /* Callback */ },
  null, -1, true);
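The call above can be wrapped in a small helper, sketched here under the assumption that you only want a one-shot notification with no timeout. WaitHandleDemo and WhenSignaled are hypothetical names; the arguments mirror those in the snippet above.

```csharp
using System;
using System.Threading;

public static class WaitHandleDemo
{
  // Hypothetical helper: runs onSignaled on a pooled thread once the handle
  // is signaled, without tying up a thread while waiting.
  public static RegisteredWaitHandle WhenSignaled (WaitHandle handle,
                                                   Action onSignaled)
  {
    return ThreadPool.RegisterWaitForSingleObject (
      handle,
      (state, timedOut) => onSignaled(),
      null,     // No state object.
      -1,       // No timeout.
      true);    // Execute the callback only once.
  }
}
```

The returned RegisteredWaitHandle can be passed to Unregister once the notification is no longer needed.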
The CompletedSynchronously property indicates that the operation finished immediately after calling the BeginXXX method.
This may be true for one of three reasons:
The operation could be completed very quickly—and so was executed synchronously to avoid the overhead of managing an asynchronous operation.
The underlying implementation—or operating system—doesn’t support APM in that scenario.
The operation was CPU-bound and could be completed without blocking.
When this property returns true, the callback will still be fired, but possibly on the same thread as that which called BeginXXX. Failing to consider this possibility can sometimes result in unintended recursion, leading to a stack overflow.
Asynchronous delegate invocations implement the same pattern with their method signatures—but without the goal of thread economy:
Asynchronous methods | Asynchronous delegate invocations |
---|---|
Rarely or never blocks any thread | May block threads for any length of time (although not the calling thread) |
The purpose of asynchronous methods is to allow many tasks to run on few threads; the purpose of asynchronous delegates is to execute a task in parallel with the caller.
You can use an asynchronous delegate to call an asynchronous method, so that execution is guaranteed to return immediately to the caller, while still following the APM. Or better: you can use Framework 4.0’s new Task class to wrap an asynchronous method call to simplify management (we’ll explain how to do this later in the chapter).
If you use an asynchronous delegate to call a blocking method, however, you’re back to square one: the server will either suffer limited concurrency or need thousands of threads to do its job.
Let’s write a simple TCP sockets server that behaves as follows:
It waits for a client request.
It reads a 5,000-byte fixed-length message.
It reverses the bytes in the message, and then returns them to the client.
Let’s first write this using a standard multithreaded blocking pattern. Here is the code, exception handling aside:
You can download a Visual Studio project containing all the code in this chapter—along with a client test harness—at http://www.albahari.com/nutshell/async40.zip.
using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);    // Refer to Chapter 21
    ThreadPool.SetMaxThreads (50, 50);    // Refer to Chapter 21

    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }

  void Accept (object clientObject)
  {
    using (TcpClient client = (TcpClient) clientObject)
    using (NetworkStream n = client.GetStream())
    {
      byte[] data = new byte [5000];

      int bytesRead = 0; int chunkSize = 1;
      while (bytesRead < data.Length && chunkSize > 0)
        bytesRead += chunkSize =
          n.Read (data, bytesRead, data.Length - bytesRead);    // BLOCKS

      Array.Reverse (data);
      n.Write (data, 0, data.Length);                           // BLOCKS
    }
  }
}
From Framework 4.0, you can replace ThreadPool.QueueUserWorkItem with Task.Factory.StartNew. The end result is the same.
Our use of the thread pool prevents an arbitrarily large number of threads from being created (possibly taking down the server) and eliminates the time wasted in creating a new thread per request. Our program is simple and fast, but it is limited to 50 concurrent requests.
In order to scale to 1,000 concurrent requests—without increasing the thread count—we must employ the asynchronous method pattern. This means avoiding the blocking I/O methods altogether and instead calling their asynchronous counterparts. Here’s how to do it:
public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (ReverseEcho, c);
    }
  }

  void ReverseEcho (object client)
  {
    new ReverseEcho().Begin ((TcpClient)client);
  }
}

class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;

  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  void Read()            // Read in a nonblocking fashion.
  {
    while (true)
    {
      IAsyncResult r = _stream.BeginRead
        (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);

      // This will nearly always return in the next line:
      if (!r.CompletedSynchronously) return;   // Handled by callback
      if (!EndRead (r)) break;
    }
    Write();
  }

  void ReadCallback (IAsyncResult r)
  {
    try
    {
      if (r.CompletedSynchronously) return;
      if (EndRead (r))
      {
        Read();       // More data to read!
        return;
      }
      Write();
    }
    catch (Exception ex) { ProcessException (ex); }
  }

  bool EndRead (IAsyncResult r)   // Returns false if there's no more data
  {
    int chunkSize = _stream.EndRead (r);
    _bytesRead += chunkSize;
    return chunkSize > 0 && _bytesRead < _data.Length;   // More to read
  }

  void Write()
  {
    Array.Reverse (_data);
    _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
  }

  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); }
    Cleanup();
  }

  void ProcessException (Exception ex)
  {
    Cleanup();
    Console.WriteLine ("Error: " + ex.Message);
  }

  void Cleanup()
  {
    if (_stream != null) _stream.Close();
    if (_client != null) _client.Close();
  }
}
This program will handle 1,000 concurrent requests on fewer than 10 pooled threads.
Each client request is processed without calling any blocking methods (aside from the call to listener.AcceptTcpClient in the Serve method, which blocks just one thread).
In the Read method, we start by calling BeginRead on the stream, specifying a completion callback. We could simplify the entire method as follows and get the same result:
void Read()
{
  _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                     ReadCallback, null);
}
However, there’s a small chance that BeginRead could finish synchronously and then call ReadCallback on the same thread. Because ReadCallback calls Read again, this might lead to some fairly deep recursion and a stack overflow. To protect against this, we must check CompletedSynchronously after calling BeginRead, and if it returns true, use a loop to call BeginRead until completion rather than relying on the potentially recursive call in ReadCallback.
This leads to why we called AcceptTcpClient in the Serve method instead of its asynchronous version, BeginAcceptTcpClient. For the benefit of saving one thread, the latter would require the same pattern of use as with BeginRead to avoid a possible stack overflow.
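For comparison, here is a minimal sketch of what that pattern might look like if applied to BeginAcceptTcpClient. The AsyncAcceptor class and its handler delegate are hypothetical; the loop and the CompletedSynchronously checks follow the same shape as Read and ReadCallback above. The exceptions caught in the callback are an assumption about how a stopped listener reports a pending accept.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public class AsyncAcceptor
{
  readonly TcpListener _listener;
  readonly Action<TcpClient> _handler;   // Hypothetical per-client entry point.

  public AsyncAcceptor (TcpListener listener, Action<TcpClient> handler)
  {
    _listener = listener;
    _handler = handler;
  }

  public void Accept()
  {
    while (true)
    {
      IAsyncResult r = _listener.BeginAcceptTcpClient (AcceptCallback, null);
      if (!r.CompletedSynchronously) return;         // The callback takes over.
      _handler (_listener.EndAcceptTcpClient (r));   // Finished inline: loop, don't recurse.
    }
  }

  void AcceptCallback (IAsyncResult r)
  {
    if (r.CompletedSynchronously) return;   // Already handled by the loop in Accept.
    try
    {
      _handler (_listener.EndAcceptTcpClient (r));
      Accept();                             // Wait for the next client.
    }
    catch (ObjectDisposedException) { }     // Assumed: the listener was stopped.
    catch (SocketException) { }             // Assumed: the pending accept was aborted.
  }
}
```

With this in place, Serve would call new AsyncAcceptor (listener, handler).Accept() and return, rather than looping on AcceptTcpClient.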
The ReverseEcho class encapsulates a request’s state for its lifetime. We can no longer use local variables for this job, because the execution stack disappears each time we exit (after each call to an asynchronous method). This also complicates cleanup and means that a simple using statement is no longer suitable for closing our TcpClient and stream.
Another complicating factor is that we can’t use types such as BinaryReader and BinaryWriter, because they don’t offer asynchronous versions of their methods. The asynchronous pattern often forces you to work at a lower level than you might otherwise.
We saw in the preceding chapter how Framework 4.0’s Task class can manage a unit of work that runs on a pooled thread. You can also use a Task to wrap asynchronous method calls via the FromAsync method on TaskFactory.
The task that you get from calling FromAsync is merely a lightweight wrapper over calling a BeginXXX and EndXXX method; it does not get scheduled like an ordinary task. The reason to use FromAsync is to leverage features such as continuations and child tasks. FromAsync is internally implemented using TaskCompletionSource.
The FromAsync method requires the following parameters:
A delegate specifying a BeginXXX method
A delegate specifying an EndXXX method
Additional arguments that will get passed to these methods
FromAsync is overloaded to accept delegate types and arguments that match nearly all the asynchronous method signatures found in the .NET Framework. For instance, assuming stream is a Stream, instead of doing this:
var buffer = new byte[1000];
stream.BeginRead (buffer, 0, 1000, MyCallback, null);
...
void MyCallback (IAsyncResult r)
{
  int bytesRead;
  try { bytesRead = stream.EndRead (r); }
  catch (Exception ex) { Console.Write (ex.Message); return; }
  Console.Write (bytesRead + " bytes read");
}
we can do this:
var buffer = new byte[1000];

Task<int> readChunk = Task<int>.Factory.FromAsync (
  stream.BeginRead, stream.EndRead, buffer, 0, 1000, null);

readChunk.ContinueWith (ant => Console.Write (ant.Result + " bytes read"),
                        TaskContinuationOptions.NotOnFaulted);

readChunk.ContinueWith (ant => Console.Write (ant.Exception.Message),
                        TaskContinuationOptions.OnlyOnFaulted);
This doesn’t deliver a huge saving in itself: the real benefit comes when you introduce continuations with child antecedents.
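To make the FromAsync call easier to try in isolation, here is a small self-contained sketch. FromAsyncDemo, ReadChunkAsync, and DescribeChunkAsync are hypothetical names; the FromAsync call itself has the same shape as the one shown above, and a single continuation handles both the success and the fault case.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
  // Hypothetical helper: wraps one BeginRead/EndRead pair in a task whose
  // result is the number of bytes read.
  public static Task<int> ReadChunkAsync (Stream stream, byte[] buffer)
  {
    return Task<int>.Factory.FromAsync (
      stream.BeginRead, stream.EndRead,
      buffer, 0, buffer.Length,
      null);                          // No state object.
  }

  // Chains a continuation that turns the outcome into a message.
  public static Task<string> DescribeChunkAsync (Stream stream, byte[] buffer)
  {
    return ReadChunkAsync (stream, buffer).ContinueWith (ant =>
      ant.IsFaulted
        ? "Error: " + ant.Exception.InnerException.Message
        : ant.Result + " bytes read");
  }
}
```

Any Stream can be passed in; whether the read is truly nonblocking still depends on the stream type, as discussed later in the chapter.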
Revisiting our earlier example, suppose that we refactor ReverseEcho’s Begin method so that it calls Read on a new task. This liberates Server.Serve from having to create a task itself, but more importantly, it creates a parent for the FromAsync-created tasks upon which we can attach continuations. This avoids having to write separate exception-handling blocks or explicit fault continuations for each child task. Cleanup can also be done easily as another continuation on the parent:
public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      new ReverseEcho().BeginAsync (c);
    }
  }
}

class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;

  internal void BeginAsync (TcpClient c)
  {
    _client = c;
    _stream = c.GetStream();

    var task = Task.Factory.StartNew (Read);

    // Set up centralized error handling and cleanup:

    task.ContinueWith (ant =>
      Console.WriteLine ("Error: " + ant.Exception.Message),
      TaskContinuationOptions.OnlyOnFaulted);

    task.ContinueWith (ant =>
    {
      if (_stream != null) _stream.Close();
      if (_client != null) _client.Close();
    });
  }

  void Read()    // This will create a child task.
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);

    // ContinueWith takes TaskContinuationOptions, so the attach flag
    // must come from that enum too:
    readChunk.ContinueWith (Write, TaskContinuationOptions.NotOnFaulted
                                 | TaskContinuationOptions.AttachedToParent);
  }

  void Write (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}
We also need not worry about Read going recursive and performing a stack dive: continuations don’t happen synchronously unless you tell them to.
Returning to our previous example, suppose that the 5,000-byte exchange was just a small part of a more sophisticated communication protocol. It would be nice to turn what we’ve already written into a method like this:
public byte[] ReverseEcho (TcpClient client);
The problem, of course, is that this method signature is synchronous; we need to offer an asynchronous version (in other words, BeginReverseEcho). Further, if an exception is encountered, it’s no good writing it to the Console; we need to throw it back to the consumer at some point. So, to usefully partake in the pattern, we must also offer EndReverseEcho and write a class that implements IAsyncResult.
Our ReverseEcho class is an excellent candidate for IAsyncResult, since it already encapsulates the operation’s state. All we really need to add is some plumbing code to rethrow any exception upon calling EndReverseEcho, and a wait handle to signal at completion.
Here’s a real-world example, complete with exception handling and thread safety:
// This sample can be downloaded at http://www.albahari.com/nutshell/

public class MessagingServices
{
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    var re = new ReverseEcho();
    re.Begin (client, callback, userState);
    return re;
  }

  public static byte[] EndReverseEcho (IAsyncResult r)
  {
    return ((ReverseEcho)r).End();
  }
}

class ReverseEcho : IAsyncResult
{
  TcpClient _client;
  NetworkStream _stream;
  object _userState;
  ManualResetEvent _waitHandle = new ManualResetEvent (false);
  int _bytesRead = 0;
  byte[] _data = new byte [5000];
  Exception _exception;

  internal ReverseEcho() { }

  // IAsyncResult members:

  public object AsyncState { get { return _userState; } }
  public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false; } }
  public bool IsCompleted
  {
    get { return _waitHandle.WaitOne (0, false); }
  }

  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _userState = state;
    _stream = _client.GetStream();

    Task.Factory.StartNew (Read).ContinueWith (ant =>
    {
      _exception = ant.Exception;   // In case an exception occurred.

      if (_stream != null)
        try { _stream.Close(); }
        catch (Exception ex) { _exception = ex; };

      _waitHandle.Set();

      if (callback != null) callback (this);
    });
  }

  internal byte[] End()     // Wait for completion + rethrow any error.
  {
    AsyncWaitHandle.WaitOne();
    if (_exception != null) throw _exception;
    return _data;
  }

  void Read()
  {
    // Attached to the parent (as in the previous example) so that the
    // parent waits for the read and observes any exception it throws.
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);

    readChunk.ContinueWith (ContinueRead,
                            TaskContinuationOptions.NotOnFaulted
                          | TaskContinuationOptions.AttachedToParent);
  }

  void ContinueRead (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    // Attached as well, so the stream is not closed before the write completes.
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}
In the cleanup continuation, we closed _stream but not _client, because the caller may want to continue using _client after performing the reverse echo.
When writing asynchronous methods, you must meticulously catch all exceptions, saving the exception object so that it can be rethrown when the consumer calls the EndXXX method. The easiest way to achieve this is to attach a continuation to an overarching parent task (as in our example); this catches all exceptions thrown by its child tasks.
In general, any Framework method that starts with Begin and that returns an IAsyncResult follows the APM. There are, however, some exceptions, based on the Stream class:
BufferedStream
CryptoStream
DeflateStream
MemoryStream
These types rely on fallback asynchronous implementations in the base Stream class, which offer no guarantees not to block. Instead, they use an asynchronous delegate to call a blocking method such as Read or Write. Although this approach is perfectly valid in the case of MemoryStream (it never blocks in the first place, so it is excused), it creates a problem with BufferedStream and CryptoStream if they wrap anything other than a MemoryStream. In other words, if you call BeginRead or BeginWrite on a CryptoStream that wraps a NetworkStream, some thread is going to block at some point, violating the scalability of the asynchronous method pattern. This is a shame, because the CryptoStream’s decorator architecture is otherwise efficient.
A workaround with CryptoStream is to first read the underlying stream asynchronously into a MemoryStream, and then have the CryptoStream wrap the MemoryStream. This means reading the whole stream into memory, though, which, on a highly concurrent server, is not great for scalability, either. If you really need asynchronous encryption, a solution is to work at a lower level than CryptoStream, namely with ICryptoTransform. You can see exactly how CryptoStream uses ICryptoTransform to do its work with a disassembly tool such as Red Gate’s Reflector.
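As a rough sketch of working at the ICryptoTransform level, the helpers below transform a buffer that is already in memory, for example one filled by asynchronous reads. The BufferCrypto class, its method names, and the choice of AES are assumptions made for illustration; TransformFinalBlock is CPU-bound and handles the padding for a complete message in one call.

```csharp
using System;
using System.Security.Cryptography;

public static class BufferCrypto
{
  // Hypothetical helper: encrypts a complete in-memory message.
  public static byte[] Encrypt (byte[] plain, byte[] key, byte[] iv)
  {
    using (Aes aes = Aes.Create())
    using (ICryptoTransform encryptor = aes.CreateEncryptor (key, iv))
      return encryptor.TransformFinalBlock (plain, 0, plain.Length);
  }

  // Hypothetical helper: reverses Encrypt, given the same key and IV.
  public static byte[] Decrypt (byte[] cipher, byte[] key, byte[] iv)
  {
    using (Aes aes = Aes.Create())
    using (ICryptoTransform decryptor = aes.CreateDecryptor (key, iv))
      return decryptor.TransformFinalBlock (cipher, 0, cipher.Length);
  }
}
```

Because nothing here performs I/O, these calls make good use of the CPU and can safely run inside a callback or continuation. For messages too large to hold in memory, TransformBlock could be applied chunk by chunk instead.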
DeflateStream does actually follow the asynchronous pattern, or at least tries to. The problem is that it doesn’t properly handle exceptions. If the underlying stream is corrupt, for instance, BeginRead throws an exception on a pooled thread rather than marshaling it back to EndRead. This is an uncatchable exception that takes down your whole application.
The FileStream class is another violator: it touts fake asynchronous methods (i.e., it relies on Stream’s default implementation). However, it does make an attempt at true asynchronous behavior if constructed as follows:
Stream s = new FileStream ("large.bin", FileMode.Create, FileAccess.Write,
                           FileShare.None, 0x1000, true);
The boolean argument at the end instructs FileStream not to use asynchronous delegates and instead to attempt a true APM approach. The problem is that asynchronous file I/O requires operating system support, which may not be forthcoming. If the OS fails to oblige, BeginRead blocks the calling thread in a WaitSleepJoin state.
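One way to see what you were given is FileStream’s IsAsync property, which reports whether the stream was opened for asynchronous I/O. The sketch below opens a file with the same constructor arguments as above, writes a buffer through BeginWrite/EndWrite, and returns that flag. AsyncFileDemo and WriteWithAsyncHandle are hypothetical names, and calling EndWrite straight away is a simplification for the demo.

```csharp
using System;
using System.IO;

public static class AsyncFileDemo
{
  // Hypothetical helper: writes data through the APM methods and returns
  // the stream's IsAsync flag.
  public static bool WriteWithAsyncHandle (string path, byte[] data)
  {
    using (var fs = new FileStream (path, FileMode.Create, FileAccess.Write,
                                    FileShare.None, 0x1000, true))
    {
      IAsyncResult r = fs.BeginWrite (data, 0, data.Length, null, null);
      fs.EndWrite (r);     // Demo only: waits here until the write completes.
      return fs.IsAsync;
    }
  }
}
```

The value returned may vary by platform, so treat it as a diagnostic rather than a guarantee about blocking behavior.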
Lack of asynchronous file I/O is rarely a problem, though, assuming you’re accessing a local filesystem (in fact, it can be a good idea not to use asynchronous file I/O at all). Small file requests are likely to be served from an operating system or hard-drive cache and so be brief and CPU-bound; large file I/O requests are going to seriously limit concurrency if not broken up or throttled in some way, no matter what the threading model. A more insidious blocking issue arises if you’re using a FileStream on a UNC network path: the solution is instead to use lower-level networking classes (such as those described) for communicating between computers.
Chapter 21 described three analogous techniques—all of which coalesce many tasks onto a few threads:
ThreadPool.RegisterWaitForSingleObject
The producer/consumer queue
The threading and system timers
ThreadPool.RegisterWaitForSingleObject can be helpful in implementing the asynchronous method pattern. A custom producer/consumer queue can provide a complete alternative, with your own pool of workers, but is of no help if you want to interoperate with the .NET Framework (e.g., to read from a NetworkStream). The threading and system timers are excellent if your work is executed in a periodic fashion, rather than in response to requests.