490 31.ProducerConsumerQueues
return (m_top.stackIndex == m_maxSize - 1);
}
// Reports whether the free-list stack is empty, i.e. whether the current
// top-of-stack record sits at stack position 0.
inline bool IsEmpty() const
{
    const bool atBottom = (0 == m_top.stackIndex);
    return atBottom;
}
// Pushes arrayIndex back onto the free-list stack (defined out of line).
void Free(UINT32 arrayIndex);
// Pops an index off the free-list stack and returns it (defined out of line).
UINT32 Alloc();
private:
// Stack slots backing the free list; Alloc/Free index it by stackIndex.
volatile AllocRef *m_pFreeList;
// Current top-of-stack record; Alloc/Free replace it with CAS2 on m_top.val.
volatile AllocRef m_top;
// Capacity bound: the stack is treated as full at stackIndex == m_maxSize - 1.
const UINT m_maxSize;
};
// Pops one index off the free-list stack and returns it.
//
// Each pass of the loop works from a single snapshot of m_top:
//   1. Helping step: try to publish the snapshot's arrayIndex/version into
//      the stack slot at top.stackIndex, but only if that slot still carries
//      version (top.version - 1). The result is deliberately ignored.
//   2. If the snapshot says the stack is empty (stackIndex == 0), retry.
//   3. Try to swing m_top down to the slot below; on success the popped
//      top.arrayIndex is returned.
//
// NOTE(review): while the stack is empty this function spins and never
// returns. CAS2 is declared outside this listing; it is assumed to be an
// atomic compare-and-swap taking (destination, expected, replacement) and
// yielding a true value on success -- confirm against its definition.
UINT32 Allocator::Alloc()
{
    for (;;)
    {
        // Snapshot the top record and the slot it refers to.
        AllocRef top = m_top;
        AllocRef stackTop = m_pFreeList[top.stackIndex];

        // Helping step: bring the slot up to date with the top record.
        CAS2(&m_pFreeList[top.stackIndex].val,
             AllocRef(stackTop.arrayIndex, stackTop.stackIndex,
                      top.version - 1).val,
             AllocRef(top.arrayIndex, stackTop.stackIndex,
                      top.version).val);

        if (top.stackIndex == 0) continue; // Stack Empty?

        // The new top takes its arrayIndex from the slot below, with that
        // slot's version advanced by one.
        AllocRef belowTop = m_pFreeList[top.stackIndex - 1];
        if (CAS2(&m_top.val, AllocRef(top.arrayIndex, top.stackIndex,
                                      top.version).val,
                 AllocRef(belowTop.arrayIndex, top.stackIndex - 1,
                          belowTop.version + 1).val))
        {
            return (top.arrayIndex);
        }
        // m_top changed since the snapshot was taken: start over.
    }
}
// Pushes arrayIndex back onto the free-list stack.
//
// Each pass of the loop works from a single snapshot of m_top. It first
// performs a helping step on the slot at top.stackIndex, then tries to swing
// m_top up to the next slot with arrayIndex as the new top entry.
//
// NOTE(review): while the stack is full this function spins and never
// returns. CAS2 is declared outside this listing; it is assumed to be an
// atomic compare-and-swap taking (destination, expected, replacement) and
// yielding a true value on success -- confirm against its definition.
void Allocator::Free(UINT32 arrayIndex)
{
for (;;)
{
// Snapshot the top record and the slot it refers to.
AllocRef top = m_top;
AllocRef stackTop = m_pFreeList[top.stackIndex];
// Helping step: publish the snapshot's arrayIndex/version into the slot,
// but only if the slot still carries version (top.version - 1). The result
// is deliberately ignored.
CAS2(&m_pFreeList[top.stackIndex].val,
AllocRef(stackTop.arrayIndex, stackTop.stackIndex,
top.version - 1).val,
AllocRef(top.arrayIndex, stackTop.stackIndex,
top.version).val);
if (top.stackIndex == m_maxSize - 1) continue; // Stack full?
// The new top record carries the version of the slot above, plus one.
UINT16 aboveTopCounter = m_pFreeList[top.stackIndex + 1].version;
// Try to install the pushed index as the new top; this only succeeds if
// m_top still equals the snapshot.
if (CAS2(&m_top.val, AllocRef(top.arrayIndex, top.stackIndex,
top.version).val,
AllocRef(arrayIndex, top.stackIndex + 1,
aboveTopCounter + 1).val))
{
return;
}
// m_top changed since the snapshot was taken: start over.
}
}
Listing 31.7. A free list memory block allocator.
31.8 Lock-Free Implementation of a Queue
The implementation shown in Listing 31.8 is based on the lock-free, array-based
queue algorithm by Shann and Huang [2000] with bug fixes by Colvin and
Groves [2005]. In the original algorithm, the complete item in the queue is
atomically swapped. Unfortunately, due to the absence of a hardware CASn
instruction, this limits an item size to 32 bits (further limited to 20 bits since
we share the same index value in the stack algorithm). Therefore, the algorithm is
extended to use a circular queue of index elements that are offsets to the specific
item in an array. When a producer wants to insert an item, the index is atomically
retrieved from the free list, and it is not inserted into the queue until the data
is finished being copied into the array. Finally, the index can't be reused until a
consumer retrieves the index from the queue, copies the data locally, and returns
the index back to the free list.
/* Lock-free Array-based Producer-Consumer (FIFO) Queue
Producer: Inserting an item to the tail of the queue.
- The allocator atomically retrieves a free index from the free pool.
- This index points to an unused entry in the item array.
- The item is copied into the array at the index.
- The index is inserted atomically at the queue's tail and is now visible.
- The index will remain unavailable until consumer atomically removes it.
- When that happens, the index will be placed back on the free pool.
Consumer: Removing an item from the head of the queue.
- The index is atomically removed from the head of the queue.
- If successfully removed, the head is atomically incremented.
- The item is copied from the array to a local copy.
- The index is placed back on the free pool and is now available for reuse.
*/
// Fixed-capacity FIFO queue of T built from three pieces:
//   - m_pItems:    storage for the items themselves,
//   - m_pQueue:    a circular array of AllocRef entries, each either
//                  AllocRef::NullIndex (empty) or an index into m_pItems,
//   - m_allocator: the free list handing out unused m_pItems indices.
//
// The queue owns both arrays and releases them in its destructor, so it is
// deliberately not copyable (a copy would delete the same arrays twice).
template <typename T> class Queue
{
public:
    // Creates an empty queue able to hold maxItemsInQueue items.
    // maxItemsInQueue must be non-zero: slot positions are computed modulo
    // the capacity.
    //
    // The initializer list follows member declaration order, which is the
    // order in which members are actually initialized.
    Queue(UINT maxItemsInQueue) : m_allocator(maxItemsInQueue),
                                  m_pItems(NULL),
                                  m_pQueue(NULL),
                                  m_head(0), m_tail(0),
                                  m_maxQueueSize(maxItemsInQueue)
    {
        assert(maxItemsInQueue > 0);

        // Each value references a unique array element.
        m_pQueue = new AllocRef[maxItemsInQueue];
        assert(m_pQueue != NULL);

        // By default, the queue is empty.
        for (UINT i = 0; i < m_maxQueueSize; ++i)
        {
            m_pQueue[i].arrayIndex = AllocRef::NullIndex;
            m_pQueue[i].version = 0;
        }

        // Array of Items
        m_pItems = new T[maxItemsInQueue];
        assert(m_pItems != NULL);
    }

    ~Queue()
    {
        delete[] m_pItems;
        delete[] m_pQueue;
    }

    // Adds a copy of item at the tail of the queue.
    void Insert(const T& item);

    // Removes the item at the head of the queue and returns it.
    T Remove();

private:
    // Copying is disabled: declared but intentionally never defined.
    Queue(const Queue&);
    Queue& operator=(const Queue&);

    Allocator m_allocator;       // Free list allocator.
    T *m_pItems;                 // Array of items.
    volatile AllocRef *m_pQueue; // FIFO queue.
    volatile UINT m_head;        // Head of queue.
    volatile UINT m_tail;        // Tail of queue.
    const UINT m_maxQueueSize;   // Max items in queue.
};
// Inserts a copy of item at the tail of the queue.
//
// Steps:
//   1. Obtain an unused m_pItems index from the free-list allocator, retrying
//      while it reports AllocRef::NullIndex.
//   2. Copy item into m_pItems[index]. The index is not yet in the queue at
//      this point.
//   3. Loop until the index has been written into the tail slot with CAS2.
//      Along the way, advance m_head or m_tail with CAS if another thread's
//      update appears to be lagging.
//
// NOTE(review): this function does not return while the queue is full; it
// keeps retrying. CAS and CAS2 are declared outside this listing; they are
// assumed to be atomic compare-and-swap operations taking
// (destination, expected, replacement) and yielding a true value on
// success -- confirm against their definitions.
template <typename T> void Queue<T>::Insert(const T& item)
{
    UINT32 index;
    do
    {
        // Obtain free index from free list.
        index = m_allocator.Alloc();
    } while (index == AllocRef::NullIndex); // Spin until free index.

    // Fill the item slot before the index is inserted into the queue.
    m_pItems[index] = item;

    for (;;) // Spin until successfully inserted.
    {
        // Snapshot tail, the slot it designates, and head.
        UINT tail = m_tail;
        AllocRef alloc = m_pQueue[tail % m_maxQueueSize];
        UINT head = m_head;

        // The slot snapshot is only meaningful if tail did not move.
        if (tail != m_tail) continue;

        if (tail == m_head + m_maxQueueSize)
        {
            // Tail is a full lap ahead of head.
            if (m_pQueue[head % m_maxQueueSize].arrayIndex !=
                AllocRef::NullIndex)
            {
                if (head == m_head) continue; // Queue is full.
            }
            // Head slot is empty, or head moved since the snapshot: try to
            // advance head from the snapshot value, then retry.
            CAS(&m_head, head, head + 1);
            continue;
        }

        if (alloc.arrayIndex == AllocRef::NullIndex)
        {
            // Tail slot is empty: try to claim it, bumping its version.
            if (CAS2(&m_pQueue[tail % m_maxQueueSize].val, alloc.val,
                     AllocRef(index, alloc.version + 1).val))
            {
                // The result is ignored: the item is already in the slot
                // whether or not this CAS moves tail.
                CAS(&m_tail, tail, tail+1);
                return;
            }
        }
        else if (m_pQueue[tail % m_maxQueueSize].arrayIndex !=
                 AllocRef::NullIndex)
        {
            // Slot is occupied but tail has not moved past it: try to
            // advance tail before retrying.
            CAS(&m_tail, tail, tail+1);
        }
    }
}
template <typename T> T Queue<T>::Remove()
{
for (;;) // Spin until successfully removed.
{
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset