Chapter 9. Distributed TensorFlow

In this chapter we discuss the use of TensorFlow for distributed computing. We start by briefly surveying the different approaches to distributing model training in machine learning in general, and specifically for deep learning. We then introduce the elements of TensorFlow designed to support distributed computing, and finally put everything together with an end-to-end example.

Distributed Computing

Distributed computing, in the most general terms, entails the utilization of more than one component in order to perform the desired computation or achieve a goal. In our case, this means using multiple machines in order to speed up the training of a deep learning model.

The basic idea behind this is that by using more computing power, we should be able to train the same model faster. This is indeed often the case, although just how much faster depends on many factors (for example, if you expect to use 10× the resources and get a 10× speedup, you are most likely going to be disappointed!).

There are many ways to distribute computations in a machine learning setting. You may want to utilize multiple devices, either on the same machine or across a cluster. When training a single model, you may want to compute gradients across a cluster to speed up training, either synchronously or asynchronously. A cluster may also be used to train multiple models at the same time, or in order to search for the optimal parameters for a single model.

In the following subsections we map out these many aspects of parallelism.

Where Does the Parallelization Take Place?

The first split in the classification of types of parallelization is the location. Are we using multiple computing devices on a single machine or across a cluster?

It is becoming increasingly common to have powerful hardware with multiple devices on a single machine. Cloud providers (such as Amazon Web Services) now offer this sort of platform set up and ready to go.

Whether in the cloud or on premises, a cluster configuration affords more flexibility in design and evolution, and the setup can grow way beyond what is currently feasible with multiple devices on the same board (essentially, you can use a cluster of arbitrary size).

On the other hand, while several devices on the same board can use shared memory, the cluster approach introduces the time cost of communication between nodes. This can become a limiting factor when the amount of information that has to be shared is large and communication is relatively slow.

What Is the Goal of Parallelization?

The second split is the actual goal. Do we want to use more hardware to make the same process faster, or in order to parallelize the training of multiple models?

The need to train multiple models often arises in development stages where a choice needs to be made regarding either the models or the hyperparameters to use. In this case it is common to run several options and choose the best-performing one. It is natural to do so in parallel.
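As a rough illustration only (the script name train_model.py and its --learning_rate flag are hypothetical, not part of this chapter's example), dispatching several such runs in parallel can be as simple as:

import subprocess

# Hypothetical script and flag: each process trains the same model with a
# different learning rate; the best run is chosen afterward from its logs.
learning_rates = [0.1, 0.01, 0.001]
processes = [
    subprocess.Popen('python train_model.py --learning_rate={}'.format(lr),
                     shell=True)
    for lr in learning_rates
]

# Wait for all runs to finish before comparing their results.
for p in processes:
    p.wait()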

Alternatively, when training a single (often large) model, a cluster may be used in order to speed up training. In the most common approach, known as data parallelism, the same model structure exists on each computation device separately, and the data running through each copy is what is parallelized.

For example, when training a deep learning model with gradient descent, the process is composed of the following steps:

  1. Compute the gradients for a batch of training examples.
  2. Sum the gradients.
  3. Apply an update to the model parameters accordingly.

Clearly, step 1 of this schema lends itself to parallelization: simply use multiple devices to compute the gradients (each with respect to different training examples), then aggregate and sum the results in step 2, just as in the single-device case.
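As a plain-NumPy sketch (not TensorFlow, and not part of the chapter's example), the three steps look like this for a toy linear-regression model:

import numpy as np

# Toy data: y = X.dot(w_true) + noise
np.random.seed(0)
X = np.random.randn(128, 5)
w_true = np.random.randn(5)
y = X.dot(w_true) + 0.1 * np.random.randn(128)

w = np.zeros(5)           # shared model parameters
learning_rate = 0.1

def gradient(w, X_batch, y_batch):
    # Gradient of the mean squared error with respect to w
    err = X_batch.dot(w) - y_batch
    return X_batch.T.dot(err) / len(y_batch)

for step in range(100):
    # Step 1: each "device" computes gradients on its own shard of the batch
    shards = np.array_split(np.random.permutation(128)[:64], 4)
    grads = [gradient(w, X[idx], y[idx]) for idx in shards]

    # Step 2: aggregate the gradients from all devices (here, by averaging)
    g = np.mean(grads, axis=0)

    # Step 3: apply a single update to the shared parameters
    w -= learning_rate * g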

Synchronous versus asynchronous data parallelism

In the process just described, gradients from different training examples are aggregated together, in order to make a single update to the model parameters. This is what is known as synchronous training, since the summation step defines a point where the flow has to wait for all of the nodes to complete the gradient computation.

One case where it might be better to avoid this is when there are heterogeneous computing resources being used together, since the synchronous option entails waiting for the slowest of the nodes.

The alternative, asynchronous option is to apply the update step independently after each node finishes computing the gradients for the training examples it was assigned.
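For reference, TensorFlow ships a wrapper optimizer, tf.train.SyncReplicasOptimizer, that implements the synchronous option by aggregating gradients from a number of replicas before applying a single update. The end-to-end example later in this chapter uses the simpler asynchronous approach, so the following is only a sketch, assuming cross_entropy and global_step are defined as in that example (a complete synchronous setup also requires the optimizer's initialization hooks, which are omitted here):

# Sketch of synchronous data parallelism (not used in this chapter's example).
# Gradients from three worker replicas are accumulated before each update.
optimizer = tf.train.SyncReplicasOptimizer(
    tf.train.AdamOptimizer(1e-4),
    replicas_to_aggregate=3,
    total_num_replicas=3)
train_step = optimizer.minimize(cross_entropy, global_step=global_step)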

TensorFlow Elements

In this section we go over the TensorFlow elements and concepts that are used in parallel computations.  This is not a complete overview, and primarily serves as an introduction to the parallel example that concludes this chapter.

tf.app.flags

We start with a mechanism that is completely unrelated to parallel computing, but is essential for our example at the end of the chapter. Indeed, the flags mechanism is heavily used in TensorFlow examples and deserves to be discussed.

Essentially, tf.app.flags is a wrapper for the Python argparse module, which is commonly used to process command-line arguments, with some extra and specific functionality.

Consider, for instance, a Python command-line program with typical command-line arguments:

python distribute.py --job_name="ps" --task_index=0

The program distribute.py is passed the following:

	job_name="ps"
	task_index=0

This information is then extracted within the Python script, by using:

tf.app.flags.DEFINE_string("job_name", "", "name of job")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")

Each argument (a string and an integer in this case) is defined by its name on the command line, a default value, and a description.

The flags mechanism allows the following types of arguments:

  • tf.app.flags.DEFINE_string defines a string value.
  • tf.app.flags.DEFINE_boolean defines a Boolean value.
  • tf.app.flags.DEFINE_float defines a floating-point value.
  • tf.app.flags.DEFINE_integer defines an integer value.

Finally, tf.app.flags.FLAGS is a structure containing the values of all the arguments parsed from the command-line input. The arguments are accessed as FLAGS.arg, or via the dictionary FLAGS.__flags if necessary (it is, however, highly recommended to use the first option—the way it was designed to be used).
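Putting these pieces together, a minimal self-contained sketch (the file name flags_demo.py is just an example) would look like this:

import tensorflow as tf

tf.app.flags.DEFINE_string("job_name", "", "'ps' / 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")
FLAGS = tf.app.flags.FLAGS


def main(_):
    # The values arrive parsed and typed: FLAGS.job_name is a str,
    # FLAGS.task_index is an int.
    print("job_name = {}, task_index = {}".format(FLAGS.job_name,
                                                  FLAGS.task_index))


if __name__ == "__main__":
    # tf.app.run() parses the command-line flags and then calls main()
    tf.app.run()

Running python flags_demo.py --job_name="worker" --task_index=1 would print job_name = worker, task_index = 1.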

Clusters and Servers

A TensorFlow cluster is simply a set of nodes (a.k.a. tasks) that participate in parallel processing of a computation graph. Each task is defined by the network address at which it may be accessed. For example:

parameter_servers = ["localhost:2222"]
workers = ["localhost:2223",
           "localhost:2224",
           "localhost:2225"]
cluster = tf.train.ClusterSpec({"parameter_server": parameter_servers,
                                "worker": workers})

Here we defined four local tasks (note that localhost:XXXX points to port XXXX on the current machine, and in a multiple-computer setting the localhost would be replaced by an IP address). The tasks are divided into a single parameter server and three workers. The parameter server/worker assignments are referred to as jobs. We further describe what each of these does during training later on in the chapter.
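For reference, a hypothetical multi-machine version of the same cluster could look as follows (the IP addresses are placeholders; with one task per machine, every task can use the same port):

parameter_servers = ["192.168.0.1:2222"]
workers = ["192.168.0.2:2222",
           "192.168.0.3:2222",
           "192.168.0.4:2222"]
cluster = tf.train.ClusterSpec({"parameter_server": parameter_servers,
                                "worker": workers})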

Each of the tasks must run a TensorFlow server, in order to both use local resources for the actual computations and communicate with other tasks in the cluster to facilitate parallelization.

Building on the cluster definition, a server on the first worker node (i.e., localhost:2223) would be started by:

server = tf.train.Server(cluster,
                         job_name="worker",
                         task_index=0)

The arguments received by Server() let it know its identity, as well as the identities and addresses of the other members in the cluster.

Once we have the clusters and servers in place, we build the computation graph that will allow us to go forward with the parallel computation.

Replicating a Computational Graph Across Devices

As mentioned previously, there is more than one way to perform parallel training. In “Device Placement”, we briefly discuss how to directly place operations on a specific task in a cluster. In the rest of this section we go over what is necessary for between-graph replication.

Between-graph replication refers to the common parallelization mode where a separate but identical computation graph is built on each of the worker tasks. During training, gradients are computed by each of the workers and combined by the parameter server, which also keeps track of the current versions of the parameters, and possibly other global elements of training (such as a global step counter, etc.).

We use tf.train.replica_device_setter() in order to replicate the model (computation graph) on each of the tasks. The worker_device argument should point to the current task within the cluster. For instance, on the first worker we run this:

with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % 0,
        cluster=cluster)):

      # Build model...

The exception is the parameter server, on which we don’t build a computation graph. In order for the process not to terminate, we use:

server.join()

which will keep the parameter server alive for the duration of the parallel computation.

Managed Sessions

In this section we cover the mechanism that we will later use for parallel training of our model. First, we define a Supervisor:

sv = tf.train.Supervisor(is_chief=...,
                         logdir=...,
                         global_step=...,
                         init_op=...)

As the name suggests, the Supervisor is used to supervise training, providing some utilities necessary for the parallel setting.

There are four arguments passed:

is_chief (Boolean)

There must be a single chief, which is the task responsible for initialization, etc.

logdir (string)

Where to store logs.

global_step

A TensorFlow Variable that will hold the current global step during training.

init_op

A TensorFlow op for initializing the model, such as tf.global_variables_initializer().

The actual session is then launched:

with sv.managed_session(server.target) as sess:
    
    # Train ... 

At this point the chief will initialize variables, while all other tasks wait for this to be completed.

Device Placement

The final TensorFlow mechanism we discuss in this section is device placement. While the full extent of this topic is outside the scope of this chapter, the overview would not be complete without a mention of this ability, which is mostly useful when engineering advanced systems.

When operating in an environment with multiple computational devices (CPUs, GPUs, or any combination of these), it may be useful to control where each operation in the computational graph is going to take place. This may be done to better utilize parallelism, exploit the different capabilities of different devices, and overcome limitations such as memory limits on some devices.

Even when you do not explicitly choose device placement, you can have TensorFlow log the placement it ends up using. The logging is enabled when constructing the session:

tf.Session(config=tf.ConfigProto(log_device_placement=True)) 

In order to explicitly choose a device, we use:

with tf.device('/gpu:0'):
  op = ...  

The '/gpu:0' points TensorFlow to the first GPU on the system; likewise, we could have used '/cpu:0' to place the op on the CPUs, or '/gpu:X' on a system with multiple GPU devices, where X is the index of the GPU we would like to use.
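As a small self-contained sketch (assuming a system where a GPU may or may not be present), the following places a matrix multiplication on the first GPU and logs the placement; allow_soft_placement lets TensorFlow fall back to the CPU if the requested device is unavailable:

import tensorflow as tf

with tf.device('/gpu:0'):
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]], name='a')
    b = tf.constant([[1.0, 1.0], [0.0, 1.0]], name='b')
    c = tf.matmul(a, b, name='c')

# Fall back to the CPU if no GPU is available, and log the final placement.
config = tf.ConfigProto(allow_soft_placement=True,
                        log_device_placement=True)
with tf.Session(config=config) as sess:
    print(sess.run(c))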

Finally, placement across a cluster is done by pointing to the specific task. For instance:

with tf.device("/job:worker/task:2"): 
  op = ...  

This will assign the op to the third worker task (task index 2), as defined in the cluster specification.

Placement across CPUs

By default, TensorFlow uses all the CPUs available on the system and handles the threading internally. For this reason, the device '/cpu:0' represents the combined power of all the CPUs, and '/cpu:1' doesn't exist by default, even in a multiple-CPU environment.

In order to manually assign ops to specific CPUs (which you would need a very good reason to do; otherwise, let TensorFlow handle it), the session has to be configured to expose the CPUs as separate devices:

config = tf.ConfigProto(device_count={"CPU": 8},
                        inter_op_parallelism_threads=8,
                        intra_op_parallelism_threads=1) 
sess = tf.Session(config=config)    

Here, we define two parameters:

  • inter_op_parallelism_threads=8, meaning we allow eight threads for different ops
  • intra_op_parallelism_threads=1, indicating that each op gets a single thread

These settings would make sense for an 8-CPU system.
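With such a configuration in place, '/cpu:0' through '/cpu:7' become valid placement targets. A minimal sketch:

import tensorflow as tf

config = tf.ConfigProto(device_count={"CPU": 8},
                        inter_op_parallelism_threads=8,
                        intra_op_parallelism_threads=1)

# device_count={"CPU": 8} exposes eight CPU devices, so an op can be
# pinned to a specific one.
with tf.device('/cpu:3'):
    op = tf.constant(21.0) * 2.0

with tf.Session(config=config) as sess:
    print(sess.run(op))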

Distributed Example

In this section we put it all together with an end-to-end example of distributed training of the MNIST CNN model we saw in Chapter 4. We will use one parameter server and three worker tasks. In order to make it easily reproducible, we will assume all the tasks are running locally on a single machine (this is easily adapted to a multiple-machine setting by replacing localhost with the IP address, as described earlier). As usual, we first present the full code, and then break it down into elements and explain it:

import tensorflow as tf
from tensorflow.contrib import slim
from tensorflow.examples.tutorials.mnist import input_data


BATCH_SIZE = 50
TRAINING_STEPS = 5000
PRINT_EVERY = 100
LOG_DIR = "/tmp/log"


parameter_servers = ["localhost:2222"]
workers = ["localhost:2223",
           "localhost:2224",
           "localhost:2225"]

cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})


tf.app.flags.DEFINE_string("job_name", "", "'ps' / 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")
FLAGS = tf.app.flags.FLAGS


server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

mnist = input_data.read_data_sets('MNIST_data', one_hot=True)


def net(x):
    x_image = tf.reshape(x, [-1, 28, 28, 1])
    net = slim.layers.conv2d(x_image, 32, [5, 5], scope='conv1')
    net = slim.layers.max_pool2d(net, [2, 2], scope='pool1')
    net = slim.layers.conv2d(net, 64, [5, 5], scope='conv2')
    net = slim.layers.max_pool2d(net, [2, 2], scope='pool2')
    net = slim.layers.flatten(net, scope='flatten')
    net = slim.layers.fully_connected(net, 500, scope='fully_connected')
    net = slim.layers.fully_connected(net, 10, activation_fn=None,
                                      scope='pred')
    return net


if FLAGS.job_name == "ps":
    server.join()

elif FLAGS.job_name == "worker":

    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

        global_step = tf.get_variable('global_step', [],
                                      initializer=tf.constant_initializer(0),
                                      trainable=False)

        x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
        y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
        y = net(x)

        cross_entropy = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=y_))
        train_step = tf.train.AdamOptimizer(1e-4).minimize(
                cross_entropy, global_step=global_step)

        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        init_op = tf.global_variables_initializer()

    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir=LOG_DIR,
                             global_step=global_step,
                             init_op=init_op)

    with sv.managed_session(server.target) as sess:
        step = 0

        while not sv.should_stop() and step <= TRAINING_STEPS:

            batch_x, batch_y = mnist.train.next_batch(BATCH_SIZE)

            _, acc, step = sess.run([train_step, accuracy, global_step],
                                    feed_dict={x: batch_x, y_: batch_y})

            if step % PRINT_EVERY == 0:
                print("Worker : {}, Step: {}, Accuracy (batch): {}".format(
                        FLAGS.task_index, step, acc))

        test_acc = sess.run(accuracy, feed_dict={x: mnist.test.images,
                                                 y_: mnist.test.labels})
        print("Test-Accuracy: {}".format(test_acc))

    sv.stop()

In order to run this distributed example, from four different terminals we execute the four commands for dispatching each of the tasks (we will shortly explain how exactly this happens):

python distribute.py --job_name="ps" --task_index=0
python distribute.py --job_name="worker" --task_index=0
python distribute.py --job_name="worker" --task_index=1
python distribute.py --job_name="worker" --task_index=2

Alternatively, the following will dispatch the four tasks automatically (depending on the system you are using, the output may all go to a single terminal or to four separate ones):

import subprocess
subprocess.Popen('python distribute.py --job_name="ps" --task_index=0', 
                 shell=True)
subprocess.Popen('python distribute.py --job_name="worker" --task_index=0', 
                 shell=True)
subprocess.Popen('python distribute.py --job_name="worker" --task_index=1', 
                 shell=True)
subprocess.Popen('python distribute.py --job_name="worker" --task_index=2', 
                 shell=True)

Next, we go over the code in the preceding example and highlight where this is different from the examples we have seen thus far in the book.

The first block deals with imports and constants:

import tensorflow as tf
from tensorflow.contrib import slim
from tensorflow.examples.tutorials.mnist import input_data


BATCH_SIZE = 50
TRAINING_STEPS = 5000
PRINT_EVERY = 100
LOG_DIR = "/tmp/log"

Here we define:

BATCH_SIZE

The number of examples to use during training in each mini-batch.

TRAINING_STEPS

The total number of mini-batches we will use during training.

PRINT_EVERY

How often to print diagnostic information. Since the distributed training we use keeps a single counter of the current step across all of the tasks, the print for a given step happens on only one of the tasks.

LOG_DIR

The training supervisor will save logs and temporary information to this location. Should be emptied between runs of the program, since old info could cause the next session to crash.

Next, we define the cluster, as discussed earlier in this chapter:

parameter_servers = ["localhost:2222"]
workers = ["localhost:2223",
           "localhost:2224",
           "localhost:2225"]

cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})

We run all tasks locally. In order to use multiple computers, replace localhost with the correct IP addresses. The ports 2222–2225 are arbitrary, of course (they just have to be distinct when all tasks run on a single machine); in a distributed setting you might as well use the same port on every machine.

In the following, we use the tf.app.flags mechanism to define two parameters that we will provide through the command line when we call the program on each task:

tf.app.flags.DEFINE_string("job_name", "", "'ps' / 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")
FLAGS = tf.app.flags.FLAGS

The parameters are as follows:

job_name

This will be either 'ps' for the single parameter server, or 'worker' for each of the worker tasks.

task_index

The index of the task in each of the types of jobs. The parameter server will therefore use task_index = 0, and for the workers we will have 0, 1, and 2.

Now we are ready to use the identity of the current task in the cluster we defined in order to define the server for this current task. Note that this happens on each of the four tasks that we run. Each one of the four tasks knows its identity (job_name, task_index), as well as that of everybody else in the cluster (which is provided by the first argument):

server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

Before we start the actual training, we define our network and load the data to be used. This is similar to what we have done in previous examples, so we will not go into the details again here. We use TF-Slim for the sake of brevity:

mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

def net(x):
    x_image = tf.reshape(x, [-1, 28, 28, 1])
    net = slim.layers.conv2d(x_image, 32, [5, 5], scope='conv1')
    net = slim.layers.max_pool2d(net, [2, 2], scope='pool1')
    net = slim.layers.conv2d(net, 64, [5, 5], scope='conv2')
    net = slim.layers.max_pool2d(net, [2, 2], scope='pool2')
    net = slim.layers.flatten(net, scope='flatten')
    net = slim.layers.fully_connected(net, 500, scope='fully_connected')
    net = slim.layers.fully_connected(net, 10, activation_fn=None, scope='pred')
    return net

The actual processing done during training depends on the type of task. For the parameter server, we want the mechanism to, well, serve parameters, for the most part. This entails waiting for requests and processing them. This is all it takes to achieve that:

if FLAGS.job_name == "ps":
    server.join()

The .join() method of the server will not return even after all the other tasks terminate, so this process will have to be killed externally once it is no longer needed.

In each of the worker tasks, we define the same computation graph:

with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

    global_step = tf.get_variable('global_step', [],
                                  initializer=tf.constant_initializer(0),
                                  trainable=False)

    x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
    y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
    y = net(x)

    cross_entropy = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=y_))
    train_step = tf.train.AdamOptimizer(1e-4).minimize(
            cross_entropy, global_step=global_step)

    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    init_op = tf.global_variables_initializer()

We use tf.train.replica_device_setter() in order to specify the device placement, meaning that the TensorFlow Variables will be placed on (and synchronized through) the parameter server, which is the mechanism that allows us to do the distributed computation.

The global_step Variable will hold the total number of steps during training across the tasks (each step index will occur only on a single task). This creates a timeline so that we can always know where we are in the grand scheme, from each of the tasks separately.

The rest of the code is the standard setup we have seen before in numerous examples throughout the book.

Next, we set up a Supervisor and a managed_session:

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir=LOG_DIR,
                         global_step=global_step,
                         init_op=init_op)

with sv.managed_session(server.target) as sess:

This is similar to the regular session we use throughout, except it is able to handle some aspects of the distribution. The initialization of the Variables will be done only in a single task (the chief designated via the is_chief argument; in our case, this will be the first worker task). All other tasks will wait for this to happen, then continue.

With the session live, we run training:

while not sv.should_stop() and step <= TRAINING_STEPS:

    batch_x, batch_y = mnist.train.next_batch(BATCH_SIZE)

    _, acc, step = sess.run([train_step, accuracy, global_step],
                            feed_dict={x: batch_x, y_: batch_y})

    if step % PRINT_EVERY == 0:
        print("Worker : {}, Step: {}, Accuracy (batch): {}".format(
                FLAGS.task_index, step, acc))

Every PRINT_EVERY steps, we print the current accuracy on the current mini-batch. This will go to 100% pretty fast. For instance, the first two rows might be:

Worker : 1, Step: 0.0, Accuracy (batch): 0.140000000596
Worker : 0, Step: 100.0, Accuracy (batch): 0.860000014305

Finally, we run the test accuracy:

test_acc = sess.run(accuracy,
                    feed_dict={x: mnist.test.images, y_: mnist.test.labels})
print("Test-Accuracy: {}".format(test_acc))

Note that this will execute on each of the worker tasks, and thus the same exact output will appear three times. In order to save on computation, we could have run this in only a single task (for instance, in the first worker only).
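A minimal sketch of that restriction, reusing the names already defined in the example, might look like this:

# Evaluate and print the test accuracy only on the first worker task,
# so the result appears once instead of three times.
if FLAGS.task_index == 0:
    test_acc = sess.run(accuracy,
                        feed_dict={x: mnist.test.images,
                                   y_: mnist.test.labels})
    print("Test-Accuracy: {}".format(test_acc))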

Summary

In this chapter we covered the main concepts pertaining to parallelization in deep learning and machine learning in general, and concluded with an end-to-end example of distributed training on a cluster with data parallelization.

Distributed training is a very important tool that is utilized both in order to speed up training, and to train models that would otherwise be infeasible. In the next chapter we introduce the serving capabilities of TensorFlow, allowing trained models to be utilized in production environments.
