CHAPTER 11


Tuning Giraph

This chapter covers

  • Key performance factors and bottlenecks in Giraph
  • Optimal setups of the Hadoop cluster hosting Giraph
  • Designing and implementing ad hoc data structures optimized for your algorithms
  • Spilling excess data to disk when necessary
  • The different Giraph parameters and knobs

So far, you have seen how you can use Giraph to compute graph analytics on large graphs across hundreds of machines. You have been presented with Giraph’s architecture and the programming model that allows you to write programs that scale. It is neat to write programs with the vertex-centric programming model, without worrying about the headaches of programming a parallel and distributed system. You write a compute function, and Giraph takes care of executing it on vertices, exchanging messages, and coordinating the work of the system across your machines. How all this machinery works is hidden from you so that you can focus on the semantics of your application.

There will be times, however, when you will want to squeeze all the performance you can out of a Giraph application that is perhaps running slower than expected. Or worse, there will be times when your application demands more resources than are available and cannot run to completion; for example, it runs out of memory. In either case, you need an understanding of how Giraph works under the hood, and of how to leverage the hooks and knobs that are available to you to push past these limits. You have already seen some of the pieces in the previous chapters. In this chapter, you look back at them, this time with a particular focus on performance tuning, and you are introduced to a new set of knobs and classes that you can play with.

Performance tuning is a bit of “black magic.” It is a mixture of intuition, experience, and gut feeling. Still, it is not about luck. There are two things you need to master to tune the performance of a large system. First, you need a good knowledge of the principles behind the functioning of the system. That is, you have to do your homework learning the architecture and understanding how data and computation flow through all the different components. Second, you need the patience and perseverance to try out different solutions and measure them.

A big misconception in computer systems, or even in computer science in general, is that if you understand how a system works, you can decide the best tuning on paper. Sure, you can make some assumptions and estimations and perform complexity analysis, but in the end the devil is in the details (or in the constants), and you have to test your assumptions and calculations against the clock. This means that in this chapter, you are given some ideas and the principles behind the tuning knobs of Giraph are explained, but we cannot tell you how to improve your very own application. Only you can, with “educated” trial and error. Hopefully, it will be easier with the help of this chapter.

Key Giraph Performance Factors

There are many ways of characterizing the performance of a computation (or a system, an application, etc.). One way of doing it is by putting the computation in one of the two following classes.

  • Compute-intensive: A computation that devotes most of its time to computational effort, due to high complexity. In other words, a computation that spends most of its time executing instructions on a relatively small piece of data. Examples of compute-intensive computations are the calculation of prime numbers, satisfiability solvers (SAT), simulations, and so forth.
  • Data-intensive: A computation that processes large amounts of data, on which it does not perform particularly complex calculations, hence devoting most of its time to IO. In other words, a computation that spends most of its time going through data and moving it around. Examples of data-intensive computations are most Hadoop computations: data filtering and aggregation, indexing, and so forth.

Intuitively, one way of deciding whether a computation is compute-intensive or data-intensive is to ask whether it would benefit more from faster CPUs or from faster disks and network.

Many computations in the Hadoop world are data-intensive. For example, the majority of MapReduce applications spend most of their time reading through data files, filtering, chopping and sorting records, passing them between mappers and reducers, aggregating them, and writing them back to files. Many of these operations require reading and writing from and to disks and network. In fact, to speed up a MapReduce cluster you often want multiple disks on each machine, so that when a core performs IO on a disk, the other cores do not have to idle waiting for their turn to access that same disk. Ideally each core would read from a different “dedicated” disk. This is also why MapReduce jobs can take advantage of a fast network, as a lot of data is just passed around in the shuffle and sort phase between the map and reduce phases.

After all, this is why it is called Big Data. Big Data is about leveraging small computations performed on many little pieces of information, and putting the results all together. This is pretty much the definition of a data-intensive computation. Graph algorithms are no different; in fact, they are perfect examples of data-intensive computations. As you have seen, information in a graph is contained in the connections across the vertices, and that is pretty much it. Graph algorithms navigate through these connections and vertices multiple times, and perform small computations on each of them. Think of PageRank. At every iteration, each vertex is computed by considering the incoming messages and outgoing edges. But what it does on each vertex is really just sum up the incoming PageRank values, divide the sum by the number of outgoing edges, and pass the result along the outgoing edges. These are not complex operations. The more data you have (that is, edges and vertices), the longer the computation will last. And the same holds for the SSSP algorithm. Each vertex goes through its incoming messages and identifies the smallest one. If it finds one that is smaller than its current value, it passes it along. This is also a pretty cheap computation, and its cost depends pretty much only on the size of the data it works on.
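To make this concrete, here is a minimal sketch of a PageRank compute function, along the lines of the example computation that ships with Giraph (the class and method names follow the Giraph API introduced in earlier chapters; the damping factor and the fixed number of supersteps are illustrative assumptions). The per-vertex work is just a sum, a division, and a send.

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

/** Minimal PageRank sketch: the per-vertex work is a sum, a division, and a send. */
public class SimplePageRank extends BasicComputation<
    LongWritable, DoubleWritable, NullWritable, DoubleWritable> {

  private static final int MAX_SUPERSTEPS = 30;

  @Override
  public void compute(Vertex<LongWritable, DoubleWritable, NullWritable> vertex,
                      Iterable<DoubleWritable> messages) {
    if (getSuperstep() > 0) {
      // Sum the PageRank contributions received from the neighbors.
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      vertex.setValue(new DoubleWritable(0.15 + 0.85 * sum));
    }
    if (getSuperstep() < MAX_SUPERSTEPS) {
      // Divide the current value by the out-degree and send it along every edge.
      sendMessageToAllEdges(vertex,
          new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()));
    } else {
      vertex.voteToHalt();
    }
  }
}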

Image Note  Graph computations tend to be data-intensive, as they require visiting the data graph multiple times and performing few computations on each vertex or edge. Giraph computations are data-intensive, as most of the runtime is spent processing graph data. That is, executing the compute function on each vertex, with its associated edges, values, and incoming messages, and exchanging the messages between workers.

Now, in a Giraph computation there are two classes of data items that fill up the workers’ memory:

  • The graph: The vertices, each with their edges, fill up a large portion of the heap. By default, Giraph stores the graph in memory during the whole computation, so the larger the graph, the more memory you need.
  • Computation state: The state in a Giraph computation comprises the vertex values (sometimes also the edge values, if they change during the computation) and the messages in transit. Often, the number of messages is proportional to the number of edges, which are the most frequent items in any reasonable graph.

In other words, graph and messages together occupy more or less all the used heap. Moreover, during each superstep, workers spend a lot of time “just” transmitting the messages over the network. That is why combiners and good partitioning can speed up the computation, sometimes even halve it, and drastically reduce memory usage: simply because they can drastically reduce the amount of data being stored and exchanged over the network.

The bottom line of this chapter is that to reduce memory usage and speed up your computation, you have to reduce the amount and size of the data stored in memory (the vertex and edge values) and the amount and size of the data sent over the network (the messages). In the remainder of this chapter you’ll have a look at how to tackle these problems through “memory-tight” implementations of some of the data structures used by Giraph. Unfortunately, not everything can be covered here: there are a number of ways to implement each algorithm in the Giraph programming model, but algorithm-specific implementation issues cannot be discussed in general, as they really depend on each specific algorithm.

Giraph’s Requirements for Hadoop

Giraph runs as a Hadoop application. As such, it can take advantage of, or be penalized by, decisions regarding the setup of the hosting Hadoop cluster and the cluster-related job configuration parameters.

Hardware-related Choices

Often, you’ll be running Giraph on a cluster over which you do not have much control: you are just a user without administrative power on the cluster. But if you can talk to your Hadoop administrators and ask for some changes, this section tells you what you should be asking for. Moreover, some organizations have dedicated clusters, or subsets of nodes, that they use for Giraph. If the number of workloads you solve with Giraph grows over time, it can make sense to make some Giraph-oriented decisions when your cluster is expanded.

The machine profile most appropriate for Giraph is one with a lot of main memory; that is, RAM. Giraph does not need large amounts of disk storage, so disks are not something you want to focus on when you think about Giraph. As you will see later in this chapter, Giraph has the ability to store portions of the graph and messages on local disk, but you should consider it a plan B and not design for it. Traditionally, MapReduce jobs have not asked for much main memory, as data was directly streamed from and to disk and network. However, recently main memory has played a more important role even for MapReduce jobs, for example, to speed up the disk-based shuffle-and-sort phase and to execute some of the more memory-hungry operators of systems like Hive and Pig. This means it’s likely that your machines already have a lot of memory nowadays, but if you plan to extend your cluster with machines for Giraph, focus on memory.

Second, as said, an important fraction of the time of a Giraph computation is spent exchanging messages between workers. The way to speed up this phase (apart from algorithmically reducing the size and number of messages) is by using a fast network. Compared to memory, you will probably have even less control over the network available to you. Still, it is worth considering bonding interfaces together, or utilizing machines on the same rack. Giraph is more traditional when it comes to scalability: while it can scale horizontally, it prefers vertical scalability; that is, a few beefy machines are better than many small ones. Hence, a solution could be to fit some beefy machines in a rack, and just use those with Giraph.

Image Tip  Giraph prefers a few beefy machines over many small machines. When you consider buying machines to extend your Hadoop cluster to accommodate Giraph, focus on memory rather than storage, and try to put the machines on the same rack.

This consideration about vertical scalability is important, so you will spend the rest of this section on it. Before you start, let’s be clear about what is considered vertical scalability here. In the Hadoop world, vertical scalability is often considered an expensive, “old-fashioned” way of deploying infrastructure, which may be less reliable as it depends on few machines (and hence less redundancy). Here, it refers to the choice of using fewer machines, each with more cores, to achieve the desired number of cores in the cluster, while still remaining in the domain of “commodity machines.” The reason is explained next.

In Giraph, given the same algorithm and the same graph, the amount of data that is sent over the network increases as you add more machines/workers. Intuitively, when you add one machine/worker to your cluster, more messages than before need to be transmitted over the network, for the simple reason that vertices that were local before (as in, stored on the same machine) are now remote (some of those vertices are now on the new machine/worker). A message between two vertices that used to be exchanged within the same worker (and hence not transmitted) now needs to be transmitted over the network. Needless to say, this is a slower operation than simply putting the message in the vertex inbox within the same worker (which, inside the same JVM, is just putting an object in a map).

Now, increasing the number of workers also increases parallelism, hence adding more computing units to your computation is something you want to do in order to increase performance. However, you want to do it by increasing the number of cores instead of the number of machines, if possible. Increasing the number of cores without increasing the number of workers increases parallelism without increasing network usage. Concretely, it is preferable to have 10 machines with 8 cores each than 80 machines with one core each. The total number of cores is still 80, but with fewer machines you obtain many more local vertices and hence less network usage. Note that while increasing the number of machines increases parallelism, it also increases the amount of data sent over the network. There is only so much runtime improvement you can get by adding machines, and that is a trade-off you have to discover by running tests.

Job-related Choices

Per-worker parallelism with Giraph is achieved through compute threads. In Giraph, every worker has a number of partitions assigned to it, each partition with a number of vertices in it. The granularity of parallelism in Giraph is at the partition level, meaning that each compute thread is assigned a number of partitions, and the thread is responsible for executing the compute function on each vertex belonging to its assigned partitions. Through compute threads, a worker can make use of all the cores available. Note that Giraph also has a number of other threads running during a computation; the relevant parameters are described later in this chapter.

There are two ways of exploiting the cores available to a cluster. Imagine you have 10 machines with 10 cores each. One way is to run 100 workers, assigning 10 workers to each machine, and using 1 compute thread in each worker. The other way is to run 10 workers, assigning 1 worker to each machine, and using 10 compute threads in each worker. Although theoretically the end result is full usage of the 100 cores, the two main differences are as follows:

  • By using 100 workers, you obtain less locality than with 10 workers, hence more messages are transmitted over the network. You may object that, with 10 workers on each machine, part of that traffic stays on the machine anyway; that is, between workers on the same machine. This is true. However, although two workers assigned to the same machine communicate over the loopback interface, Giraph still has to treat their communication as remote (it is not aware of this per-machine worker-to-worker locality): messages are serialized and deserialized via the Netty-based components, data is written to and read from sockets, and so forth.
  • Memory is wasted on overhead. Having 100 workers means 100 Hadoop tasks and 100 JVMs. All this redundancy results in wasted memory, as you can obtain the same results with 10 times fewer tasks and JVMs. Plus, many data structures within a Giraph worker occupy space proportional to the number of workers used, hence utilizing more heap memory.

You may think this is obvious and even unnecessary to point out. Why would you use multiple workers per machine? As it turns out, it is not trivial to request Hadoop to run one task per machine, particularly on clusters that you have to share with other workloads; for example, MapReduce jobs. The setting is decided by the cluster administrators (i.e., mapred.tasktracker.map.tasks.maximum) and often cannot be overridden by the user submitting the job.

Image Tip  Avoid using multiple workers per machine and try to run as many workers as the number of machines available to you. Achieve parallelism within each worker on each machine by using multiple compute threads instead.

On pre-YARN clusters, a Giraph computation runs as a single map-only MapReduce job, where each worker would be executed as a mapper task. Traditionally, each machine, or its TaskTracker, would be set up to accommodate multiple concurrent mapper and reducer tasks, usually proportionally to the number of cores or disks. In addition, the maximum heap size of each task would be proportional to the total memory size divided by the number of tasks accepted, to avoid overcommitting. This means that to obtain all the available resources in a machine, you have to request the total number of available mapper tasks to use as workers, resulting in the suboptimal scenario described earlier with multiple workers per machine. On older versions of Hadoop, this could be overcome by setting the maximum number of mapper tasks per TaskTracker to 1 on the client-side. However, this option was discontinued in the more recent versions.

With YARN, things have changed. YARN is designed around the concept of containers. Containers represent a collection of physical resources, such as memory, CPU cores, and disks. YARN does not have a fixed number of mapper and reducer tasks; scheduling instead depends on the available resources and the requested resources. A container can be defined to be of minimum and maximum size, relative to the resources available on the machine; for example, 80% of the total memory. When a container is assigned 80% or 90% of the available memory on a machine, the ResourceManager will not schedule more containers on that machine. When Giraph is running on YARN, each worker utilizes its own container. This means that you can obtain a setup of 1 worker per machine by requesting, for each worker/container, all the resources available to the machine; for example, memory. Here, we are assuming that all that memory is necessary and used, and hence discussing a way to claim it while minimizing redundancy. In practical terms, this can be obtained by submitting Giraph jobs with a heap size close to the size of the memory available to the machine (the smallest in the cluster, in case of heterogeneous clusters).

Image Tip  When using YARN, request for each container the amount of memory and CPUs available to a single machine. This allows a single worker to run on each machine.
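As an illustration, a sketch of such a setup follows. The figures assume a hypothetical cluster of 10 machines with 10 cores and 48GB of RAM each, and mapreduce.map.memory.mb and mapreduce.map.java.opts are the standard Hadoop properties controlling container and heap size of map tasks (your cluster may use different ones):

import org.apache.giraph.conf.GiraphConfiguration;

public class OneWorkerPerMachineSetup {
  /** Hypothetical setup: 10 machines, 10 cores, and 48GB of RAM each. */
  public static GiraphConfiguration configure() {
    GiraphConfiguration conf = new GiraphConfiguration();
    // One worker per machine: 10 workers in total.
    conf.setWorkerConfiguration(10, 10, 100.0f);
    // Use the cores of each machine through compute threads, not through extra workers.
    conf.setInt("giraph.numComputeThreads", 10);
    // Ask for (almost) the whole machine for each worker container.
    conf.setInt("mapreduce.map.memory.mb", 44 * 1024);
    conf.set("mapreduce.map.java.opts", "-Xmx40g");
    return conf;
  }
}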

This section concludes with a short discussion about homogeneous and heterogeneous clusters. This discussion relates to both hardware- and job-related aspects of performance. Because of the synchronization barrier, a Giraph superstep lasts as long as it takes the slowest or most loaded machine to compute its share. For this reason, you want to use comparable machines, with similar computational resources, so that your faster machines do not have to idle at the synchronization barrier waiting for the slowest (or most loaded) to finish. Also, you want to make sure that each machine has the same load as the others. This is usually achieved through partitioning, by ensuring that vertices are partitioned in similar numbers across workers; the default hash-partitioning scheme usually guarantees this. Note that similar computational resources means similar CPU, memory, and network. All of these resources are equally important. If a machine has a slower CPU, it takes more time to compute its part of the work. If it has less memory, it could fill up its heap before the others, and run out of memory (note that your request for heap is global to all the workers). If it has a slower network, it takes more time to transmit its messages. In any case, it causes the other workers to wait idling at the barrier. At the same time, if a machine is assigned more workers, it is more loaded, and hence the workers assigned to it take more time to conclude their portion of the superstep. This has the same effect as having a machine with less computational resources.

Image Tip  Giraph prefers homogeneous clusters, composed of machines with similar computational resources such as CPU, main memory size, and network. The slowest machine defines the duration of each superstep, causing faster machines to idle waiting for it at the synchronization barrier.

Tuning Your Data Structures

Giraph is designed internally to minimize the use of memory. Initial versions of Giraph used pure Java objects for all edges, vertices, and messages. With graphs comprising billions of edges and vertices, and more billions of messages created and consumed at each superstep, the JVM would experience substantial pressure on its memory-management components. For every object created by an application, the JVM has to internally allocate additional memory for the bookkeeping of that object. Moreover, the JVM garbage collector constantly tracks objects being created and destroyed, and moves them around to avoid fragmentation of the heap. These are all CPU cycles that the JVM could be using to execute application code, namely Giraph and your code. For this reason, Giraph uses a number of unorthodox tricks to minimize memory footprint and time spent performing garbage collection.

In a nutshell, Giraph does some memory management on its own. For instance, it stores some of the data, like edges, messages and vertex values, serialized inside of byte arrays that it allocates at the beginning of the computation. Still, it offers a pure Java object-oriented API. To achieve this abstraction, Giraph keeps a number of objects around, internally called representative objects, which it reinitializes with data coming from these binary arrays before passing them to the user, and which it serializes back to the arrays after the user is done with them. This mechanism is used, for example, for the Iterable containing the messages passed to the compute function, or the default implementation of the OutEdges interface where edges are stored for each vertex. To understand how this works, let’s have a look at the default implementation for these two classes.

The OutEdges Interface

The edges of a vertex are contained in a class implementing the OutEdges interface. The OutEdges interface extends the Iterable<Edge<I, E>> interface with the ability to add and remove edges, and to count the number of edges stored. The interface is used to access the edges, each contained in an Edge<I, E> object. Intuitively, you would expect the outgoing edges of a vertex to be stored, for example, either in a Map<I, E>, to allow efficient random access, or in a List<Edge<I, E>>, to allow efficient iteration. Before getting into the details of the implementation, it is important to understand these two access patterns to the edges in Giraph computations.
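For reference, the contract looks roughly like the following sketch; it is simplified in that the real OutEdges interface also extends Writable and declares a few initialize() methods used when the graph is loaded:

import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/** Simplified shape of the OutEdges contract. */
public interface OutEdgesSketch<I extends WritableComparable, E extends Writable>
    extends Iterable<Edge<I, E>> {
  /** Add an edge to this vertex's outgoing edges. */
  void add(Edge<I, E> edge);
  /** Remove the edge(s) pointing to the given target vertex. */
  void remove(I targetVertexId);
  /** The number of edges stored. */
  int size();
}

Which implementation of this contract is appropriate depends on how your algorithm accesses the edges.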

For example, think of PageRank and SSSP. In both, the compute method iterates through the outgoing edges to send messages to the other endpoints and takes into account the edge weights as needed during the iteration over the edges. As you never look up a specific edge by its vertex ID (or by the other endpoint ID), it is sufficient to store edges in a data structure that supports efficient iteration, like an array or a list. Note that storing edges in an array is faster to iterate, as data is stored in a contiguous chunk of memory, and more space-efficient, as you do not need anything other than the array of pointers to the edges. If you consider the Label Propagation Algorithm described in Chapter 4, on the other hand, you access edges based on the senders of the messages, to update the edge values with new labels. For this kind of access, storing the edges in a map allows faster lookup (the cost of searching for an element in an unsorted array is proportional to the size of the array), but incurs additional overhead due to the map implementation itself, like a tree or a hashmap. Moreover, a tree is typically slower to iterate than an array. Hence, depending on the access pattern to the edges dictated by your algorithm, you want to use the right implementation of the OutEdges interface.

In addition to the iteration-friendly API of the OutEdges interface, Giraph comes with an additional interface, StrictRandomAccessOutEdges<I, E>. This interface extends the OutEdges interface with methods to get and set edge values through lookups of the edge target vertex ID. Giraph provides an implementation of this interface called HashMapEdges, which, as the name suggests, is backed by a Java hashmap for random access to the edges. By default, Giraph uses a class called ByteArrayEdges, which stores the edges serialized in a byte array, and it does not support random access to the edges. You can choose which class to use to store outgoing edges through the giraph.outEdgesClass Giraph parameter, including your own implementation. Let’s have a look at why and how ByteArrayEdges makes use of byte arrays to minimize memory footprint.

Iteration-Friendly OutEdges

As OutEdges extends the Iterable interface, ByteArrayEdges has to provide an implementation of Iterator<Edge<I, E>>. In other words, it has to be able to return an Edge<I, E> object at each call to the next() method of the iterator. This protocol is important, as it allows the user to ignore how things work inside any class implementing OutEdges. As mentioned earlier, a simple implementation would use a number of native Java objects (for example, one for each edge) and, in turn, one for each edge value and target vertex ID associated with the edge. Having all of these objects around introduces additional memory overhead and pressure on the Garbage Collector (GC). Instead of creating and keeping these objects around, ByteArrayEdges stores them inside a byte array via the methods provided by the Writable interface, which these objects have to support for checkpointing. Storing the edge data serialized in byte arrays drastically reduces the number of objects, but you still need to provide Java objects to comply with the Giraph API. For this reason, ByteArrayEdges keeps a number of objects that it reinitializes with the data in its byte array under the hood before returning them to the caller. Because internally these representative objects are constantly overwritten, the caller cannot keep a reference to them, but must make a copy if needed. This is fine, because the typical access pattern toward outgoing edges, like in the algorithms mentioned earlier, is just to iterate the edges and use their values locally at each step of the iteration. To ensure that the user knows this caveat, ByteArrayEdges implements the ReusableObjectsOutEdges interface, which precisely states this behavior. To get a better understanding of the mechanism, let’s have a look at the Iterator<Edge<I, E>> implemented as an inner class of ByteArrayEdges in Listing 11-1.
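What follows is a simplified, self-contained sketch of such an iterator rather than the verbatim Giraph source: it assumes int vertex IDs and double edge values, uses a plain DataInputStream where ByteArrayEdges uses Giraph’s own ExtendedDataInput, and sketches the representative edge as an anonymous Edge rather than a ReusableEdge. The mechanism, and the numbered points that follow, are the same.

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Iterator;

import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;

/** Sketch of an iterator over edges kept serialized in a byte array. */
class SerializedEdgeIterator implements Iterator<Edge<IntWritable, DoubleWritable>> {

  // #1 Wrap the byte array (a field of the outer edges class) into a DataInput
  //    (Giraph uses its own ExtendedDataInput) that feeds the Writable read methods.
  private final ByteArrayInputStream window;
  private final DataInput in;
  private final int serializedEdgesBytesUsed;

  // #2 A single representative edge, overwritten and returned at every call to next().
  private final IntWritable targetVertexId = new IntWritable();
  private final DoubleWritable edgeValue = new DoubleWritable();
  private final Edge<IntWritable, DoubleWritable> representativeEdge =
      new Edge<IntWritable, DoubleWritable>() {
        @Override public IntWritable getTargetVertexId() { return targetVertexId; }
        @Override public DoubleWritable getValue() { return edgeValue; }
      };

  SerializedEdgeIterator(byte[] serializedEdges, int serializedEdgesBytesUsed) {
    this.serializedEdgesBytesUsed = serializedEdgesBytesUsed;
    this.window = new ByteArrayInputStream(serializedEdges, 0, serializedEdgesBytesUsed);
    this.in = new DataInputStream(window);
  }

  @Override
  public boolean hasNext() {
    // #3 There are edges left as long as not all the used bytes have been consumed.
    return serializedEdgesBytesUsed > 0 && window.available() > 0;
  }

  @Override
  public Edge<IntWritable, DoubleWritable> next() {
    try {
      // #4 Rewrite the representative edge with the next data available in the array.
      targetVertexId.readFields(in);
      edgeValue.readFields(in);
    } catch (IOException e) {
      throw new IllegalStateException("Failed to deserialize the next edge", e);
    }
    return representativeEdge;
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}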

#1 Internally we encapsulate the byte array coming from the outer class into an ExtendedDataInput that we need to feed the Writable interface methods

#2 Create a reusable edge that we will return to the user

#3 Check whether edges exist and we haven’t consumed all of them

#4 Rewrite the representative edge with the next data available in the array and return it to the user

First, note that serializedEdges is a private field of type byte[] used to store the edges, and it belongs to the outer class ByteArrayEdges. In the same way, serializedEdgesBytesUsed is a private field of type int that counts the number of bytes used in that array. At construction, the iterator wraps the byte array into a DataInput object, used by Writable to read data. Furthermore, you construct a representativeEdge object of type ReusableEdge. ReusableEdge is an interface used internally by Giraph and not exported to the user, which allows overwriting the target vertex ID in an edge when the representativeEdge is reused. Second, note how at every call of the next() method the representativeEdge object is reinitialized by consuming the wrapped array. This technique allows you to save memory and keep a generic interface, as you rely on the Writable interface that edges have to implement. Keep in mind that this mechanism puts more pressure on the CPU, which has to constantly serialize and deserialize objects, but usually the advantage is worth the cost.

Random Access-Friendly OutEdges

As mentioned earlier, HashMapEdges supports efficient lookup of edge values, but to do so, it generates a number of objects for each edge. Supporting random lookups in a byte array would require additional complexity, such as sorting the edges and performing a binary search, or performing a linear search at each lookup. Moreover, the former approach would be feasible only assuming fixed-size edge values, which is not always the case in a Giraph application. Another approach, which is addressed in this section, is using a hashmap of primitive types. The open source project fastutil1 provides fast and compact type-specific implementations of the common Java Collections data structures, such as maps, sets, lists, and so on. Differently from native Java collections, these data structures are automatically generated through a preprocessor for the different combinations of Java primitive types. For example, fastutil provides an Int2ByteMap interface that extends the Java Map<Integer, Byte> interface, but the provided Int2ByteOpenHashMap implementation underneath stores each key as an int and each value as a byte. While the interface is compliant with the Java Collections API, namely the methods of the map accept and return objects of type Integer and Byte, underneath these objects are not used in favor of their primitive-type counterparts. The implementations in this package are often among the fastest available, with the smallest memory footprint. Now, with the help of these classes, you can play a trick similar to the one in ByteArrayEdges. Imagine you want to optimize the implementation of LPA, and that the vertex IDs are integers, and so are the edge values. The StrictRandomAccessOutEdges<I, E> interface extends OutEdges<I, E> with two methods in which you are interested, namely E getEdgeValue(I targetVertexId) and void setEdgeValue(I targetVertexId, E edgeValue), which get and set an edge value, respectively. You need these methods when you iterate over the labels contained in the messages to update the corresponding labels that you store in the edge values. Listing 11-2 shows the implementation of a class OpenHashMapEdges<IntWritable, IntWritable> based on fastutil. In the interest of space, the implementation of some unrelated methods has been omitted.
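The sketch below captures the core of such a class. It is a simplification: it leaves out the initialize() methods and the Writable serialization that a complete OutEdges implementation must also provide, and it declares the getEdgeValue()/setEdgeValue() methods directly instead of through an implements StrictRandomAccessOutEdges<IntWritable, IntWritable> clause. The numbered points that follow refer to the corresponding comments.

import java.util.Iterator;

import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.IntWritable;

/** Sketch of random access-friendly out-edges backed by a primitive-type fastutil map. */
public class OpenHashMapEdges implements Iterable<Edge<IntWritable, IntWritable>> {

  // #1 We keep the edge values in a map based on primitive types.
  private final Int2IntOpenHashMap edgeMap = new Int2IntOpenHashMap();

  // #2 We also keep a representative object for edge values of the actual (Writable) type.
  private final IntWritable representativeValue = new IntWritable();

  // #3 Proxy a request to add an edge as a put in the map.
  public void add(Edge<IntWritable, IntWritable> edge) {
    edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
  }

  // #4 Proxy a request to remove an edge as a remove from the map.
  public void remove(IntWritable targetVertexId) {
    edgeMap.remove(targetVertexId.get());
  }

  @Override
  public Iterator<Edge<IntWritable, IntWritable>> iterator() {
    return new Iterator<Edge<IntWritable, IntWritable>>() {
      // #5 We keep an iterator over the map that we proxy.
      private final Iterator<Int2IntMap.Entry> mapIterator =
          edgeMap.int2IntEntrySet().iterator();

      // #6 We keep a representative Edge in the iterator to reinitialize with map data.
      private final IntWritable targetId = new IntWritable();
      private final IntWritable value = new IntWritable();
      private final Edge<IntWritable, IntWritable> representativeEdge =
          new Edge<IntWritable, IntWritable>() {
            @Override public IntWritable getTargetVertexId() { return targetId; }
            @Override public IntWritable getValue() { return value; }
          };

      // #7 Proxy the request to the internal iterator.
      @Override
      public boolean hasNext() {
        return mapIterator.hasNext();
      }

      // #8 Reinitialize the representative object with data coming from the map.
      @Override
      public Edge<IntWritable, IntWritable> next() {
        Int2IntMap.Entry entry = mapIterator.next();
        targetId.set(entry.getIntKey());
        value.set(entry.getIntValue());
        return representativeEdge;
      }

      @Override
      public void remove() {
        mapIterator.remove();
      }
    };
  }

  // #9 Fetch the data from the map and return the updated representative object.
  public IntWritable getEdgeValue(IntWritable targetVertexId) {
    if (!edgeMap.containsKey(targetVertexId.get())) {
      return null;
    }
    representativeValue.set(edgeMap.get(targetVertexId.get()));
    return representativeValue;
  }

  // #10 Update the backing map with the new value.
  public void setEdgeValue(IntWritable targetVertexId, IntWritable edgeValue) {
    edgeMap.put(targetVertexId.get(), edgeValue.get());
  }

  public int size() {
    return edgeMap.size();
  }
}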

#1 We keep the values in a map based on primitive types

#2 Also we keep a representative object for edge values of the actual type

#3 Proxy a request to add an edge as a put in the map

#4 Proxy a request to remove an edge as a remove from the map

#5 We keep an iterator of the map that we will proxy

#6 We keep a representative Edge in the iterator to reinitialize with data coming from the map iterator

#7 Proxy the request to the internal iterator

#8 Reinitialize the representative object with data coming from the map

#9 Fetch the data from the map and return the updated representative object

#10 Update the backing map with the new value

The class is very simple. You store internally a fastutil map of primitive types that correspond to the specific types the algorithm is expecting. In this sense, the approach is less general than ByteArrayEdges, which accepts generic types. In fact, if you want to go down this road, you have to adapt the given class with specific primitive types for each different algorithm. It may take some copy-pasting, but it is worth the effort. For the rest, the class proxies the calls to the internal map to get and put edge values coming from the user. Note that you can also proxy the calls of the iterator, by relying on an internal map iterator under the hood. This is pretty much all there is to know about tailoring OutEdges to your algorithms to save memory and increase speed.
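Plugging an implementation in is then just a matter of setting the giraph.outEdgesClass parameter mentioned earlier. As a configuration fragment, using the sketched class above as an example:

import org.apache.giraph.conf.GiraphConfiguration;

GiraphConfiguration conf = new GiraphConfiguration();
// Equivalent to passing -Dgiraph.outEdgesClass=<fully qualified class name> on the command line.
conf.set("giraph.outEdgesClass", OpenHashMapEdges.class.getName());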

The MessageStore Interface

Messages are the other data that fills the heap of the workers. Some algorithms, such as PageRank, produce a number of messages proportional to the number of edges; others produce fewer, and the number at each superstep depends on many factors, for example, the topology of the graph. Moreover, certain algorithms generate very large messages, as, for example, the algorithm to compute clustering coefficients, which sends lists of neighbor IDs as messages to detect triangles. To minimize the impact of messages, Giraph stores messages through a technique similar to the one presented in the previous section regarding edges. As with the ReusableObjectsOutEdges interface, references to messages are valid only until the next call to the next() method of the iterator passed to the compute() method for each vertex. The reason is the following. Messages are stored in a serialized format inside byte arrays, and what the iterator passes to the user is a representative message that is reinitialized with message data at each call. This trick is implemented by the ByteArrayMessagesPerVertexStore class, which is the default implementation of the MessageStore interface. A MessageStore is the inbox where Giraph stores all messages sent to the vertices. Each worker has one such store, and within the store messages are grouped by destination vertex, which are themselves organized per partition. In other words, you can imagine a MessageStore as a nested map, with partitions as keys of the outer map, and vertex IDs as keys of the inner maps. One thing to notice is that combiners drastically reduce the number of messages transiting the system. This can have a particular impact on the runtime management of a store. In fact, Giraph uses a different default implementation when combiners are involved, as Giraph combines all incoming messages at the store level as they arrive at the destination worker. In other words, Giraph only stores a single message per vertex, if any, when a combiner is involved.

The question is then, how can you save more memory, given that the default message stores provided by Giraph are already efficient? The idea is again similar to what you have seen in the previous section. You can use data structures tailored to the data types that you use in your application, through the primitive-type-based classes provided by fastutil. While compact and minimizing the number of objects, the default stores are still based on the standard Java HashMap class, in order to be as general as possible with respect to the vertex ID and message value types. But as you want to minimize the memory footprint of the store as much as you can, you can drop this requirement for generality and implement a store that is defined precisely around your data types. Let’s first have a look at how the default implementation in Giraph works, and then let’s have a look at how to write a message store for a vertex ID of type int and a message value of type float. So, first you will look at the SimpleMessageStore abstract class, which implements some of the routines that are used by, for example, the OneMessagePerVertexStore class. Listing 11-3 presents a selection of methods from the class.
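Below is a compact sketch that follows the same structure; it is not the verbatim SimpleMessageStore. The generic parameter T stands for whatever a concrete store keeps per vertex (a serialized list of messages or a single combined message), and getPartitionId() is a hypothetical stand-in for the lookup that the real class performs through the worker service.

import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/** Sketch of a message store keeping messages in two levels of nested maps. */
public abstract class SimpleMessageStoreSketch<I extends WritableComparable, M extends Writable, T> {

  // #1 Messages are organized per partition, inside two levels of nested maps:
  //    partition ID -> (vertex ID -> stored messages of type T).
  protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map =
      new ConcurrentHashMap<Integer, ConcurrentMap<I, T>>();

  // #2 The method packing the stored messages of a vertex into an Iterable is left abstract:
  //    subclasses decide whether T is a serialized list of messages or a single combined one.
  protected abstract Iterable<M> getMessagesAsIterable(T messages);

  /** Hypothetical stand-in for the worker service call mapping a vertex to its partition. */
  protected abstract int getPartitionId(I vertexId);

  // #3 The partition map is returned, or an empty one is created instead.
  protected ConcurrentMap<I, T> getOrCreatePartitionMap(int partitionId) {
    ConcurrentMap<I, T> partitionMap = map.get(partitionId);
    if (partitionMap == null) {
      ConcurrentMap<I, T> newMap = new ConcurrentHashMap<I, T>();
      partitionMap = map.putIfAbsent(partitionId, newMap);
      if (partitionMap == null) {
        partitionMap = newMap;
      }
    }
    return partitionMap;
  }

  // #4 Vertex messages are fetched from the map and packed into an iterable object.
  public Iterable<M> getVertexMessages(I vertexId) {
    ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
    if (partitionMap == null || !partitionMap.containsKey(vertexId)) {
      return Collections.<M>emptyList();
    }
    return getMessagesAsIterable(partitionMap.get(vertexId));
  }
}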

#1 Messages are organized per-partition, inside of 2-levels of nested maps

#2 The method to pack messages in an iterable is left abstract

#3 Partition messages are returned, or an empty map is created instead

#4 Vertex messages are fetched from the map and packed in an iterable object

There are a couple of things to notice here. First, being abstract, the class cannot be instantiated, but it provides some basic functionality to manage the nested map mentioned before. In particular, note how the getVertexMessages() method first gets the inner per-partition map, then fetches the vertex messages, and then relies on the getMessagesAsIterable() method to pack the messages in an iterable object that is passed to the compute method together with the vertex. Second, as you may have noticed, the implementation of getMessagesAsIterable() is not provided. This is exactly because, depending on the store, for a given vertex you may have either a list of messages serialized in a byte array, or a single message when a combiner is involved. You need to let the concrete implementation handle this aspect. Let’s have a look at this latter case. Listing 11-4 presents some of the implementation of OneMessagePerVertexStore, which, as mentioned, stores only one message per vertex through a combiner.
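The sketch below builds on the previous one for the combiner case. Incoming data is simplified to a plain map of (vertex ID, message) pairs, whereas the real store receives serialized vertex ID/message batches, and the combiner is assumed to expose combine() and createInitialMessage() methods that fold a new message into the stored one.

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.giraph.combiner.MessageCombiner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/** Sketch of a store keeping a single, combined message per vertex. */
public abstract class OneMessagePerVertexStoreSketch<I extends WritableComparable, M extends Writable>
    extends SimpleMessageStoreSketch<I, M, M> {

  private final MessageCombiner<I, M> messageCombiner;

  protected OneMessagePerVertexStoreSketch(MessageCombiner<I, M> messageCombiner) {
    this.messageCombiner = messageCombiner;
  }

  /** Incoming messages are added in batches, as they are buffered upon reception. */
  public void addPartitionMessages(int partitionId, Map<I, M> incoming) {
    ConcurrentMap<I, M> partitionMap = getOrCreatePartitionMap(partitionId);
    for (Map.Entry<I, M> entry : incoming.entrySet()) {
      // #1 Make sure we have a (combined) message slot for this vertex.
      M current = partitionMap.get(entry.getKey());
      if (current == null) {
        current = messageCombiner.createInitialMessage();
        partitionMap.put(entry.getKey(), current);
      }
      // #2 Update the stored message by combining the previous one with the new one.
      messageCombiner.combine(entry.getKey(), current, entry.getValue());
    }
  }

  @Override
  protected Iterable<M> getMessagesAsIterable(M message) {
    // #3 Return an iterable implemented by a singleton.
    return Collections.singleton(message);
  }
}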

#1 We make sure we have a message for each vertex

#2 We update the message by combining the previous one with the new one

#3 We return an iterable implemented by a singleton

Here you can note how getMessagesAsIterable() packs the single message in a singleton iterable. You can also notice the way addPartitionMessages() inserts messages in the store: messages are put into the store in groups, as they are buffered upon reception. In particular, note how messages are combined, one after the other, with the message currently stored, if any, in the store.

You are now ready to look at a primitive type-based implementation of such a class. Listing 11-5 presents the IntFloatMessageStore class, which, as the name suggests, is a message store designed for algorithms where vertex IDs are of type int and messages are of type float. Also in this case, only the part of the implementation relevant to the discussion is presented.
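The following sketch shows the idea. The set of partition IDs handled by the worker, which the real class obtains from the worker service, is passed to the constructor here; the combiner is assumed to expose the same combine() and createInitialMessage() methods as before; and incoming messages are again simplified to a plain map.

import java.util.Collections;
import java.util.Map;

import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;

/** Sketch of a message store for int vertex IDs and float messages, one combined message per vertex. */
public class IntFloatMessageStoreSketch {

  // Partition ID -> (vertex ID -> combined message), all based on primitive types.
  private final Int2ObjectOpenHashMap<Int2FloatOpenHashMap> map;
  private final MessageCombiner<IntWritable, FloatWritable> messageCombiner;

  // #2 Reusable objects that we reinitialize to avoid creating Writables for every message.
  private final IntWritable reusableVertexId = new IntWritable();
  private final FloatWritable reusableCurrentMessage = new FloatWritable();
  private final FloatWritable reusableNewMessage = new FloatWritable();

  public IntFloatMessageStoreSketch(Iterable<Integer> partitionIds,
      MessageCombiner<IntWritable, FloatWritable> messageCombiner) {
    this.messageCombiner = messageCombiner;
    this.map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
    // #1 Create all the partition maps at construction for efficiency.
    for (int partitionId : partitionIds) {
      map.put(partitionId, new Int2FloatOpenHashMap());
    }
  }

  /** Add a batch of buffered (vertex ID, message) pairs destined to one partition. */
  public void addPartitionMessages(int partitionId, Map<Integer, Float> incoming) {
    Int2FloatOpenHashMap partitionMap = map.get(partitionId);
    synchronized (partitionMap) {
      for (Map.Entry<Integer, Float> entry : incoming.entrySet()) {
        int vertexId = entry.getKey();
        float current = partitionMap.containsKey(vertexId)
            ? partitionMap.get(vertexId)
            : messageCombiner.createInitialMessage().get();
        // #3 Reinitialize the reusable objects with the primitive-typed data ...
        reusableVertexId.set(vertexId);
        reusableCurrentMessage.set(current);
        reusableNewMessage.set(entry.getValue());
        // #4 ... and use them to combine the current and the new message.
        messageCombiner.combine(reusableVertexId, reusableCurrentMessage, reusableNewMessage);
        partitionMap.put(vertexId, reusableCurrentMessage.get());
      }
    }
  }

  /** Messages for a vertex: at most one, thanks to the combiner. */
  public Iterable<FloatWritable> getVertexMessages(int partitionId, int vertexId) {
    Int2FloatOpenHashMap partitionMap = map.get(partitionId);
    if (partitionMap == null || !partitionMap.containsKey(vertexId)) {
      return Collections.<FloatWritable>emptyList();
    }
    // #5 Return a singleton iterable containing an object created on the fly.
    return Collections.singleton(new FloatWritable(partitionMap.get(vertexId)));
  }
}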

#1 We create all the partitions at construction for efficiency

#2 We use reusable objects that we reinitialize to save objects

#3 We reinitialize objects with primitive-typed data

#4 We use the reusables to combine current and new message

#5 We return a single iterable containing an object created on-the-fly

This time the outer map is a fastutil Int2ObjectOpenHashMap class, as the keys are the integer partition IDs and the values are the inner map objects, in this case Int2FloatOpenHashMap objects, that map vertex IDs to their message. The functioning of the class in both the addPartitionMessages() and getVertexMessages() methods recalls what you have seen in the previous two classes. This time, however, it was not possible to reuse code from the SimpleMessageStore class, because all the functionality is tailored to the specific primitive types. The last thing to mention is how to specify which class to use as a message store. As with other classes, the message store class is specified with a Giraph parameter. In particular, you must use the giraph.messageStoreFactoryClass parameter to specify a factory class that generates MessageStore implementations. Examples of factory implementations come together with the message stores in the Giraph source code; the interested reader is directed there.
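For illustration, wiring a custom store in would then look like the following fragment, where MyMessageStoreFactory is a hypothetical factory class name, not one shipped with Giraph:

import org.apache.giraph.conf.GiraphConfiguration;

GiraphConfiguration conf = new GiraphConfiguration();
// Equivalent to -Dgiraph.messageStoreFactoryClass=com.example.MyMessageStoreFactory
conf.set("giraph.messageStoreFactoryClass", "com.example.MyMessageStoreFactory");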

Going Out-of-Core

Many times this book has underlined how Giraph was designed to compute graph algorithms in memory, and that this is a key factor in allowing such fast computations on massive graphs. However, this can also be Giraph’s weakness, as sometimes data can be excessive and workers can run out of memory. In the previous sections you have seen how Giraph already implements some nice tricks to minimize memory footprint, and how you can tailor your data structures to minimize it even more. Yet, there will be times when all of these efforts won’t be enough. In those cases, you want to try Giraph’s capability to spill excess data to disk. Keep in mind that this capability should be considered a last-resort option, because it may degrade performance substantially if used excessively.

Giraph can store both messages and the graph (the topology as well as the vertex and edge values) on local disks during the computation of each superstep. The fact that Giraph can make use of disks does not turn it into a database. Its access to disk should not be considered a layer of persistence, but more like a swap partition in an operating system. It spills to disk the portion of the state and the graph that does not fit in main memory, so that the computation can reach its conclusion. In other words, from the perspective of the user there is no difference between running Giraph with or without the out-of-core capability activated, except perhaps for performance. The only thing the user is asked to do is to choose whether to spill messages, graph, or both, and how much of them. This section briefly describes how the functionality works, so that you can make your choice and investigate what works best for your application.

Out-of-Core Graph

Each worker stores the vertices assigned to it inside so-called partitions. Inside a partition are all the vertices, and their edges, assigned to that partition. During a superstep, a worker processes one partition after the other, or multiple partitions in parallel if multiple compute threads are used. Within partitions, one vertex is processed after the other. The out-of-core graph functionality (or OOCG) allows a worker to keep only a user-defined number of partitions in memory, while the remaining ones are stored on local disk. When a worker is done processing all the partitions that are currently in memory, it spills a number of processed partitions to disk to load unprocessed ones from disk into memory. The worker will not start spilling or loading any vertex until the partition the vertex belongs to has been processed completely. This means that OOCG works at a partition-level granularity. No single vertex (or edge) is spilled or loaded from disk individually.

When OOCG is active, by default Giraph keeps N partitions in memory, of which a number is considered sticky, typically defined as N - k (where N is the number of partitions to be kept in memory at all times, and k is the number of compute or input threads). Sticky partitions are partitions that are never swapped to disk. You want to keep at least k non-sticky partitions when using k compute threads, so that each compute thread can swap a processed partition to disk for an unprocessed one without having to wait. By default, which partitions are sticky is chosen at random at the beginning of the computation, and it does not change. Because in most graph algorithms all partitions need to be processed at each superstep (though not necessarily all vertices in them), keeping a number of sticky partitions in memory at all times minimizes the number of swaps.

However, your partitioning algorithm may allow you to split the graph in such a way that only certain partitions contain active vertices to be processed, while other partitions can be ignored for some supersteps. For those cases, Giraph can also ignore stickiness and use a least-recently-used (LRU) policy when choosing which partition to swap to disk to make space for a new one. In this setting, the partition stores operate practically as a cache. You can activate OOCG with the giraph.useOutOfCoreGraph parameter, define the number of partitions to keep in memory with the giraph.maxPartitionsInMemory parameter, and override the automatic selection of sticky partitions with the giraph.stickyPartitions parameter.

The functioning of OOCG is simple. When stored on local disk, each partition is saved in two files, one for vertices and one for edges. The layout of the vertices file is, apart from a short header, a sequence of vertices serialized through the Writable interface one after the other. Similarly, the edges file is a sequence of edges, grouped by source vertex. Spilling a partition to disk is IO-efficient, as it is a sequential write of the two files that does not involve read or seek operations, hence maximizing the use of disk IO bandwidth. In the same way, reading a partition from disk is also a sequential read of the same two files, which are parsed again through the methods defined by the Writable interface. The reason why vertices and edges are stored in two separate files is to minimize IO when possible. Many algorithms act on static graphs, as, for example, PageRank, SSSP, Connected Components, and so forth. A static graph means that edges and vertices are not added or removed, and that edge values (e.g., weights) do not change. By this definition, the Label Propagation Algorithm and Stochastic Gradient Descent (both described in Chapter 4) do not operate on static graphs, as they mutate the edge values. On a static graph, because only vertex values change during the computation, Giraph needs to spill both vertices and edges to local disk only the first time a partition is swapped out. The subsequent times, only the changing elements, the vertex values, need to be written to disk. When the partition is read back to memory, the first write of the edges and the most recent write of the vertices are used. This saves substantial write IO, considering that edges are by far the largest portion of the graph. The user can specify whether the algorithm acts on a static view of the graph with the giraph.isStaticGraph parameter.

Finally, you can specify the directories on local disk used to store out-of-core partitions with the giraph.partitionsDirectory parameter. The parameter accepts a comma-separated list of directories. If multiple disks are available, it is convenient to specify one directory on each disk. OOCG spreads partitions across disks, and as compute threads swap partitions from memory to disk, they parallelize IO for faster data transfer, maximizing throughput.
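Putting the OOCG parameters together, a configuration fragment that keeps at most 10 partitions in memory and spreads the spilled ones over two local disks might look like this (the threshold and directory paths are example values):

import org.apache.giraph.conf.GiraphConfiguration;

GiraphConfiguration conf = new GiraphConfiguration();
conf.setBoolean("giraph.useOutOfCoreGraph", true);
conf.setInt("giraph.maxPartitionsInMemory", 10);
// The algorithm does not mutate edges or topology, so edges are written to disk only once.
conf.setBoolean("giraph.isStaticGraph", true);
// One directory per local disk, so that partition IO can be parallelized.
conf.set("giraph.partitionsDirectory", "/disk1/giraph/partitions,/disk2/giraph/partitions");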

Out-of-Core Messages

You have seen that Giraph stores incoming messages inside inboxes called message stores, one for each worker. If a message store is filled with more messages than the heap can manage, the worker will run out of memory. For this reason, Giraph allows you to set a threshold on the maximum number of messages that are kept in memory before messages start being spilled to disk. Every time the message store reaches its maximum capacity, its content is spilled to disk in a file, and a new empty message store is created to accommodate new incoming messages.

More precisely, out-of-core messages (or OOCM) works as follows. First, when OOCM is active, vertices are processed by the compute threads in sorted order, as dictated by their ID. Second, when a message store is written to disk, after the given threshold is reached, messages are stored sorted by destination vertex ID, one message after the other. As a natural effect, messages are grouped together in the file by destination vertex ID, as all messages destined to the same vertex are written together as a result of the sorting. After the messages are sorted in memory, they are written very efficiently as a sequential operation. Third, because messages arrive at different times during a superstep, messages destined to a specific vertex may be spread across multiple files. These three elements allow OOCM to serve messages from disk without indices and without seek operations inside a file. In other words, they are scanned through sequential reads.

When a superstep begins and the message store is opened for reading, the store places a cursor at the beginning of each file. When the store is asked for the messages for a specific vertex, it looks at all its cursors and checks whether they are pointing to messages destined to that vertex. If a cursor is indeed pointing to such messages, it is used to read the messages (which are literally streamed as they are consumed) from the file. Note that the cursor is advanced automatically, as a side effect of the reads, to the next group of messages. If the messages pointed to by the cursor are destined to a different vertex, the file can be skipped completely for this vertex. In fact, if the file contained messages for that vertex, they would currently be pointed to by the cursor. This is a necessary condition due to the fact that messages are stored sorted in the same order they are processed during a superstep. Assuming vertices (and messages) are stored in ascending order, if a cursor is not pointing to messages for the given vertex, it must be pointing to messages destined to a vertex with a bigger ID that still needs to be processed. Note that the message store performs the same operations with a cursor pointing to the messages currently stored, sorted, in memory. The operations are transparent to where messages are stored, as long as they are sorted. Figure 11-1 shows the layout of OOCM when reading data for vertex v4.


Figure 11-1. Out-of-core messages reading data for vertex v4

This technique is based only on sequential writes and reads to and from disk, and it is hence very IO-efficient. Yet, it needs to periodically sort (in memory) portions of messages before they are written to disk, which is expensive if many messages need to be written to disk. Moreover, using a low threshold pushes Giraph to produce many small files. While reads are logically sequential within each file, the disk has to physically seek to different areas to read the content pointed to by each cursor. For this reason, one should use OOCM with high thresholds, hence not in cases of extremely low memory availability (in which case the more predictable OOCG should be used). OOCM is activated by means of the giraph.useOutOfCoreMessages parameter, and the threshold is set via the giraph.maxMessagesInMemory parameter. The list of directories on local disk(s) used to store messages is set via the giraph.messagesDirectory parameter. As with OOCG, OOCM can make use of multiple disks to parallelize disk IO.
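Similarly, out-of-core messages can be enabled with a fragment like the following; the threshold is an example value only and, as discussed, should be kept high:

import org.apache.giraph.conf.GiraphConfiguration;

GiraphConfiguration conf = new GiraphConfiguration();
conf.setBoolean("giraph.useOutOfCoreMessages", true);
conf.setInt("giraph.maxMessagesInMemory", 10 * 1000 * 1000);
// One directory per local disk, as for the out-of-core graph.
conf.set("giraph.messagesDirectory", "/disk1/giraph/messages,/disk2/giraph/messages");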

Giraph Parameters

Giraph is a large system, and as with any large enough system, it has an even larger number of parameters that influence its behavior and performance. It is difficult to define precisely how certain parameters characterize the behavior and the performance of a system for all applications. To complicate things a bit more, it’s even harder to characterize how multiple parameters interact with each other. The following is a list of parameters with explanations of what they do. Some of them are straightforward; others, like numbers of threads and sizes of buffers, depend on your application, graph, cluster, and setting. As said at the beginning of this chapter, you’ll have to measure yourself how different values impact your applications. This section should guide you in that exploration.

  • giraph.useBigDataIOForMessages=[true|false], default false: BigDataIO is a set of classes that allows you to go beyond Java’s limitation of size of byte arrays, by wrapping together multiple ones. It can be used if certain vertices are expected to receive many messages.
  • giraph.jmap.histo.enable=[true|false], default false: By enabling this parameter, you request the JVM to print histograms of objects in a worker, possibly also to a remote controller. It is handy to see where your memory is going (as in, which objects use it and should hence be optimized).
  • giraph.metrics.enable=[true|false], default false: By enabling this parameter you ask Giraph to print additional metrics information, as in where each worker is spending most of its time. For example, you can measure, per superstep and per worker, the time spent computing vertices, the time spent serving messages, and so forth.
  • giraph.oneToAllMsgSending=[true|false], default false: If you activate this parameter you tell Giraph that each vertex sends the same message to all its neighbors. This is the case, for example, of PageRank, but not of weighted SSSP where the message depends on each edge weight. Once Giraph knows this, it can make certain optimizations that minimize the amount of messages it stores in memory.
  • giraph.numInputThreads=integer, default 1: Input splits can be read concurrently to load the graph faster.
  • giraph.numOutputThreads=integer, default 1: Multiple threads can go through vertices and output results at the end of the computation.
  • giraph.nettyClientThreads=integer, default 4: Client threads are responsible for sending data from workers to other workers. You want to increase the number of threads when you have a large number of workers. Too many threads increase overhead, while too few underuse the network while buffers fill up.
  • giraph.nettyServerThreads=integer, default 16: Server threads are responsible for receiving data from other workers. You want to increase the number of threads when you have a large number of workers. Too many threads increase overhead, while too few underuse the network while buffers fill up.
  • giraph.serverReceiveBufferSize=integer, default 524288: You want to increase the buffer size when your application sends a lot of data; for example, many and/or large messages. This avoids the server constantly moving little pieces of data around.
  • giraph.clientSendBufferSize=integer, default 524288: Similar to server buffers, you want to increase the buffer size when your application sends a lot of data; for example, many and/or large messages. This avoids the server constantly moving little pieces of data around.

Some of these parameters play a joint role. In particular, buffer sizes and the number of threads are strongly connected. Increasing the buffer size makes sure that IO costs are amortized. Increasing the number of threads makes sure that data is not stalling in buffers. However, using large buffers for many threads increases heap usage, with the risk of running out of memory.

Summary

Giraph is a large-scale graph processing system that runs your code, potentially on thousands of cores across hundreds of machines, with little complexity. Its generality, although specific to graph algorithms, leaves some space for optimizations tailored to each use case. In particular, you have seen how:

  • The amount of data stored in memory and sent over the network can produce bottlenecks or hinder the ability to conclude the computation
  • While running on commodity machines, certain hardware and Hadoop cluster setup choices are more suitable to large-scale graph processing
  • Giraph uses particular memory layouts to minimize the impact of the garbage collector in the JVM
  • Depending on the access pattern to the edges, different edge stores can be more suitable
  • The user can implement tailored data structures based on primitive types for both edges and messages to decrease memory footprint
  • When necessary, Giraph can spill excessive data, graph and messages, to disk

  • A number of parameters can influence the behavior and performance of your application, depending on its usage of resources

________________________

1http://fastutil.di.unimi.it/
