Chapter 8. Queues, Threads, and Reading Data

In this chapter we introduce the use of queues and threads in TensorFlow, with the main motivation of streamlining the process of reading input data. We show how to write and read TFRecords, the efficient TensorFlow file format. We then demonstrate queues, threads, and related functionalities, and connect all the dots in a full working example of a multithreaded input pipeline for image data that includes pre-processing, batching, and training.

The Input Pipeline

When dealing with small datasets that can be stored in memory, such as MNIST images, it is reasonable to simply load all data into memory, then use feeding to push data into a TensorFlow graph. For larger datasets, however, this can become unwieldy. A natural paradigm for handling such cases is to keep the data on disk and load chunks of it as needed (such as mini-batches for training), such that the only limit is the size of your hard drive.

In addition, a typical data pipeline in practice often includes steps such as reading input files with different formats, changing the shape or structure of the input, normalizing or applying other forms of pre-processing, and shuffling the input, all before training has even started.

Much of this process can be decoupled and broken into modular components. Pre-processing, for example, does not involve training, so inputs could naively be pre-processed all at once and only then fed to training. Since our training works on batches of examples in any case, we could in principle handle batches of inputs on the fly, reading them from disk, applying pre-processing, and then feeding them into the computational graph for training.

This approach, however, can be wasteful. Because pre-processing is independent of training, waiting for each batch to be pre-processed would lead to severe I/O latency, forcing each training step to (impatiently) wait for mini-batches of data to be loaded and processed. A more scalable practice would be to prefetch the data and use independent threads for loading and processing and for training. But this practice, in turn, could become messy when working with many files kept on disk that need to be repeatedly read and shuffled, and require a fair amount of bookkeeping and technicalities to run seamlessly.

It’s important to note that even without taking pre-processing into consideration, using the standard feeding mechanism (with a feed_dict) we saw in previous chapters is wasteful in itself. feed_dict does a single-threaded copy of data from the Python runtime to the TensorFlow runtime, causing further latency and slowdowns. We would like to avoid this by somehow reading data directly into native TensorFlow.

To make our lives easier (and faster), TensorFlow comes with a set of tools to streamline this input-pipeline process. The main building blocks are a standard TensorFlow file format, utilities for encoding and decoding this format, queues of data, and multithreading.

We will go over these key components one by one, exploring how they work and building toward an end-to-end multithreaded input pipeline. We begin by introducing TFRecords, the recommended file format for TensorFlow, which will come in useful later on.

TFRecords

Datasets, of course, can come in many formats, sometimes even mixed (such as images and audio files). It can often be convenient—and useful—to convert input files into one unifying format, regardless of their original formats. TensorFlow’s default, standard data format is the TFRecord. A TFRecord file is simply a binary file, containing serialized input data. Serialization is based on protocol buffers (protobufs), which in plain words convert data for storage by using a schema describing the data structure, independently of what platform or language is being used (much like XML).
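To make the serialization idea concrete, here is a minimal sketch of a round trip through a protobuf, using the tf.train.Example message we will work with throughout this chapter (the 'id' feature name is purely illustrative):

import tensorflow as tf

# Build a tiny Example protobuf, serialize it to a binary string, parse it back.
example = tf.train.Example(features=tf.train.Features(feature={
    'id': tf.train.Feature(int64_list=tf.train.Int64List(value=[7]))}))
serialized = example.SerializeToString()

parsed = tf.train.Example()
parsed.ParseFromString(serialized)
print(parsed.features.feature['id'].int64_list.value[0])  # 7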

In our setting, using TFRecords (and protobufs/binary files in general) has many advantages over just working with raw data files. This unified format allows for a tidy way to organize input data, with all relevant attributes for an input instance kept together, avoiding the need for many directories and subdirectories. TFRecord files enable very fast processing. All data is kept in one block of memory, as opposed to storing each input file separately, cutting the time needed to read data from memory. It’s also important to note that TensorFlow comes with many implementations and utilities optimized for TFRecords, making it well suited for use as part of a multithreaded input pipeline.

Writing with TFRecordWriter

We begin by writing our input files to TFRecord format, to allow us to work with them (in other cases, we may already have the data stored in this format). In this example we will convert MNIST images to this format, but the same ideas carry over to other types of data.

First, we download the MNIST data to save_dir, using a utility function from tensorflow.contrib.learn:

from __future__ import print_function
import os
import numpy as np
import tensorflow as tf
from tensorflow.contrib.learn.python.learn.datasets import mnist

save_dir = "path/to/mnist"

# Download data to save_dir
data_sets = mnist.read_data_sets(save_dir,
                                 dtype=tf.uint8,
                                 reshape=False,
                                 validation_size=1000)

Our downloaded data includes train, test, and validation images, each in a separate split. We go over each split, putting examples in a suitable format and using TFRecordWriter() to write to disk:

data_splits = ["train","test","validation"]
for d in range(len(data_splits)):
  print("saving " + data_splits[d])
  data_set = data_sets[d]

  filename = os.path.join(save_dir, data_splits[d] + '.tfrecords')
  writer = tf.python_io.TFRecordWriter(filename)
  for index in range(data_set.images.shape[0]):
    image = data_set.images[index].tostring()
    example = tf.train.Example(features=tf.train.Features(feature={
    'height': tf.train.Feature(int64_list=
                                  tf.train.Int64List(value=
                                  [data_set.images.shape[1]])),
    'width': tf.train.Feature(int64_list=
                                  tf.train.Int64List(value =
                                  [data_set.images.shape[2]])),
    'depth': tf.train.Feature(int64_list=
                                  tf.train.Int64List(value =
                                  [data_set.images.shape[3]])),
    'label': tf.train.Feature(int64_list=
                                  tf.train.Int64List(value =
                                  [int(data_set.labels[index])])),
     'image_raw': tf.train.Feature(bytes_list=
                                   tf.train.BytesList(value =
                                                     [image]))}))
    writer.write(example.SerializeToString())
  writer.close()

Let’s break this code down to understand the different components.

We first instantiate a TFRecordWriter object, giving it the path corresponding to the data split:

filename = os.path.join(save_dir, data_splits[d] + '.tfrecords')
writer = tf.python_io.TFRecordWriter(filename)

We then go over each image, converting it from a NumPy array to a byte string:

image = data_set.images[index].tostring()

Next, we convert images to their protobuf format. tf.train.Example is a structure for storing our data.  An Example object contains a Features object, which in turn contains a map from attribute name to a Feature. A Feature can contain an Int64List, a BytesList, or a FloatList (not used here). For example, here we encode the label of the image:

tf.train.Feature(int64_list=tf.train.Int64List(
    value=[int(data_set.labels[index])]))

And here is the encoding for the actual raw image:

tf.train.Feature(bytes_list=tf.train.BytesList(value=[image]))
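
For completeness, a FloatList feature (mentioned above but not used in this chapter) is encoded in the same way; the values here are arbitrary illustrations:

tf.train.Feature(float_list=tf.train.FloatList(value=[0.25, 0.5]))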

Let’s take a look at what our saved data looks like. We do this with tf.python_io.tf_record_iterator, an iterator that reads records from a TFRecords file:

filename = os.path.join(save_dir, 'train.tfrecords')
record_iterator = tf.python_io.tf_record_iterator(filename)
serialized_img_example = next(record_iterator)

serialized_img_example is a byte string. To recover the structure we used when saving the image to a TFRecord, we parse this byte string, allowing us to access all the attributes we stored earlier:

example = tf.train.Example()
example.ParseFromString(serialized_img_example)
image = example.features.feature['image_raw'].bytes_list.value
label = example.features.feature['label'].int64_list.value[0]
width = example.features.feature['width'].int64_list.value[0]
height = example.features.feature['height'].int64_list.value[0]

Our image was saved as a byte string too, so we convert it back to a NumPy array and reshape it back to an array of shape (28, 28, 1):

img_flat = np.fromstring(image[0], dtype=np.uint8)
img_reshaped = img_flat.reshape((height, width, -1))
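
As a quick sanity check (this assumes matplotlib is installed; it is not needed anywhere else in this chapter), we can verify the shape and display the recovered digit alongside its label:

import matplotlib.pyplot as plt

assert img_reshaped.shape == (28, 28, 1)
plt.imshow(img_reshaped.squeeze(), cmap='gray')
plt.title('label: %d' % label)
plt.show()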

This basic example should have given you a feel for TFRecords and how to write and read them. In practice, we will typically want to read TFRecords into a queue of prefetched data as part of a multithreaded process. In the next section, we first introduce TensorFlow queues before showing how to use them with TFRecords.

Queues

A TensorFlow queue is similar to an ordinary queue, allowing us to enqueue new items, dequeue existing items, etc. The important difference from ordinary queues is that, just like anything else in TensorFlow, the queue is part of a computational graph. Its operations are symbolic as usual, and other nodes in the graph can alter its state (much like with Variables). This can be slightly confusing at first, so let’s walk through some examples to get acquainted with basic queue functionalities.

Enqueuing and Dequeuing

Here we create a first-in, first-out (FIFO) queue of strings, with a maximal number of 10 elements that can be stored in the queue. Since queues are part of a computational graph, they are run within a session. In this example, we use a tf.InteractiveSession():

import tensorflow as tf

sess = tf.InteractiveSession()
queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.string])

Behind the scenes, TensorFlow creates a memory buffer for storing the 10 items.

Just like any other operation in TensorFlow, to add items to the queue, we create an op:

enque_op = queue1.enqueue(["F"])

Since you are by now familiar with the concept of a computational graph in TensorFlow, it should be no surprise that defining the enque_op does not add anything to the queue—we need to run the op. So, if we look at the size of queue1 before running the op, we get this:

sess.run(queue1.size())

Out:
0

After running the op, our queue now has one item populating it:

enque_op.run()
sess.run(queue1.size())
Out:
1

Let’s add some more items to queue1, and look at its size again:

enque_op = queue1.enqueue(["I"])
enque_op.run()
enque_op = queue1.enqueue(["F"])
enque_op.run()
enque_op = queue1.enqueue(["O"])
enque_op.run()

sess.run(queue1.size())

Out: 
4

Next, we dequeue items. Dequeuing too is an op, whose output evaluates to a tensor corresponding to the dequeued item:

x = queue1.dequeue()
x.eval()

Out: b'F'
x.eval()

Out: b'I'
x.eval()

Out: b'F'
x.eval()

Out: b'O'

Note that if we were to run x.eval() one more time, on an empty queue, our main thread would hang forever. As we will see later in this chapter, in practice we use code that knows when to stop dequeuing and avoid hanging.
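
If you want to experiment without risking a hang, one option is to attach a timeout to the run call, so that dequeuing from an empty queue raises an error instead of blocking forever. A small sketch:

run_options = tf.RunOptions(timeout_in_ms=2000)
try:
    sess.run(x, options=run_options)  # x is the dequeue op from above
except tf.errors.DeadlineExceededError:
    print("Queue was empty; dequeue timed out.")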

Another way to dequeue is by retrieving multiple items at once, with the dequeue_many() operation. This op requires that we specify the shape of items in advance:

queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])

Here we fill the queue exactly as before, and then dequeue four items at once:

inputs = queue1.dequeue_many(4)
inputs.eval()
Out: 
array([b'F', b'I', b'F', b'O'], dtype=object)

Multithreading

A TensorFlow session is multithreaded—multiple threads can use the same session and run ops in parallel. Individual ops have parallel implementations that are used by default with multiple CPU cores or GPU threads. However, if a single call to sess.run() does not make full use of the available resources, one can increase throughput by making multiple parallel calls. For example, in a typical scenario, we may have multiple threads apply pre-processing to images and push them into a queue, while another thread pulls pre-processed images from the queue for training (in the next chapter, we will discuss distributed training, which is conceptually related, with important differences).

Let’s walk our way through a few simple examples introducing threading in TensorFlow and the natural interplay with queues, before connecting all the dots later on in a full example with MNIST images.

We start by creating a FIFO queue with capacity of 100 items, where each item is a random float generated with tf.random_normal():

from __future__ import print_function
import threading
import time

gen_random_normal = tf.random_normal(shape=())
queue = tf.FIFOQueue(capacity=100, dtypes=[tf.float32], shapes=())
enque = queue.enqueue(gen_random_normal)

def add():
    for i in range(10):
        sess.run(enque)

Note, again, that defining the enque op does not actually add any random numbers to the queue (and none are generated yet), since ops run only upon graph execution. Items are enqueued by the add() function we define, which adds 10 items to the queue by calling sess.run() 10 times.

Next, we create 10 threads, each running add() in parallel, thus each pushing 10 items to the queue, asynchronously. We could think (for now) of these random numbers as training data being added into a queue:

threads = [threading.Thread(target=add, args=()) for i in range(10)]

threads
Out: 
[<Thread(Thread-77, initial)>,
 <Thread(Thread-78, initial)>,
 <Thread(Thread-79, initial)>,
 <Thread(Thread-80, initial)>,
 <Thread(Thread-81, initial)>,
 <Thread(Thread-82, initial)>,
 <Thread(Thread-83, initial)>,
 <Thread(Thread-84, initial)>,
 <Thread(Thread-85, initial)>,
 <Thread(Thread-86, initial)>]

We have created a list of threads, and now we execute them, printing the size of the queue at short intervals as it grows from 0 to 100:

for t in threads: 
    t.start()

print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))

Out:
10
84
100

Finally, we dequeue 10 items at once with dequeue_many(), and examine the results:

x = queue.dequeue_many(10)
print(x.eval())
sess.run(queue.size())

Out:
[ 0.05863889  0.61680967  1.05087686 -0.29185265 -0.44238046  0.53796548
 -0.24784896  0.40672767 -0.88107938  0.24592835]
90

Coordinator and QueueRunner

In realistic scenarios (as we shall see later in this chapter), it can be more complicated to run multiple threads effectively. Threads should be able to stop properly (to avoid “zombie” threads, for example, or to close all threads together when one fails), queues need to be closed after stopping, and there are other technical but important issues that need to be addressed.

TensorFlow comes equipped with tools to help us in this process. Key among them are tf.train.Coordinator, for coordinating the termination of a set of threads, and tf.train.QueueRunner, which streamlines the process of getting multiple threads to enqueue data with seamless cooperation.

tf.train.Coordinator

We first demonstrate how to use tf.train.Coordinator with a simple, toy example. In the next section, we’ll see how to use it as part of a real input pipeline.

We use code similar to that in the previous section, altering the add() function and adding a coordinator:

gen_random_normal = tf.random_normal(shape=())
queue = tf.FIFOQueue(capacity=100, dtypes=[tf.float32], shapes=())
enque = queue.enqueue(gen_random_normal)

def add(coord,i):
    while not coord.should_stop():
        sess.run(enque)
        if i == 11:
            coord.request_stop()
            
coord = tf.train.Coordinator()
threads = [threading.Thread(target=add, args=(coord,i)) for i in range(10)]
# Note: called here, before the threads have started, join() returns
# immediately; its real purpose is to block until all threads stop.
coord.join(threads)

for t in threads:
    t.start()

print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))

Out:
10
100
100

Any thread can call coord.request_stop() to get all other threads to stop. Threads typically run loops that check whether to stop, using coord.should_stop(). Here, we pass the thread index i to add(), and use a condition that is never satisfied (i==11) to request a stop. Thus, our threads complete their job, adding the full 100 items to the queue. However, if we were to alter add() as follows:

def add(coord,i):
    while not coord.should_stop():
        sess.run(enque)
        if i == 1:
            coord.request_stop()

then thread i=1 would use the coordinator to request all threads to stop, stopping all enqueueing early:

print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
Out:
10
17
17

tf.train.QueueRunner and tf.RandomShuffleQueue

While we can create a number of threads that repeatedly run an enqueue op, it is better practice to use the built-in tf.train.QueueRunner, which does exactly that, while closing the queue upon an exception.

Here we create a queue runner that will run four threads in parallel to enqueue items:

gen_random_normal = tf.random_normal(shape=())
queue = tf.RandomShuffleQueue(capacity=100, dtypes=[tf.float32],
                              min_after_dequeue=1)
enqueue_op = queue.enqueue(gen_random_normal)

qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
coord.request_stop()
coord.join(enqueue_threads)

Note that qr.create_threads() takes our session as an argument, along with our coordinator.

In this example, we used a tf.RandomShuffleQueue rather than the FIFO queue. A RandomShuffleQueue is simply a queue with a dequeue op that pops items in random order. This is useful when training deep neural networks with stochastic gradient-descent optimization, which requires shuffling the data. The min_after_dequeue argument specifies the minimum number of items that will remain in the queue after calling a dequeue op—a bigger number entails better mixing (random sampling), but more memory.
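
As a quick illustration of the random dequeue order (reusing our interactive session; min_after_dequeue is set to 0 here only so that we can drain this toy queue completely):

q = tf.RandomShuffleQueue(capacity=10, dtypes=[tf.int32],
                          shapes=(), min_after_dequeue=0)
sess.run(q.enqueue_many([[1, 2, 3, 4, 5]]))
print(sess.run(q.dequeue_many(5)))  # e.g., [3 1 5 2 4] -- order varies per run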

A Full Multithreaded Input Pipeline

We now put all the pieces together in a working example with MNIST images, from writing data to TensorFlow’s efficient file format, through data loading and pre-processing, to training a model. We do so by building on the queuing and multithreading functionality demonstrated earlier, and along the way introduce some more useful components for reading and processing data in TensorFlow.

First, we write the MNIST data to TFRecords, with the same code we used at the start of this chapter:

from __future__ import print_function
import os
import tensorflow as tf
from tensorflow.contrib.learn.python.learn.datasets import mnist
import numpy as np

save_dir = "path/to/mnist"

# Download data to save_dir
data_sets = mnist.read_data_sets(save_dir,
                                 dtype=tf.uint8,
                                 reshape=False,
                                 validation_size=1000)

data_splits = ["train", "test", "validation"]
for d in range(len(data_splits)):
  print("saving " + data_splits[d])
  data_set = data_sets[d]

  filename = os.path.join(save_dir, data_splits[d] + '.tfrecords')
  writer = tf.python_io.TFRecordWriter(filename)
  for index in range(data_set.images.shape[0]):
    image = data_set.images[index].tostring()
    example = tf.train.Example(features=tf.train.Features(feature={
        'height': tf.train.Feature(
            int64_list=tf.train.Int64List(value=[data_set.images.shape[1]])),
        'width': tf.train.Feature(
            int64_list=tf.train.Int64List(value=[data_set.images.shape[2]])),
        'depth': tf.train.Feature(
            int64_list=tf.train.Int64List(value=[data_set.images.shape[3]])),
        'label': tf.train.Feature(
            int64_list=tf.train.Int64List(value=[int(data_set.labels[index])])),
        'image_raw': tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[image]))}))
    writer.write(example.SerializeToString())
  writer.close()

tf.train.string_input_producer() and tf.TFRecordReader()

tf.train.string_input_producer() simply creates a QueueRunner behind the scenes, outputting filename strings to a queue for our input pipeline. This filename queue will be shared among multiple threads:

filename = os.path.join(save_dir ,"train.tfrecords")
filename_queue = tf.train.string_input_producer(
    [filename], num_epochs=10)

The num_epochs argument tells string_input_producer() to produce each filename string num_epochs times.
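
One technical detail worth knowing: when num_epochs is set, string_input_producer() creates a local (non-global) counter variable behind the scenes. This is why the training code later in this chapter runs tf.local_variables_initializer() in addition to the usual global initializer:

# Required when num_epochs is set in string_input_producer();
# the full program below runs this right after creating the session.
sess.run(tf.local_variables_initializer())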

Next, we read files from this queue using TFRecordReader(), which takes the queue of filenames and dequeues them one by one. Internally, TFRecordReader() uses the state of the graph to keep track of the current location in the TFRecord file being read, as it loads chunk after chunk of input data from disk:

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64),
    })

tf.train.shuffle_batch()

We decode the raw byte string data, do (very) basic pre-processing to convert pixel values to floats, and then shuffle the image instances and collect them into batch_size batches with tf.train.shuffle_batch(), which internally uses a RandomShuffleQueue and accumulates examples until it contains batch_size + min_after_dequeue elements:

image = tf.decode_raw(features['image_raw'], tf.uint8)
image.set_shape([784])  
image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
label = tf.cast(features['label'], tf.int32)
# Randomly collect instances into batches 
images_batch, labels_batch = tf.train.shuffle_batch(
    [image, label], batch_size=128,
    capacity=2000,
    min_after_dequeue=1000)

The capacity and min_after_dequeue parameters are used in the same manner as discussed previously. The mini-batches that are returned by shuffle_batch() are the result of a dequeue_many() call on the RandomShuffleQueue that is created internally.
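
To make this concrete, here is a rough, simplified sketch of what shuffle_batch() builds behind the scenes. This is an illustration, not the actual library code, which also adds naming, summaries, and error handling:

# Simplified illustration of shuffle_batch() internals.
batch_queue = tf.RandomShuffleQueue(
    capacity=2000, min_after_dequeue=1000,
    dtypes=[tf.float32, tf.int32], shapes=[[784], []])
enqueue_op = batch_queue.enqueue([image, label])
# Register threads that will fill the queue once start_queue_runners() runs.
tf.train.add_queue_runner(tf.train.QueueRunner(batch_queue, [enqueue_op]))
# A mini-batch is then simply a dequeue_many() on this queue.
images_batch, labels_batch = batch_queue.dequeue_many(128)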

tf.train.start_queue_runners() and Wrapping Up

We define our simple softmax classification model as follows:

W = tf.get_variable("W", [28*28, 10])
y_pred = tf.matmul(images_batch, W)
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y_pred, 
                                                      labels=labels_batch)

loss_mean = tf.reduce_mean(loss)

train_op = tf.train.AdamOptimizer().minimize(loss)

sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
init = tf.local_variables_initializer()
sess.run(init)

Finally, we create threads that enqueue data to queues by calling tf.train.start_queue_runners(). Unlike other calls, this one is not symbolic and actually creates the threads (and thus needs to be done after initialization):

from __future__ import print_function

# Coordinator 
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess,coord=coord)

Let’s take a look at the list of created threads:

threads

Out: 
[<Thread(Thread-483, stopped daemon 13696)>,
 <Thread(Thread-484, started daemon 16376)>,
 <Thread(Thread-485, started daemon 4320)>,
 <Thread(Thread-486, started daemon 13052)>,
 <Thread(Thread-487, started daemon 7216)>,
 <Thread(Thread-488, started daemon 4332)>,
 <Thread(Thread-489, started daemon 16820)>]

Having everything in place, we are now ready to run the multithreaded process, from reading and pre-processing batches into a queue to training a model. It’s important to note that we do not use the familiar feed_dict argument anymore—this avoids data copies and offers speedups, as discussed earlier in this chapter:

NUM_EPOCHS = 10  # matches num_epochs in string_input_producer()

try:
    step = 0
    while not coord.should_stop():
        step += 1
        sess.run([train_op])
        if step % 500 == 0:
            loss_mean_val = sess.run([loss_mean])
            print(step)
            print(loss_mean_val)
except tf.errors.OutOfRangeError:
    print('Done training for %d epochs, %d steps.' % (NUM_EPOCHS, step))
finally:
    # When done, ask the threads to stop
    coord.request_stop()

# Wait for threads to finish
coord.join(threads)
sess.close()

We train until a tf.errors.OutOfRangeError is thrown, indicating that the queues are empty and we are done:

Out:
Done training for 10 epochs, 2299500 steps.

The Future Input Pipeline

In mid-2017, the TensorFlow development team announced the Dataset API, a new preliminary input pipeline abstraction offering some simplifications and speedups. The concepts presented in this chapter, such as TFRecords and queues, are fundamental and remain at the core of TensorFlow and its input pipeline process. TensorFlow is still very much a work in progress, and exciting and important changes naturally occur from time to time. See the issue tracker for an ongoing discussion.
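
For a taste of where things were headed, here is a minimal sketch of the pipeline from this chapter expressed with the Dataset API. This assumes a TensorFlow version in which the tf.data module is available (in the earliest preliminary releases it lived under tf.contrib.data):

def parse(serialized_example):
    features = tf.parse_single_example(
        serialized_example,
        features={'image_raw': tf.FixedLenFeature([], tf.string),
                  'label': tf.FixedLenFeature([], tf.int64)})
    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image.set_shape([784])
    image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
    label = tf.cast(features['label'], tf.int32)
    return image, label

# Shuffling, batching, and epochs replace the queue machinery shown earlier.
dataset = (tf.data.TFRecordDataset([filename])
           .map(parse)
           .shuffle(buffer_size=1000)
           .repeat(10)      # 10 epochs
           .batch(128))
images_batch, labels_batch = dataset.make_one_shot_iterator().get_next()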

Summary

In this chapter, we saw how to use queues and threads in TensorFlow, and how to create a multithreaded input pipeline. This process can help increase throughput and utilization of resources. In the next chapter, we take this a step forward and show how to work in a distributed setting with TensorFlow, across multiple devices and machines.
