31 Producer-Consumer Queues
Matthew Johnson
Advanced Micro Devices, Inc.
31.1 Introduction
The producer-consumer queue is a common multithreaded algorithm for handling
a thread-safe queue with first-in-first-out (FIFO) semantics. The queue may be
bounded, which means the size of the queue is fixed, or it may be unbounded,
which means the size can grow dynamically based on available memory. Finally,
the individual items in the queue may be fixed or variable in size. Typically, the
implementation is derived from a circular array or a linked list data structure. For
simplicity, this chapter describes bounded queues with elements of the same size.
Multithreaded queues are a common occurrence in existing operating system
(OS) APIs. One example of a thread-safe queue is the Win32 message model,
which is the main communication model for applications to process OS and user
events. Figure 31.1 shows a diagram of a single-producer and single-consumer
model. In this model, the OS produces events such as WM_CHAR, WM_MOUSEMOVE, WM_PAINT, etc., and the Win32 application consumes them.
Figure 31.1. Single producer and single consumer.
The applications for producer and consumer queues extend beyond input
messaging. Imagine a real-time strategy game where the user produces a list of
tasks such as “build factory,” “move unit here,” “attack with hero,” etc. Each task
is consumed by a separate game logic thread that uses pathfinding knowledge of
the environment to execute each task in parallel. Suppose a particular consumer
thread uses an A* algorithm to move a unit and hits a worst-case performance
path. The producer thread can still queue new tasks for the consumer without
having to stall and wait for the algorithm to finish.
The producer-consumer queue naturally extends to data parallelism. Imagine
an animation engine that uses the CPU to update N animated bone-skin skeletons
using data from static geometry to handle collision detection and response. As
pictured in Figure 31.2, the animation engine can divide the work into several
threads from a thread pool by producing multiple “update bone-skin” tasks, and
the threads can consume the tasks in parallel. When the queue is empty, the ani-
mation thread can continue, perhaps to signal the rendering thread to draw all of
the characters.
In Figure 31.3, only one item at a time is consumed from the queue (since the
queue is read atomically from the head); however, the actual time at which each
consumer thread starts or finishes its task may be out of order because the OS
thread scheduler can preempt a consumer thread before useful work begins.
Figure 31.2. Single producer and multiple consumers.
Figure 31.3. Sample timeline for producer-consumer model from Figure 31.2.
31.2 Multithreading Overview
Since the producer pushes items onto the tail of the queue and the consumer
pops items off of the head, the algorithm must serialize access between these
memory locations atomically. An atomic operation is an operation that appears to
all processors in the system as occurring instantly and is uninterruptible by other
processes or threads. Although C++ does not natively support atomic data types
(until C++0x), many libraries have other mechanisms to achieve the same results.
Serialization is typically supported by the OS and its API in various forms.
At the process level, a critical section can be used to synchronize access among
multiple threads, whereas a mutex can synchronize access among multiple pro-
cesses. When a thread enters a critical section, no other thread can execute the
code until that thread leaves the critical section. Finally, synchronization primi-
tives can be avoided in the design of a lock-free algorithm by exclusively using
atomic instructions to serialize resources. This requires detailed working
knowledge of the compiler, memory model, and processor.
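To make the distinction concrete, the short sketch below (not part of this chapter's listings; the names g_cs and g_counter are illustrative) increments a shared counter in two ways: once inside a Win32 critical section, and once with a single atomic instruction of the kind a lock-free design relies on.

#include <windows.h>

static CRITICAL_SECTION g_cs;       // serializes access among threads in this process
static LONG             g_counter;  // shared resource updated by multiple threads

void InitSerialization()
{
    InitializeCriticalSection(&g_cs);
}

// Lock-based: only one thread at a time may execute the guarded code.
void IncrementLocked()
{
    EnterCriticalSection(&g_cs);
    ++g_counter;                    // guarded read-modify-write
    LeaveCriticalSection(&g_cs);
}

// Lock-free: a single atomic instruction performs the read-modify-write.
void IncrementLockFree()
{
    InterlockedIncrement(&g_counter);
}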
31.3 A First Approach: Using Win32 Semaphores and Critical Sections
Since this chapter covers bounded queues, where each item is fixed in size, we
need a thread-safe way to internally manage the item count in the queue. Sema-
phores are an appropriate way to handle this.
Our first implementation of a FIFO, shown in Listing 31.1, uses a standard
circular queue, which is not thread safe. Note that the head precedes the tail but,
if wrapped around zero for an unsigned integer, continues to work because the
modulus is used for each array access. A common optimization is to use a power
of two for the queue size, allowing the modulus operation to be replaced with a
bitwise AND with one less than the queue size.
template <typename T> class Fifo
{
public:

    Fifo(UINT maxItems) : m_maxSize(maxItems), m_head(0), m_tail(0)
    {
        m_items = new T[m_maxSize];
    }

    ~Fifo()
    {
        delete[] m_items;   // array delete to match new[]
    }

    bool IsEmpty()
    {
        return (m_tail == m_head);
    }

    bool IsFull()
    {
        return (m_tail == m_head + m_maxSize);
    }

    void Insert(const T& item)
    {
        assert(!IsFull());
        m_items[m_tail++ % m_maxSize] = item;
    }

    T Remove()
    {
        assert(!IsEmpty());
        return (m_items[m_head++ % m_maxSize]);
    }

private:

    T      *m_items;
    UINT    m_head;
    UINT    m_tail;

    const UINT m_maxSize;
};
Listing 31.1. A simple FIFO queue.
To keep track of the number of elements currently in the queue, an m_size member variable can be added. It is incremented for each insert at the tail and decremented for each removal at the head.
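For example, if the queue size is restricted to a power of two, the Insert() and Remove() methods of Listing 31.1 might look like the sketch below, with the mask replacing the modulus and an m_size member maintained as just described. This is an illustration of the optimization, not part of the original listing.

// Assumes m_maxSize is a power of two and an added member: UINT m_size;
void Insert(const T& item)
{
    assert(!IsFull());
    m_items[m_tail++ & (m_maxSize - 1)] = item;  // mask replaces the modulus
    ++m_size;                                    // one more element in the queue
}

T Remove()
{
    assert(!IsEmpty());
    --m_size;                                    // one element leaves the queue
    return (m_items[m_head++ & (m_maxSize - 1)]);
}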
Next, a producer-consumer class can be created for thread-safe access to the
FIFO queue. The code in Listing 31.2 demonstrates a thread-safe queue using a
semaphore to maintain a count of the maximum number of items allowed in the
queue. The semaphore is initialized to zero by default. Calling ReleaseSemaphore() causes the OS to atomically increment the semaphore count by one. Calling WaitForSingleObject() waits if the semaphore count is zero; otherwise, the OS atomically decrements the count by one and returns from the function.
template <typename T> class ProducerConsumerQueue
{
public:

    enum
    {
        MaxItemsInQueue = 256
    };

    ProducerConsumerQueue() : m_queue(MaxItemsInQueue)
    {
        InitializeCriticalSection(&m_cs);

        m_sm = CreateSemaphore(NULL,            // security attributes
                               0,               // initial semaphore count
                               MaxItemsInQueue, // maximum semaphore count
                               NULL);           // name (useful for IPC)
    }
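The remainder of Listing 31.2, the thread-safe Insert() and Remove() methods and the member declarations, is not reproduced above. The sketch below shows one way those methods could combine the critical section and the semaphore just described; it assumes the members Fifo<T> m_queue, CRITICAL_SECTION m_cs, and HANDLE m_sm referenced by the constructor, and it omits handling of a full queue for brevity, so it is not the chapter's original code.

// Sketch only: producer side. Serialize access to the Fifo with the critical
// section, then release the semaphore to signal that one item is available.
void Insert(const T& item)
{
    EnterCriticalSection(&m_cs);
    m_queue.Insert(item);               // push onto the tail of the Fifo
    LeaveCriticalSection(&m_cs);

    ReleaseSemaphore(m_sm, 1, NULL);    // increment the semaphore count by one
}

// Sketch only: consumer side. Block until at least one item has been produced,
// then pop it under the critical section.
T Remove()
{
    WaitForSingleObject(m_sm, INFINITE);  // decrements the count when nonzero

    EnterCriticalSection(&m_cs);
    T item = m_queue.Remove();            // pop from the head of the Fifo
    LeaveCriticalSection(&m_cs);

    return item;
}

A second semaphore, initialized to the queue capacity, could be used in the same way to make the producer block when the queue is full instead of relying on the assertion inside the Fifo.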