Chapter 2. The Ray API

To be intuitive, Ray must leverage concepts you already know. It does this by building on two familiar concepts in Python (and most languages): functions for stateless computing and classes for stateful computing.

This chapter offers a taste of the core Ray API. For more details and longer examples, see my Ray class on the O’Reilly Learning Platform. See the Ray documentation for extensive API details.

If you aren’t interested in the details of the core Ray API, skip ahead to the next chapter.

Just Six API Methods

Table 2-1 illustrates how almost everything you do with Ray is done with just six API methods. For a description of actors, see “What Are Actors?”.

There are other API methods for various administrative and informational purposes that we won’t go into. These six methods are the essence of Ray; they provide its concision, flexibility, and power.

Table 2-1. The magnificent six

ray.init()
  Initialize Ray in your application.
  Example:
    ray.init()

@ray.remote
  Decorate a function to make it a remote task. Decorate a class to make it a remote actor.
  Example:
    @ray.remote
    def train_model(source):
      ...

    @ray.remote
    class ActivityTracker:
      def record(self, event):
        ...
        return count

.remote()
  Construct an actor instance, or asynchronously run a task or an actor method.
  Example:
    m_ref = train_model.remote(...)
    tracker = ActivityTracker.remote()
    r_ref = tracker.record.remote(...)

ray.put()
  Put a value in the distributed object store.
  Example:
    obj_ref = ray.put(my_object)

ray.get()
  Get an object from the distributed object store, either placed there explicitly with ray.put() or returned by a task or actor method.
  Example:
    model = ray.get(m_ref)
    obj = ray.get(obj_ref)

ray.wait()
  Wait on a list of object references until one of the tasks completes. Returns two lists, one with references for the completed tasks and the other with references for the still-running tasks.
  Example:
    finished, running = ray.wait([m_ref, r_ref])

Let’s see these six API methods in action.

Installing Ray

If you want to try the examples, you’ll need a recent version of Python. Versions 3.6 through 3.8 are supported. If you are new to Python, consider using Anaconda to manage your installation. Note that the version of Python that comes with your operating system may be obsolete.

You will also need to install the package manager pip.

You can install Ray from the command line:

pip install ray
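
You can then verify the installation by importing the package and printing its version:

python -c "import ray; print(ray.__version__)"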

Initializing Ray

You can run the Ray examples that follow using ipython, an interactive Python interpreter. The $ is the shell prompt, and In [1] is the ipython prompt:1

$ ipython
Python 3.7.7 (default, May  6 2020, 04:59:01)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.15.0 -- ... Type '?' for help.

In [1]:

At the ipython prompt or in a script file, you import Ray and then call ray.init():

import time   # Other libraries we might need...
import ray

ray.init()

You’ll see output like the following:

INFO ... -- Starting Ray...
...
INFO ... -- View the Ray dashboard at localhost:8265
Out[2]:
{'node_ip_address': '192.168.1.2',
 'raylet_ip_address': '192.168.1.2',
 'redis_address': '192.168.1.2:6379',
 'object_store_address': '/.../sockets/plasma_store',
 'raylet_socket_name': '/.../sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_.../'}

You have started a single-machine Ray cluster on your computer that will use all the available cores. There are also supporting processes running: a distributed object store (note the plasma_store socket in the output above) and a raylet that manages execution of your code, with Redis holding cluster metadata (the redis_address).

Try opening the URL http://localhost:8265. (Change the port if you see a different number in your output.) This is the Ray Dashboard, which shows how system resources are utilized, both globally and by the individual Ray workers that run your code. We’ll see it in action in the next section.

See the ray.init() documentation for details on many optional arguments you can pass to this method, including how to connect to a running Ray cluster.
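
As a taste of those arguments, here is a minimal sketch; the specific values are illustrative, the available options vary by Ray version, and ray.init() should be called only once per session:

ray.init(num_cpus=4)        # limit the local "cluster" to four workers

# Or, instead, connect to an already-running Ray cluster:
# ray.init(address='auto')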

From Functions to Ray Tasks

A virtue of the Ray API is how it builds on concepts you already know from Python, keeping the new ideas as intuitive as possible. If you want to encapsulate some work in Python, you write a function like this one, which might be used to query a database to retrieve reference data:

reference_data = [
  "Fast", "and", "Simple", "Distributed", "Computing"]

def get_ref_datum(id):
  time.sleep(id/10.0)   # Simulate an "expensive" operation
  foreign_key = 2*id
  value = reference_data[id%len(reference_data)]
  return (id, foreign_key, value)

Our example uses some reference data, the words in the Ray tagline at https://ray.io. To keep the example simple, the function just returns the input id, a made-up foreign key to another data record, which we’ll use later, and one of the words in reference_data. (The modulus of the input id is computed to avoid invalid array indices.) The call to time.sleep() simulates doing a time-consuming operation, like a real database query.

Let’s try it:

start = time.time()
data = [get_ref_datum(id) for id in range(10)]
duration = time.time() - start
print(f'{duration:6.3f} secs, data = {data}')

It takes about 4.5 seconds to complete, and prints the following:

 4.516 secs, data = [(0, 0, 'Fast'), (1, 2, 'and'),
   (2, 4, 'Simple'), (3, 6, 'Distributed'),
   (4, 8, 'Computing'), (5, 10, 'Fast'), (6, 12, 'and'),
   (7, 14, 'Simple'), (8, 16, 'Distributed'),
   (9, 18, 'Computing')]

Why 4.5 seconds? We sleep for id/10.0 seconds. We pass in the id values 0 through 9 (10 exclusive), which sum to 45. There is a little more time taken by other overhead. Note that all the invocations of get_ref_datum are independent of each other, so why not do them in parallel? This is where Ray comes in.
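
A quick sanity check of the arithmetic in the interpreter:

sum(range(10)) / 10.0   # 0 + 1 + ... + 9 = 45, so 4.5 seconds of sleep in total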

We want to convert our synchronous function, which is executed in the same thread as the interpreter, into a task that can be executed on any CPU core in our machine or across a multinode Ray cluster. Here is one way to do this:

@ray.remote
def get_ref_datum_task_first_attempt(id):
  return get_ref_datum(id)

The @ray.remote decorator turns the function into a task definition; the body just calls get_ref_datum(). However, this task could run anywhere in a Ray cluster, and a more realistic implementation would use a large reference dataset, so let’s use the distributed object store to hold reference_data and retrieve it when needed:

ref_data_object_ref = ray.put(reference_data)

@ray.remote
def get_ref_datum_task(id):
  time.sleep(id/10.0)   # Simulate an "expensive" operation
  foreign_key = 2*id
  ref_data = ray.get(ref_data_object_ref)
  value = ref_data[id%len(ref_data)]
  return (id, foreign_key, value)

The reference data is put in the distributed object store with ray.put(), which returns an object reference of type ObjectRef. The reference data is retrieved from the object store with ray.get(). This interaction with the object store adds some overhead, but internal optimizations, such as zero-copy reads from shared memory, can eliminate much of it. This is especially important when invoking get_ref_datum_task() many times concurrently, so that instances of the task are scheduled around the cluster. In that case, we have just a single copy of the reference data on each node that needs it, rather than one copy per task instance.
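
To see why this matters for larger values, here is a minimal sketch using a NumPy array as stand-in “reference data” (the array and its size are illustrative; Ray stores NumPy arrays in shared memory, so workers on the same node can read them without copying):

import numpy as np

large_array = np.zeros(10_000_000)   # ~80 MB of example data
array_ref = ray.put(large_array)     # one copy in the node's object store

@ray.remote
def mean_of(array):                  # Ray resolves the ObjectRef to the array
  return array.mean()

print(ray.get(mean_of.remote(array_ref)))   # 0.0

With that aside, back to get_ref_datum_task().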

Let’s try it:

start = time.time()
data_refs = [get_ref_datum_task.remote(id) for id in range(10)]
data = ray.get(data_refs)
duration = time.time() - start
print(f'{duration:6.3f} secs, data = {data}')

Tasks are invoked using an added .remote(…) method. In Python, it would be possible for Ray to modify the function so you can call get_ref_datum_task(id) directly, but seeing the .remote(id) in the code makes it obvious that Ray is in use here.

Tasks are executed asynchronously, so what gets returned? It is an ObjectRef corresponding to a future. We use ray.get() to retrieve the list of objects for those references; the call blocks until all the tasks complete.

Here is the output:

 1.029 secs, data = [(0, 0, 'Fast'), ...]

The data is the same as before, indicating that we have preserved the order of the results even though the tasks ran asynchronously, which could be important to your application! However, the execution time is now only about one second. Why one second? All the tasks now run in parallel, so we are gated by the longest task, which sleeps 9/10.0 seconds. Okay, so why isn’t the total time approximately 0.9 seconds? On my test machine, there are eight workers running. You can see the actual count on your machine in the Ray Dashboard discussed previously. The default number of workers is based on the CPU cores you have available.

This means the first eight tasks are scheduled immediately, while the last two have to wait for available workers. One waiting task, for id = 8, gets scheduled very quickly, because the task with id = 0 finishes immediately. The second and last waiting task, for id = 9, waits 0.1 second, then runs. So, its total “wall clock” time is 0.1 + 0.9 = 1.0 second.
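
If you want to check the parallelism available on your machine without opening the dashboard, Ray can report it directly; the exact keys in the returned dictionaries vary by version:

print(ray.cluster_resources())     # e.g., {'CPU': 8.0, 'memory': ..., ...}
print(ray.available_resources())   # what is currently unclaimed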

Figure 2-1 is a screenshot of the Ray Dashboard with all eight workers running tasks.

Figure 2-1. Ray Dashboard while running tasks

There are two drawbacks to the code we just executed. First, since ray.get() is a blocking call, our main program (the “driver” in Ray parlance) just sits there doing nothing while we wait for the tasks to complete. Some of those tasks finish more quickly than others. It would be nice to process those results while the slower tasks are still running.

Second, in production code, calls that block without setting timeouts are risky. What if a remote task takes an abnormally long time to complete, due to some system problem? We really need a way to specify a timeout.

Let’s solve both problems with our last example, which uses ray.wait():

start = time.time()
data_refs = [get_ref_datum_task.remote(id) for id in range(10)]
data = []
while len(data_refs) > 0:
  finished, data_refs = ray.wait(data_refs, timeout=5.0)
  d = ray.get(finished)
  duration = time.time() - start
  print(f'  {duration:6.3f} secs, finished data = {d}')
  data.extend(d)
duration = time.time() - start
print(f'{duration:6.3f} secs, data = {data}')

Before explaining the code, here is the output:

   0.002 secs, finished data = [(0, 0, 'Fast')]
   0.105 secs, finished data = [(1, 2, 'and')]
   0.206 secs, finished data = [(2, 4, 'Simple')]
   0.306 secs, finished data = [(3, 6, 'Distributed')]
   0.406 secs, finished data = [(4, 8, 'Computing')]
   0.506 secs, finished data = [(5, 10, 'Fast')]
   0.606 secs, finished data = [(6, 12, 'and')]
   0.706 secs, finished data = [(7, 14, 'Simple')]
   0.810 secs, finished data = [(8, 16, 'Distributed')]
   1.012 secs, finished data = [(9, 18, 'Computing')]
 1.012 secs, data = [(0, 0, 'Fast'), (1, 2, 'and'),
   (2, 4, 'Simple'), (3, 6, 'Distributed'), (4, 8, 'Computing'),
   (5, 10, 'Fast'), (6, 12, 'and'), (7, 14, 'Simple'),
   (8, 16, 'Distributed'), (9, 18, 'Computing')]

Note that the result of a task is processed as soon as it becomes ready, starting after just 0.002 seconds!

The code is a little busy, because we also added more print statements so you can see what’s going on. The core idiom that combines ray.wait() and ray.get() is the following:

data_refs = [get_ref_datum_task.remote(id) for id in range(10)]
data = []
while len(data_refs) > 0:
  finished, data_refs = ray.wait(data_refs, timeout=5.0)
  d = ray.get(finished)
  data.extend(d)

We still make all the .remote() calls at once, because they return immediately with the future ObjectRef values. Then we loop over the subset of futures that are still running. ray.wait() returns as soon as one task finishes. (There is an optional keyword argument, num_returns, to wait for more than one.) It returns two lists, one with references for the finished tasks and the other with references for the tasks still running. We then use ray.get() to retrieve the values for the finished tasks. Now ray.get() won’t block, because those tasks are already done.

We reset the data_refs value to be the remaining references, because you never want to pass references for completed tasks to ray.wait(). If you do, it will return immediately with the first of the completed references, creating an infinite loop!

To avoid potential deadlocks, where our application stops while some tasks wait forever for resources that will never arrive, we pass an optional timeout value of five seconds. If the timeout is reached before any tasks are completed, ray.wait() will return an empty list for the first list.
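
What you do when the timeout fires is application specific. Here is a minimal sketch of one policy, giving up on stragglers, assuming a Ray version that provides ray.cancel():

data_refs = [get_ref_datum_task.remote(id) for id in range(10)]
data = []
while len(data_refs) > 0:
  finished, data_refs = ray.wait(data_refs, timeout=5.0)
  if len(finished) == 0:      # the timeout expired with nothing finished
    for ref in data_refs:
      ray.cancel(ref)         # give up on the stragglers
    break
  data.extend(ray.get(finished))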

For more on the core Ray task API, including ray.get() and ray.wait(), see the Ray documentation API and Package Reference page.

Task Dependencies

What if some tasks depend on the results of other tasks? In our example, we return a foreign_key, which we’ll use now to illustrate how elegantly Ray handles task dependencies. After retrieving a datum with a “foreign key” (in the SQL sense of the term), we’ll use that key to query again.

For reasons that will become clearer in a moment, we’ll define a second version of get_ref_datum_task() that takes a three-element tuple argument, the same tuples returned by calls to the original get_ref_datum_task():

@ray.remote
def get_ref_datum_task2(datum):   # a three-element tuple from get_ref_datum_task
  original_id, foreign_key, value = datum
  return datum, get_ref_datum(foreign_key)

This second version returns a two-element “tuple of tuples”: the first element is the input tuple, and the second element is a new tuple corresponding to the foreign_key.

Now use it:

refs1 = [get_ref_datum_task.remote(i) for i in [2, 4, 6]]
refs2 = [get_ref_datum_task2.remote(ref) for ref in refs1]
for pair in ray.get(refs2):
  print(pair)

Here’s the output:

((2, 4, 'Simple'), (4, 8, 'Computing'))
((4, 8, 'Computing'), (8, 16, 'Distributed'))
((6, 12, 'and'), (12, 24, 'Simple'))

For each output line, the first three-element tuple is from the original get_ref_datum_task() invocation, and the second three-element tuple is from the subsequent get_ref_datum_task2() invocation.

The reason for the new task definition is to allow us to simply pass an ObjectRef to get_ref_datum_task2(). When an ObjectRef is passed as a task argument, Ray automatically calls ray.get() on it and passes the resulting tuple to the task. Ray also schedules each get_ref_datum_task2() task only after the corresponding get_ref_datum_task() task has completed.

So, Ray does two convenient things for dependent tasks:

  • We can pass ObjectRefs to tasks and Ray will handle the boilerplate of extracting results.

  • We don’t need to wait for the first set of tasks to complete before starting the dependent tasks. Ray handles all of this for us automatically.

If this example isn’t clear yet, try rewriting it to use ray.get() to first retrieve the results of the get_ref_datum_task() calls, blocking until they are finished. Then pass the resulting tuples explicitly to the get_ref_datum_task2() calls.

In other words, do the work that Ray does for you. See how much boilerplate you don’t need to write!
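
For comparison, here is a minimal sketch of that manual version:

refs1 = [get_ref_datum_task.remote(i) for i in [2, 4, 6]]
tuples1 = ray.get(refs1)   # block until all of the first round finishes
refs2 = [get_ref_datum_task2.remote(t) for t in tuples1]
for pair in ray.get(refs2):
  print(pair)

Notice that nothing in the second round can start until everything in the first round has finished, even though each pair of tasks is independent of the others. With the ObjectRef version, each get_ref_datum_task2() invocation starts as soon as its own input is ready.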

So, even when working with tasks that have complex graphs of dependencies, we write code very similar to the synchronous Python code we are accustomed to writing, yet it runs across a cluster asynchronously, with Ray managing the dependencies for us.

From Classes to Ray Actors

As useful as the task abstraction is, there’s a glaring deficiency that real applications face: how do we manage distributed state?

Ray’s actor model extends the familiar concept of object-oriented classes to encapsulate state and computation.

Let’s enhance our application so far by defining an actor that tracks the counts for requests for each id. First, we’ll start with a plain Python class:

class ReferenceDataTracker:
  def __init__(self, reference_data_size):
    self.size = reference_data_size
    self.counts = [0 for _ in range(self.size)]

  def increment(self, ref):
    refmod = ref%self.size
    self.counts[refmod] += 1
    return (ref, self.counts[refmod])

Just as functions are expanded to tasks by Ray, classes are expanded to actors. We can subclass ReferenceDataTracker:

@ray.remote
class ReferenceDataTrackerActor(ReferenceDataTracker):
  def get_counts(self):
    return self.counts

ReferenceDataTrackerActor needs to add a getter method to retrieve the counts, because Ray doesn’t support reading actor fields directly, as you can with a normal Python instance.

Actor construction uses .remote():

tracker = ReferenceDataTrackerActor.remote(len(reference_data))

Finally, let’s redefine get_ref_datum_task() to invoke the tracker actor to increment the count for the id. We pass in the tracker as an argument.

Note that actor methods are called with .remote(), just like tasks:

@ray.remote
def get_reference_datum_task(id, tracker):
  time.sleep(id/10.0)   # Simulate an "expensive" operation
  foreign_key = 2*id
  idmod = id%len(reference_data)
  value = reference_data[idmod]
  tracker.increment.remote(idmod) # Ignore returned ref
  return (id, foreign_key, value)

Now let’s try it:

start = time.time()
tuple_refs = [
  get_reference_datum_task.remote(i, tracker)
    for i in range(10)]
data = ray.get(tuple_refs)
counts = ray.get(tracker.get_counts.remote())
duration = time.time() - start
print(f"""
{duration:6.3f} secs:
data = {data}
counts = {counts}
""")

Here’s the output:

1.116 secs:
data = [(0, 0, 'Fast'), (1, 2, 'and'), ...]
counts = [2, 2, 2, 2, 2]

The data is the same as before. The counts list indicates that each value was requested twice (the ten ids map onto five reference values).

If you run the previous code again, each count will be four, because the tracker actor is still running and retains its state.
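
If you want to start the counts over, one simple option is to terminate the actor and create a fresh one; a minimal sketch, assuming your Ray version provides ray.kill():

ray.kill(tracker)   # terminate the actor process; its state is lost
tracker = ReferenceDataTrackerActor.remote(len(reference_data))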

The Ray libraries for ML provide more sophisticated examples of actors used for distributed state. One interesting example is implementing a parameter server with a set of actors. A parameter server is a specialized database for storing and serving the parameters of giant models. Beyond a certain size, it’s best to shard the parameter set, one shard per actor, for better performance. Distributed state can also be replicated for greater resiliency and error recovery.
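
To make the idea concrete, here is a minimal sketch of a sharded parameter server. This is not the implementation in the Ray ML libraries; the hash-based sharding scheme and all the names are illustrative assumptions:

@ray.remote
class ParameterShard:
  """Holds one shard of a model's parameters."""
  def __init__(self):
    self.params = {}

  def get(self, key):
    return self.params.get(key, 0.0)

  def update(self, key, delta):
    # A real server would apply a proper gradient update; we just add.
    self.params[key] = self.params.get(key, 0.0) + delta

shards = [ParameterShard.remote() for _ in range(4)]

def shard_for(key):
  return shards[hash(key) % len(shards)]   # route each key to one shard

shard_for('layer1.weight').update.remote('layer1.weight', 0.1)
print(ray.get(shard_for('layer1.weight').get.remote('layer1.weight')))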

What’s Next?

Hopefully, you can appreciate that Ray’s core API is concise and intuitive yet very flexible for a wide variety of computations, including those with distributed state.

What about performance? Ray was written with very demanding ML/AI applications in mind, so optimizing performance has been an ongoing priority. See the Ray blog for posts with helpful details on Ray performance.

The Ray core API is low-level, because sometimes you need fine-grained control. Often, a higher-level API is all you need. Next, we’ll discuss several higher-level APIs included with Ray for ML projects.

1 Some command output will be elided for space reasons. Other output will be wrapped to fit.
