CHAPTER 5

Working with Giraph

Previous chapters introduced the generic Giraph programming model and talked about a few common use cases that lend themselves easily to being modeled as graph-processing applications. This chapter covers practical aspects of developing Giraph applications and focuses on running Giraph on top of Hadoop.

Writing a Giraph application boils down to plugging custom graph-processing logic into the Giraph framework. This is done by providing custom implementations of the interfaces and abstract classes that are later orchestrated by the Giraph BSP machinery. In this chapter, you learn how to quickly build graph-processing applications by extending the basic abstract class BasicComputation and providing a custom implementation of the compute() method. This is the absolute minimum code that needs to be written in order to have a graph-processing application, and quite a few practical Giraph applications do just that; more complex ones, however, opt to use the various pieces of Giraph’s framework that this chapter reviews in detail.

You then look into what it takes to execute a Giraph application by submitting it to Hadoop as a MapReduce job. Because the Giraph framework is implemented on top of Hadoop, graph-processing algorithms don’t care whether they are running on a single node or on a cluster of tens of thousands of nodes. The process of executing a Giraph application remains the same.

The rest of this chapter focuses on single-node execution via Hadoop’s local job runner. Because the local job runner executes jobs in a single Java virtual machine (JVM), it has the advantage of being extremely easy to use with common IDEs and debugging tools (the only disadvantage being that you can’t use a full dataset—you have to run on a small sample of your graph data). Once you get comfortable with the implementation running locally, you can either go all the way to a fully distributed Hadoop cluster or take an intermediate step of running against a pseudo-distributed Hadoop cluster. The pseudo-distributed cluster gives you a chance to experience all the bits and pieces of the Hadoop machinery while still running on a single host (the Hadoop daemons run in separate JVMs communicating over the loopback network interface). Either way, the good news is that you don’t have to change anything in your implementation—you just need to use a different set of configuration files. If you want to find out more about all the options for using Hadoop, head straight to Appendix A and then come back to this chapter.

Unlike traditional graph databases, Giraph excels at turning unstructured data into graph relationships. This chapter looks at how to achieve this flexibility via input and output formats and how you can either use built-in implementations as is or extend them to let Giraph slurp data from any source and deposit it to any kind of sink.

Finally, you see some of the more advanced use cases for the Giraph API, such as combiners for messages, master compute, and aggregators. This is quite a bit of infrastructure to review in a single chapter—buckle up for the ride!

“Hello World” in Giraph

Following the grand old tradition established by Brian Kernighan and his fellow C hackers at Bell Labs, your first Giraph application is “Hello World”: as simple an implementation as possible, which outputs a few text messages. One notable difference between the classic “Hello World” and a graph-processing “Hello World” Giraph application is the need for an input. Your Giraph application can’t function without a graph definition given to it; Giraph, after all, is a graph-processing framework.

For the rest of this chapter, you use the graph of Twitter followership that you saw in Chapter 3 (alongside the pseudo-code example of computing in- and out-degrees). In this section, though, you start with much simpler code that works on the graph. All it does is print a “Hello World” message for every vertex (in this case, person) and output the neighbors the vertex is connected to (those the person is following). To make the code as simple as possible, it uses numbers instead of people’s names. The output will eventually appear as follows:

Hello world from the 5 who is following: 1 2 4

All the output that you’ll see is based on the graph in Figure 5-1. It’s a Twitter followership graph illustrating who is following whom on Twitter. Look at the node for user #5. The arrows pointing toward nodes 1, 2, and 4 indicate that user #5 is following those three users.

Figure 5-1. Twitter followership directed graph

Defining the Twitter Followership Graph

The simplest textual representation of the graph in Figure 5-1 that your Giraph application can recognize is the ASCII file shown in Listing 5-1. It encodes the adjacency lists for each vertex, with each line in the input file formatted as follows: vertex neighbor1 neighbor2....
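
As an aside, this line format is simple enough to parse by hand. The following plain-Java sketch (a hypothetical helper for illustration only; Giraph ships its own input-format classes that do the real parsing) shows how one such line maps to a vertex ID followed by its neighbor IDs:

```java
// Hypothetical helper illustrating the "vertex neighbor1 neighbor2..." line format.
public class AdjacencyLineSketch {
    // Returns {vertexId, neighbor1, neighbor2, ...} parsed from one input line.
    public static int[] parse(String line) {
        String[] tokens = line.trim().split("\\s+");
        int[] ids = new int[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            ids[i] = Integer.parseInt(tokens[i]);
        }
        return ids;
    }
}
```

For example, the line describing vertex 5 from Figure 5-1 would parse into the array {5, 1, 2, 4}: the vertex itself first, followed by the vertices it follows.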

If you’re familiar with how input data is provided to Hadoop MapReduce jobs, you see that Giraph follows the same approach: Giraph applications process all the files in a given subdirectory (in this case, src/main/resources/1) and consider them to be part of the same graph definition. You could put every line (defining a given vertex’s topology) into a separate file or group them in a number of files and achieve the same result. For example, the input shown in Listings 5-2 and 5-3 that consists of two files is considered identical to the input in Listing 5-1.

Creating Your First Graph Application

With the input data in place, let’s now turn to the Java implementation of your first “Hello World” Giraph application (GiraphHelloWorld.java, shown in Listing 5-4). Remember that you are trying to print as many lines as there are vertices in a graph, with each line reading “Hello world from the X who is following: Y Z...”.

Even in this trivial example, there’s quite a bit going on. You start by extending BasicComputation, the basic abstract class for performing computations in Giraph’s implementation of the BSP model. Immediately you have to specify the four type parameters that tell Giraph how to model the graph. Recall that the Giraph framework models every graph via a distributed data structure that is parameterized by the type of each vertex’s ID (VertexID), the type of the label associated with each vertex (VertexData), the type of the label associated with each edge (EdgeData), and the type of the messages the BSP framework will communicate over the network (MessageData).

Even in this simple example, you have to make choices about all four of those based on the representation of your input graph (integer IDs for vertices, with no edge labels). Thus the type parameters you need to pick are IntWritable for VertexID and VertexData and NullWritable for EdgeData and MessageData. Once again, the choices are predetermined by the simple input format for the graph data you’ve settled on; they must also be compatible with the actual implementation of the vertex input format given to the Giraph command-line executor when a computation job is submitted to Hadoop (shown later in Listing 5-8). Finally, these choices mean the rest of the GiraphHelloWorld implementation expects to have access to the topology of the graph, with every vertex having an Integer ID and a list of edges connecting it to other Integer IDs; but it doesn’t expect any labels to be associated with the edges and, most important, doesn’t send messages between the vertices of the graph (NullWritable signals the lack of data).

THE MIGHTY FOUR OF GIRAPH PROGRAMMING

Throughout Giraph, you see the following quadruple of type variables used for various generic types:

  • VertexID is the type used for referencing each vertex.
  • VertexData is the type of the label attached to each vertex.
  • EdgeData is the type of the label attached to each edge.
  • MessageData is the type of the messages that can be sent between two vertices.

All four types are required to implement a basic building block of the Hadoop serialization framework called the Writable interface. On top of that, because objects representing vertices have to be partitioned between different nodes in the cluster, VertexID has the stricter requirement of implementing the WritableComparable interface. Reusing Hadoop’s approach to object serialization means Giraph can use a lot of common code. Also, given that message passing between vertices residing on different nodes in the cluster is one of the key features of the BSP framework, the types you use must be extremely efficient when it comes to serialization and deserialization. In other words, they need to be optimized for network traffic. Fortunately, Hadoop has solved the same problem for its MapReduce framework by creating its own approach to serialization and deserialization built around the Writable interface. In any Giraph application, you can use a wide collection of Hadoop-provided classes that implement the Writable interface and map to all the usual data types a Java application might need (boolean, int, Array, Map, and so on).
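
To make the Writable contract concrete, here is a plain-java.io sketch of the pattern a class like IntWritable follows: a value writes itself to a DataOutput and reads itself back from a DataInput. The class below is purely illustrative and is not part of any Hadoop or Giraph API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration of the Writable idea: symmetric write/read against Data streams.
public class WritableSketch {
    public static byte[] writeInt(int value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(value);   // the moral equivalent of IntWritable.write(DataOutput)
            out.close();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static int readInt(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return in.readInt();   // the moral equivalent of IntWritable.readFields(DataInput)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the integer serializes to exactly 4 bytes; this compactness is precisely why Writable types are a good fit for the message-heavy network traffic of the BSP framework.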

Giraph expects you to implement all the computation in a method called compute() that is called for each vertex (at least once initially and then for as long as there are messages for it). Because you are extending an abstract class, you must provide a definition for compute(). This example prints a message with a vertex ID followed by a list of all the IDs of this vertex’s neighbors. The neighbors are determined by iterating over all the edges and looking up the ID of the vertex that is the target of each edge. Finally, you call voteToHalt(), thus signaling the end of the computation for a given vertex. When all vertices have called this method, that signals the end of the current iteration of the BSP computation (and, unless there are messages left, the end of the entire Giraph application run).
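
To make the per-vertex logic concrete without pulling in the Giraph dependency, here is a plain-Java sketch of the string each compute() call effectively produces for one vertex (the helper class and its names are hypothetical; in the real code the IDs come from vertex.getId() and from iterating over the vertex’s edges):

```java
// Hypothetical helper mirroring what GiraphHelloWorld prints per vertex.
public class HelloWorldLineSketch {
    public static String line(int vertexId, int[] targetIds) {
        StringBuilder sb = new StringBuilder();
        sb.append("Hello world from the ").append(vertexId).append(" who is following:");
        for (int target : targetIds) {   // mirrors iterating over the vertex's edges
            sb.append(' ').append(target);
        }
        return sb.toString();
    }
}
```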

Finally, in case you are wondering why a main() method is defined, it is not required but is convenient for cases in which you want to manually execute the example without having to call the giraph command-line executor. One such example is given in Chapter 12, when you run Giraph applications on Amazon’s cloud.

With the Java implementation of GiraphHelloWorld in place, the only other bit of housekeeping is to create a project (either Maven or Ant) that helps with pulling in the right dependencies and compiling the example. If you decide to go with Maven, Listing 5-5 shows how your Maven pom.xml file for the project should look. Note that the only required dependencies are giraph-core and hadoop-core.
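
As a rough sketch, the dependency section of such a pom.xml boils down to something like the following (the version values are placeholders; fill in the versions matching your installation, and consult Listing 5-5 for the exact file):

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.giraph</groupId>
    <artifactId>giraph-core</artifactId>
    <version><!-- your Giraph version --></version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version><!-- your Hadoop version --></version>
  </dependency>
</dependencies>
```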

At this point, make sure pom.xml is at the top of the source tree and the Java implementation under src/main/java. This will let you easily build the project by issuing the command shown in Listing 5-6.

Once Maven is done with the build, you should see the resulting jar file appear under the target subdirectory. The next section explains how to use this jar to execute the GiraphHelloWorld application.

Launching Your Application

The only two things still needed to run the example are standalone installations of Hadoop and Giraph (Appendix A has detailed instructions for installing both). Once you have those in place, update your shell environment. If bash is your shell of choice, Listing 5-7 will do the job (if you are using a different shell, consult its documentation regarding how to export environment variables).

As long as you keep the previous environment part of your shell session, you can run the example Giraph application using Hadoop’s local execution mode, as shown in Listing 5-8. Note that executing Giraph applications results in a fairly long single command line. To fit everything on the book page, the single command is broken with backslash (\) characters. When typing this command line into your terminal window, you can either do the same (make sure the backslash is the last character on each line) or type it as a single line.

Congratulations! You’ve developed and executed your first Giraph application. Granted, it is pretty simple, and the output could be made better by printing the names of people instead of their numeric IDs (the next section deals with this), but this is a self-contained Giraph application in just under a couple of dozen lines of code. By the way, don’t be alarmed if you see the output lines in a slightly different order than they appear in Listing 5-8: the order of vertex processing in Giraph is non-deterministic, although, of course, all vertices are guaranteed to be processed eventually.

Let’s take one more look at Listing 5-8 and go through all the components of the command line. The first argument (target/*.jar) specifies the location of the jar file containing the implementation of your graph-processing “Hello World” application. The second argument (GiraphHelloWorld) specifies the name of the class with the compute method to be applied to all the vertices in the graph. The third argument (-vip) specifies the input path to the graph data. The fourth argument (-vif org.apache.giraph.io.formats.IntIntNullTextInputFormat) tells the Giraph implementation what input format to use for parsing the graph data. The fifth argument (-w 1) makes Giraph use only one worker to process all vertices in the example graph (because you are not running on a cluster, you can only use one worker). Finally, the last command-line argument (-ca giraph.SplitMasterWorker=false,giraph.logLevel=error) tweaks a few internal knobs of the Giraph implementation to allow execution with a single worker and reduce the verbosity level to errors only.
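
Putting those arguments together, the invocation has roughly the following shape (a sketch reconstructed from the description above; see Listing 5-8 for the exact command, and note that the input path is assumed to be the src/main/resources/1 directory mentioned earlier):

```shell
giraph target/*.jar GiraphHelloWorld \
  -vip src/main/resources/1 \
  -vif org.apache.giraph.io.formats.IntIntNullTextInputFormat \
  -w 1 \
  -ca giraph.SplitMasterWorker=false,giraph.logLevel=error
```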

Although it is useful to configure Giraph application execution on the fly using the -ca command-line option, a more practical approach is to put any reusable configuration into an XML configuration file. Anybody familiar with the Hadoop Configuration API will find the Giraph configuration very similar (internally, Giraph simply reuses Hadoop’s implementation of the Configuration API). For example, to create a permanent Giraph configuration for the local Hadoop execution environment, you can put the configuration shown in Listing 5-9 into the giraph-site.xml configuration file in the $GIRAPH_HOME/conf folder (see Appendix A for more information on how to install and configure Giraph).
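
For instance, the two -ca settings used earlier could be captured in giraph-site.xml along these lines (a sketch using Hadoop’s standard Configuration XML layout, which Giraph reuses; compare with Listing 5-9):

```xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>giraph.SplitMasterWorker</name>
    <value>false</value>
  </property>
  <property>
    <name>giraph.logLevel</name>
    <value>error</value>
  </property>
</configuration>
```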

With this configuration in place, you no longer have to supply the last line of the command-line options when executing Giraph applications.

Note  Keep in mind that everything you put in the static configuration file applies to all your Giraph runs, so it’s important to keep track of the properties defined there.

Using a More Natural Definition of a Twitter Followership Graph

As mentioned before, one tiny problem with the example application is that the messages it outputs are not particularly descriptive. Wouldn’t it be better if, instead of numerical identifiers, you had the original names of the people, as shown next to each vertex in Figure 5-2?

Figure 5-2. Twitter followership directed graph

Even the definition of the graph itself looks more natural specified as shown in Listing 5-10 (the definition uses dummy 0s as the values for the vertex and edge labels because you are not interested in them for now).

The definition of the graph looks more complete now, but the format still has a few annoying restrictions. Everything must be delimited by a single tab (\t) character (spaces won’t work), and the dummy 0s designating vertex and edge label values need to be present even though they are not used. A more flexible format would be nice; but believe it or not, Giraph doesn’t support such a format out of the box. You have to wait until Chapter 8 to see how you can extend the built-in implementations of the I/O formats to tweak them to your own liking. For now, the annoying 0s will stay, because TextDoubleDoubleAdjacencyListVertexInputFormat expects this format.
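
To illustrate the layout (with hypothetical names and a made-up helper class; Giraph’s input format does the real parsing), one line of this tab-delimited format can be unpacked like so:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates the tab-delimited layout the text describes:
// vertex <TAB> vertexValue <TAB> neighbor1 <TAB> edgeValue1 <TAB> neighbor2 <TAB> edgeValue2 ...
public class TabFormatSketch {
    // Returns just the neighbor names from one tab-delimited line.
    public static List<String> neighbors(String line) {
        String[] tokens = line.split("\t");
        List<String> result = new ArrayList<>();
        // tokens[0] is the vertex name, tokens[1] its dummy value;
        // after that, neighbor names and edge values alternate.
        for (int i = 2; i < tokens.length; i += 2) {
            result.add(tokens[i]);
        }
        return result;
    }
}
```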

Although it is natural to assume that at this point you can simply feed this new graph definition to your existing Giraph application, the fact that you changed the vertex representation from integers to strings requires you to make a change to the type variables used in defining the compute() method and its enclosing class. The changes to GiraphHelloWorld are trivial; the new implementation is shown in Listing 5-11. Note that you use Hadoop’s Text type for string representations. This is consistent with the earlier advice to use Hadoop’s types to manage data in an I/O friendly way.

Once you rebuild the jar using the previous source code, don’t forget to also specify the new InputFormat on the command line, as shown in Listing 5-12.

As you can see, switching between input formats was pretty easy. All you had to do was update the type variables in the Java implementation and specify a different input format (org.apache.giraph.io.formats.TextDoubleDoubleAdjacencyListVertexInputFormat) as the fourth argument on the command line. The rest of the command line is exactly the same as in Listing 5-8.

Now that you have seen the basics of Giraph applications, let’s move on to how more complex graph-processing algorithms are expected to use the Giraph APIs.

Counting the Number of Twitter Connections

At this point, you should be comfortable with the basics of the Giraph framework. It is time to kick it up a notch and implement a few more applications. In this section, you turn pseudo-code examples from Chapter 3 into actual Java code that can be executed with Giraph the same way you executed the “Hello World” Giraph application. The first example, counting followership on Twitter, is shown in Listing 5-13.

The overall idea of the algorithm is to use empty messages as a way of notifying the vertices receiving them that there’s a connection (recall that Giraph doesn’t store the set of all the vertices that are connected to a given vertex). You send messages during the first superstep of the BSP computation (getSuperstep() returning 0). Given that there are now messages to be processed, the compute() method is called again for the next superstep. You process those messages (sent in the previous superstep) by counting them and assigning the sum to each vertex’s label. This is how you arrive at the value of the indegree for each vertex.
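
The two-superstep flow can be modeled in a few lines of plain Java (a simulation for illustration only; the real Listing 5-13 does this with Giraph messages across the cluster):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of the two-superstep in-degree algorithm:
// superstep 0 "sends" one empty message along every outgoing edge;
// superstep 1 counts the messages each vertex received.
public class InDegreeSketch {
    public static Map<Integer, Integer> inDegrees(Map<Integer, List<Integer>> adjacency) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (Integer vertex : adjacency.keySet()) {
            counts.put(vertex, 0);                      // every vertex starts at in-degree 0
        }
        for (List<Integer> targets : adjacency.values()) {
            for (Integer target : targets) {            // one "message" per edge...
                counts.merge(target, 1, Integer::sum);  // ...counted at the receiving vertex
            }
        }
        return counts;
    }
}
```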

A natural question to ask after reading the code is, how in the world are you supposed to find out the final indegrees of each vertex if the code doesn’t produce any output? The answer lies in the Giraph facility that is complementary to that of the input format: an output format. An output format is just that: a way for Giraph to dump the topology of a graph and all the labels once it is finished running. Using output formats is as simple as specifying two command-line options: -vof to request a particular implementation and -op to specify an output subdirectory. Listing 5-14 shows the command line required to specify an output format that is exactly the same as the input format used in Listing 5-12. Note that the first thing you do is remove the output subdirectory; this is because Giraph expects that the requested subdirectory does not exist. Also note that the last command outputs the content of all the files into which Giraph’s output format could have split the resulting graph. For big graphs, this could be quite a few individual files named part-XXX. Finally, although running Hadoop in local mode results in output in the local filesystem, in general the last command would be based on calling Hadoop’s filesystem utilities, as you see in the next section.

Let’s recap what you have learned so far. Every Giraph application starts with reading the graph definition from a set of input files. This data describes the initial graph topology and consists of Vertex objects connected by Edge objects. The Giraph application runs in a distributed fashion, iterating over the set of Vertex objects and calling a compute() method provided in a user-defined computation class. Implementing the Computation interface is the only requirement for the custom computation class. In practice, though, most applications subclass one of two abstract classes: AbstractComputation or BasicComputation. The latter is a slightly more restricted form of the former.

The compute() methods for all the required vertices are executed in a series of supersteps. A superstep finishes when every vertex has finished executing its compute() method. The supersteps are numbered starting from 0, and the number of the current superstep can always be requested by calling getSuperstep(). The “Hello World” example consisted of exactly one superstep: superstep 0. The current example runs in two supersteps; most real-world Giraph applications iterate through a variable number of supersteps as part of the convergence of the algorithm. A compute() method can elect to call voteToHalt() on its Vertex object, signaling that, unless new messages are delivered to it in the next superstep, it wishes not to be called again. The entire Giraph application finishes when all of its vertices are halted and there is not a single pending message. At this point, if a user requests that the entire state of the graph be flushed to storage, an output format takes over and makes sure the data is serialized in files under the desired location.

Keeping input and output formats compatible makes it possible to chain Giraph applications in arbitrary pipelines, each operating on the output of the previous one(s). Giraph algorithms are close to the Unix tooling philosophy where one tool is meant to “do one thing and do it well.” The expressiveness comes not from the complexity of each tool or algorithm but rather from the flexibility of building arbitrary pipelines from multiple stages, all operating on common datasets.

With this theoretical background, let’s turn to the second example from Chapter 3: converting directed graphs to undirected graphs.

Turning Twitter into Facebook

From a graph theory perspective, the biggest difference between Facebook and Twitter is directionality. The Twitter social graph is directed (if a user follows another, the reverse isn’t necessarily true), and the Facebook graph is undirected (your friends consider you to be a friend). So far, the examples have used a Twitter followership graph. This has the additional benefit of mapping one to one onto Giraph’s model of directed graphs. If you want to model a Facebook graph with Giraph, you have to make sure the vertices are connected in both directions.

Let’s use Giraph to turn your existing Twitter connection graph into a Facebook one by running the code shown in Listing 5-15.
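
Stripped of the Giraph machinery, the conversion is simply this: for every directed edge u -> v, make sure the reverse edge v -> u also exists. A plain-Java sketch of that idea (illustrative only; the real Listing 5-15 performs the same transformation through Giraph’s distributed APIs):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Turns a directed adjacency map into an undirected one by adding reverse edges.
public class UndirectSketch {
    public static Map<Integer, Set<Integer>> undirect(Map<Integer, Set<Integer>> directed) {
        Map<Integer, Set<Integer>> result = new HashMap<>();
        // Copy the existing (directed) edges.
        for (Map.Entry<Integer, Set<Integer>> e : directed.entrySet()) {
            result.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
        }
        // Add the reverse of every edge.
        for (Map.Entry<Integer, Set<Integer>> e : directed.entrySet()) {
            for (Integer target : e.getValue()) {
                result.computeIfAbsent(target, k -> new HashSet<>()).add(e.getKey());
            }
        }
        return result;
    }
}
```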

At this point, you could run the code exactly the same way you ran your previous examples. That, however, would produce Giraph-friendly, but not necessarily human-friendly, graph output. Instead, you can use a different output format whose output can serve as input to a variety of graph-visualization tools. A picture, after all, is worth a thousand words. Listing 5-16 executes Giraph almost identically to the previous example, aside from using a different output format (org.apache.giraph.io.formats.GraphvizOutputFormat).

To visualize the output generated from running this command, you first ask Hadoop to merge different sections of the output into a single file. Then you add a header and a footer required by the Graphviz format, and finally you run a Graphviz visualization command-line utility to generate a file that can be viewed on screen. These four commands are shown in Listing 5-17, and Figure 5-3 shows the resulting picture.

Figure 5-3. The Facebook equivalent of the Twitter followership graph

GRAPHVIZ (DOT): A LINGUA FRANCA FOR GRAPH VISUALIZATION

Graphviz is open source graph-visualization software. Graphviz and its input language, DOT, were developed at AT&T Labs Research for quickly visualizing arbitrary graph data and have taken on a life of their own. A reference implementation is available on most Unix platforms, but there are also a number of cleanroom visualizer implementations for the DOT format. If the DOT visualization utility (circo) is not available on your platform, you can try visualizing the Facebook graph by pasting the content of the giraph.dot file into one of the online Graphviz tools, such as the one at www.webgraphviz.com.

The graph visualization lets you see the vertex value associated with each vertex and the edge value associated with each edge. Because you are not using vertex values in the example, in Figure 5-3 all of them appear as 0.0 in a box next to the person’s name. Edges, on the other hand, are labeled according to whether the edge was added as part of your conversion (2) or whether it was always present in the graph (1).

This was the first example of mutating a graph’s topology by adding new edges. The next section covers additional ways to change the graph structure on the fly.

Changing the Graph Structure

Adding new edges to a graph was easy enough, but what about adding new vertices? What about building entire graph segments directly in memory without reading the data from external sources? An extreme example of that would be generating a graph from scratch based on certain criteria. If you think of Giraph jobs forming a pipeline, generating a graph could be the first step in that pipeline. This is a typical approach for simulation and system testing of complex graph algorithms. The next example, shown in Listing 5-18, generates your old friend the Twitter followership graph starting from nothing but an empty graph.

Because the entire point of this Giraph application is to generate the graph from scratch, a very logical assumption would be that you shouldn’t need to give Giraph any input files. A more advanced use of Giraph would allow you to do that, but for this simple example you face a small challenge. The code that generates the graph is part of the compute() method, and because compute() methods are called for the vertices of an input graph, the easiest way to do what you want is to give Giraph the smallest graph possible: a graph with a single vertex and no edges, as shown in Listing 5-19.

With this input in place, you can run Giraph exactly as you did for all the previous examples. Let’s run it with the Graphviz output format, as shown in Listing 5-20, so you can quickly check the visualized graph and make sure the code generated the original graph from Figure 5-2.

Two of the Giraph API calls used in this example to modify the topology of the graph (addVertexRequest and addEdgeRequest) are part of a broader API set known as graph mutation APIs. Most of these API calls modify the overall graph structure in one way or another. The changes can be available to all members of the computational process either immediately or in the next superstep (that’s why in the example you can’t exit immediately but must wait for the next superstep). Here’s a summary of the most popular mutation API methods used in real-world applications:

  • setValue(): Modifies the vertex’s own value. The result of this action is immediately visible (in the same superstep) to the entire computation.
  • setEdgeValue(): Modifies the value of an outgoing edge. Even though the result is also immediately visible to the entire computation, the applicability of this method is limited, because typically only a source vertex has access to its outgoing edges.
  • addEdge(), removeEdges(): Modify the local topology of the graph by adding or removing edges whose source is the current vertex.
  • addVertexRequest(), removeVertexRequest(), addEdgeRequest(), removeEdgesRequest(): Ask to modify the global topology of the graph starting from the next superstep.

By using these methods, you can create graphs with a very different structure and topology compared to the graph available to Giraph at the start of execution. Interestingly enough, in addition to the straightforward methods of mutating a graph presented here, a less obvious one has to do with that jack of all trades in Giraph: messages. This is the subject of the next section.

Sending and Combining Multiple Messages

As mentioned, the Giraph messaging topology is completely independent of graph topology. In other words, every vertex can send a message to every other vertex. Sending messages to a nonexistent vertex creates the vertex. (See? Messages can also be considered part of the graph mutation APIs!)

This is the trick the next example exploits to build a graph topology in parallel. Instead of a single loop running on a single host and iterating over all the vertices and their edges, you use a two-stage approach. First you create all the vertices by sending messages to them. Each message contains one neighbor of that newly created vertex. Once the vertices are created and assigned to the worker nodes in the cluster, each vertex, in parallel, begins iterating over the received messages and creating edges connecting it to its neighbors. Listing 5-21 is a modification of the previous example that does just that.
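
The two stages can be modeled in plain Java as follows (a simulation for illustration; in the real Listing 5-21, stage one corresponds to sending the messages and stage two to each vertex consuming its mailbox in parallel):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Model of the two-stage trick: stage one "sends" each vertex one message per
// neighbor (creating the vertex as a side effect); stage two has each vertex
// turn its received messages into edges.
public class MessageBuildSketch {
    // edges[i] = {vertex, neighbor} pairs describing the desired topology.
    public static Map<Integer, List<Integer>> build(int[][] edges) {
        // Stage 1: deliver one message per edge to the target vertex's "mailbox".
        Map<Integer, List<Integer>> mailboxes = new HashMap<>();
        for (int[] edge : edges) {
            mailboxes.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
        }
        // Stage 2: each vertex (conceptually in parallel) converts its messages to edges.
        Map<Integer, List<Integer>> adjacency = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> box : mailboxes.entrySet()) {
            adjacency.put(box.getKey(), new ArrayList<>(box.getValue()));
        }
        return adjacency;
    }
}
```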

Of course, for a tiny graph like this one, the difference between sequential and parallel versions of the same code is negligible. But it does highlight an important design point that every Giraph application needs to consider: parallel graph processing is always limited by the amount of sequential code that cannot be parallelized. This is also known as Amdahl’s law of parallelism.
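
Amdahl’s law quantifies this limit: if a fraction p of the work can be parallelized across n workers, the overall speedup is capped at 1 / ((1 - p) + p/n). A quick sketch:

```java
// Amdahl's law: the speedup ceiling imposed by the sequential fraction of a program.
public class AmdahlSketch {
    public static double speedup(double parallelFraction, int workers) {
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / workers);
    }
}
```

Even with 1,000 workers, a program that is only 50% parallelizable can never run more than about twice as fast; this is why shrinking the sequential portion of a Giraph application matters more than adding hardware.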

The latest example makes it possible for edges in the graph to be created in parallel as opposed to being created in a single loop running at superstep 0. This is a good thing, but there’s still one more optimization possible.

Consider the fact that for each edge, you have to create a dedicated message. Given that all you need to communicate in that message is an integer ID, the overhead can be pretty high. It would be much more efficient if you could somehow aggregate or combine neighbor IDs into a single message. One way of accomplishing this is to change the data type of the messages being communicated into a byte array. Another way is to use Giraph’s combiners.

You implement a combiner for messages sent between vertices whenever there’s a natural way to aggregate multiple messages into a single one. Giraph provides a few useful combiners to get you started. SimpleSumMessageCombiner, for example, sums individual messages into one; and MinimumIntMessageCombiner finds a minimum value in messages containing individual integers and creates a single message containing that value. The next example, however, creates a custom combiner that combines all of your messages in a bitmap array. Each integer value sent in a message is represented as a bit set in a bitmap. For example, if a message contains the integer 3, the third bit from the right is set to 1. Listing 5-22 shows the code for BitArrayCombiner.

In general, combiners tend to be dead simple: all you have to do is define the function that returns an initial message. This message is used as an accumulator for the messages emitted as part of compute() execution. The job of accumulating the data is performed by the code defined in the combine() method. As you can see, this method treats originalMessage as a mutable accumulator; in this case, it sets the bit corresponding to the neighbor’s integer ID in an overall integer bit array.
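
The accumulation itself is ordinary bit manipulation. The following self-contained sketch mirrors the combiner’s behavior (bit positions here are zero-indexed, a detail the book’s BitArrayCombiner may handle slightly differently):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the BitArrayCombiner idea: each neighbor ID below 32 becomes one
// bit in a single int, so many messages collapse into one.
public class BitCombineSketch {
    public static final int INITIAL = 0;        // the "initial message": no bits set

    // Fold one more neighbor ID into the accumulated bitmap.
    public static int combine(int accumulated, int neighborId) {
        return accumulated | (1 << neighborId); // assumes 0 <= neighborId < 32
    }

    // The receiving side unpacks the bitmap back into individual IDs.
    public static List<Integer> unpack(int bitmap) {
        List<Integer> ids = new ArrayList<>();
        for (int id = 0; id < 32; id++) {
            if ((bitmap & (1 << id)) != 0) {
                ids.add(id);
            }
        }
        return ids;
    }
}
```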

Combiners can be useful, but they are not transparent. First, you have to tell Giraph that it needs to use a combiner. This is typically done by passing a class name to the -c command-line option, just as you pass an input format. Second, you must modify the code from the previous example so it can unpack the bit array that is now the payload of each message. This issue, plus the fact that you can only work with fewer than 32 vertices (because integers in Java are 32-bit), makes BitArrayCombiner a bit impractical and only good as an illustration of how easy it is to write a combiner.

Speaking of limitations, if you tried running the previous example, you probably noticed a subtle bug in its implementation: it's impossible to generate vertices with zero neighbors. The usual way to reduce the likelihood of this type of bug is to develop code hand-in-hand with unit tests verifying its correctness. You have been developing code in this chapter without any kind of testing or verification, and it is time to change that.

Unit-Testing Your Giraph Application

Developing complex applications against any framework typically requires a suite of unit tests guaranteeing against accidental changes in semantics during code changes and refactoring. Fortunately, the event-driven nature of Giraph applications makes it extremely easy to test the semantics of various methods (such as compute()) in isolation. You can also create a miniature execution environment for full-fledged application testing.

As mentioned earlier, the previous example had a subtle issue. With that in mind, let’s create a unit test verifying that the resulting graph topology is indeed what you expect it to be; see Listing 5-23.

Even though this implementation is only a few lines long, a number of remarkable things happen behind the scenes courtesy of the InternalVertexRunner implementation. The run() method used for this example requires that you supply Giraph’s configuration and a String array simulating the content of the input. What you get from calling run() is a String array simulating the output that would be written to a filesystem during a normal run of the application. Although the input and output files are simulated, everything else is not. The conf object is constructed exactly as it would be during the actual run of the application by combining default settings with settings defined in giraph-site.xml and those specified as part of the giraph command line. If you don’t want to be constrained by the syntax of the command line, you can write a main() method to explicitly set configuration values.

Running the unit test is a simple matter of either hooking it up to your build system (Ant, Maven, or Gradle) or passing it to the JUnit executor. If you are using Maven to build the examples, all you need to do is save the test case under src/test/java/TestGiraphApp.java in your project file tree and add a JUnit dependency to the pom.xml file as shown in Listing 5-24. After that, executing mvn test will run the unit tests for you (and so will the mvn package command).
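The dependency addition typically looks like the following fragment inside the &lt;dependencies&gt; section of pom.xml; the JUnit version shown here is an assumption, so match it to whatever your project already uses:

```xml
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.12</version>
  <scope>test</scope>
</dependency>
```

The test scope keeps JUnit off the runtime classpath of the packaged application.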

Either way, the assertion fails because the resulting graph is one vertex short of what you expect. Making the unit tests pass is left as an exercise for you; the next section covers the somewhat orthogonal subject of extensions to the BSP compute model that Giraph provides.

Beyond a Single Vertex View: Enabling Global Computations

So far, the Giraph applications you’ve seen have assumed no shared state between different vertices. Sending messages works well for communicating data between two given vertices (or even between a given vertex and a set of message recipients), but it doesn’t really work if the entire computation needs to keep a running tally that can be updated and accessed by all compute() methods working on all the vertices in a graph.

For example, imagine a situation where you need to run a parallel search on a graph. Let's say your graph represents a map, with vertices being cities, edges being roads, and edge labels being distances between cities. Ideally you could find the absolute shortest path between two given vertices; but more often than not, a close approximation of the shortest path will do. The question then becomes, how do you keep track of the current shortest path distance, and how do you abort the computation when the path gets short enough (but perhaps not as short as the absolute shortest path)?

Giraph offers an efficient way to solve this: named aggregators. The next two sections talk about aggregators and how they work with master compute.

Using Aggregators

As mentioned earlier, aggregators really shine when you need to track graph statistics that don’t naturally belong to any of the vertices (or even a subset of vertices). One such statistic is the total number of connections (edges) in the graph. Listing 5-25 offers a very simple way to count the number of edges in the graph by using an aggregator. That aggregator happens to be a summing aggregator, but you don’t know that unless you look at Listing 5-26 and see that the aggregator implementation is registered under the name TotalNumberOfEdgesMC.ID.

The idea here is simple: every compute() method pushes the number of outgoing edges associated with a given vertex into an aggregator registered under the name TotalNumberOfEdgesMC.ID. As you can see, the compute() methods don’t care what happens to the values sent to the aggregator, nor do they care what kind of aggregator they are sending values to. Setting up an aggregator is, by itself, an example of a global action that must be done once before the first superstep, so you cannot do that in a vertex-centric compute() method. You have to use a bit of code that runs once before each superstep, and that is what the master compute implementation shown in Listing 5-26 allows you to do.

Master compute allows you to execute global actions using the entire graph topology and all the aggregator values available to you. Because the master compute initialize() method runs once very early during the initialization of Giraph, this is an ideal place to register all the aggregators you use in a given application. Once the aggregators are registered, vertex-centric compute() methods can send values to them, and the master compute() method can inspect the state of the aggregators before each superstep. In this case, you are using the compute() method of the master compute implementation to print out the current value of the summing aggregator. In real-world applications, you can use it to do any kind of global computations, based on the results of which you can even terminate your application early (provided that a globally computed heuristic of your choice has been satisfied).
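The division of labor between the vertex side and the master side can be simulated in a few lines of plain Java. In the sketch below (all names are ours, not Giraph's actual classes), a shared running sum plays the role of the aggregator registered under TotalNumberOfEdgesMC.ID: each "vertex" blindly pushes its out-degree into it, and only the master-side code inspects the total.

```java
// Plain-Java simulation of the edge-counting aggregator flow.
class EdgeCountAggregatorSketch {
    private long sum;                        // the aggregator's running value

    void aggregate(long value) {             // what vertex compute() calls
        sum += value;
    }

    long getAggregatedValue() {              // what master compute inspects
        return sum;
    }

    // One superstep's worth of work: every vertex contributes the number
    // of its outgoing edges; adjacency[v] lists vertex v's neighbor IDs.
    static long countEdges(int[][] adjacency) {
        EdgeCountAggregatorSketch agg = new EdgeCountAggregatorSketch();
        for (int[] outEdges : adjacency) {
            agg.aggregate(outEdges.length);  // vertex-side aggregate() call
        }
        return agg.getAggregatedValue();     // master-side inspection
    }
}
```

Note that the vertices never read the sum; exactly as in the prose above, they only send values to a name and let the aggregator implementation decide what "aggregating" means.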

Executing the example in Hadoop local mode provides the expected output, but for cluster execution you are better off recording the state of the aggregators to external storage at given intervals during the execution of your application, much as the graph's final state is recorded at the end of the run. You could, of course, replace the System.out.println() call in master compute with code that writes to a file, but there's a better way: Giraph provides the notion of an aggregator writer. An aggregator writer class is expected to implement the AggregatorWriter interface and provide an implementation for the writeAggregator() method that, at the end of each superstep, is passed a map of all the registered aggregator names, their values, and the numeric ID of the superstep (the end of a computation is signaled by passing the ID LAST_SUPERSTEP). As usual, you're free to implement your own aggregator writer or use the default TextAggregatorWriter. For now, let's stick with the default.

Putting it all together, Listing 5-27 shows how you can run your edge-counting Giraph application from the command line.

There are a few interesting points to note about the giraph command line presented in Listing 5-27. First, because you are using master compute, you tell Giraph which class contains that implementation as a third argument (-mc TotalNumberOfEdgesMC). Second, you tell Giraph to use an aggregator writer that outputs the values of the aggregators into the default Hadoop filesystem (-aw org.apache.giraph.aggregators.TextAggregatorWriter). Finally, you specify that you want the aggregator values to be recorded at every superstep (-ca giraph.textAggregatorWriter.frequency=1). Note that you specify the frequency by providing a property that the TextAggregatorWriter implementation recognizes. You can use the same technique for configuration properties in your own implementations.

The last command in Listing 5-27 demonstrates that TextAggregatorWriter did, indeed, record the state of your aggregator at each superstep. Those values are the same as the ones produced by your System.out.println statements.

The next section provides an additional level of detail about how the aggregator and master compute machinery works. But first, let’s take a moment to look at the implementation of LongSumAggregator, shown in Listing 5-28. Even though this aggregator is available to you out of the box, it is useful to know how simple it is to create specialized aggregators of your own.

The implementation here is as simple as it gets, but it is not a toy example. This code was lifted verbatim from the Giraph code base. Hopefully this provides enough inspiration for you to create your own aggregators for cases when Giraph doesn’t come with the ones you need.
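To give a feel for how little code such an aggregator takes, here is the same summing logic sketched on plain longs instead of Hadoop's LongWritable; the class name is ours, and the real Listing 5-28 code differs in that it mutates a Writable value in place.

```java
// Sketch of LongSumAggregator's logic on plain longs (name is ours).
class LongSumAggregatorSketch {
    private long value = createInitialValue();

    // A sum starts from zero, the identity element for addition.
    long createInitialValue() {
        return 0L;
    }

    // Each value supplied by a vertex is folded into the running total.
    void aggregate(long v) {
        value += v;
    }

    long getAggregatedValue() {
        return value;
    }
}
```

Swapping the += for Math.min or Math.max (and the initial value for the appropriate identity) yields the min/max aggregators mentioned below.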

Aggregators and Master Compute

As you have seen, aggregators are referenced via a simple flat namespace of String names. Each name is associated with a class implementing the Aggregator interface. Once this association is established, the compute() method of every vertex can provide a value to be aggregated via a call to the aggregate(name, value) method or query the current state of the aggregator by calling getAggregatedValue(name) and expecting a value of the same type as the one supplied in corresponding calls to the aggregate() method. The aggregator implementation defines how multiple values supplied by calls to aggregate() coming from different vertices are aggregated into a single value of the same type. This is similar to the pattern you saw with combiners.

For example, an aggregator may sum numeric values or find a min/max value. Giraph provides about a dozen aggregators, and users can add to the collection by implementing the Aggregator interface directly or, better yet, subclassing the BasicAggregator abstract class. The latter provides a reasonable starting point for most practical aggregators, whereas the former leaves the implementation unconstrained. There can be as many aggregators as you require: they are all independent in terms of data type and the aggregation performed on multiple values coming from different vertices.

Finally, aggregators can be regular or persistent. The difference is that the value of a regular aggregator is reset to the initial value in each superstep, whereas the value of a persistent aggregator exists throughout the application run. You can use the same aggregator implementation as either regular or persistent, depending on whether you register it by calling registerAggregator() or registerPersistentAggregator().
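The difference between the two kinds is easiest to see in a sketch. The following plain-Java class (the name and the endOfSuperstep() hook are ours, standing in for what the framework does at a superstep boundary) accumulates a sum and either resets it or keeps it, depending on how it was "registered":

```java
// Contrast of regular vs. persistent aggregator semantics: between
// supersteps Giraph resets a regular aggregator to its initial value,
// while a persistent one keeps accumulating across the whole run.
class AggregatorKindsSketch {
    private long value;
    private final boolean persistent;

    AggregatorKindsSketch(boolean persistent) {
        this.persistent = persistent;
    }

    void aggregate(long v) {
        value += v;
    }

    // What the framework conceptually does at a superstep boundary.
    void endOfSuperstep() {
        if (!persistent) {
            value = 0L;  // regular: back to the initial value
        }                // persistent: the value survives the boundary
    }

    long getAggregatedValue() {
        return value;
    }
}
```

After two supersteps that each contribute a value, the regular instance reports only the last superstep's contribution, while the persistent one reports the total.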

The functionality of aggregators serving as rendezvous points between vertices is useful, but their real power for affecting graph computation comes from combining them with master compute. The idea behind master compute is a slight extension of a straight BSP model. All you are doing is introducing a hook to run a special compute() method of a master compute object at the beginning of every superstep (before invoking compute() for individual vertices). This gives you a centralized location to affect every aspect of the rest of the graph computation. Master compute provides an ideal opportunity to register and initialize aggregators and also to inspect their state between consecutive supersteps. Aggregators, in turn, provide a means of communication between the workers and the master compute methods. The values of all the aggregators on all the worker nodes are always consistent in a given superstep, because their values are broadcast at the beginning of each superstep. On the flip side, at the end of each superstep, the values are gathered and made available to the next invocation of master compute. This means aggregator values used by workers are consistent with aggregator values from the master from the same superstep, and aggregators used by the master are consistent with aggregator values from the workers from the previous superstep.

A Real-World Example: Shortest Path Finder

So far, you have been focusing on artificially small examples that highlight certain aspects of Giraph’s APIs. You may think that a real-world Giraph application is much more complex and involved. The truth, however, is that more often than not, even applications used in very large-scale graph-processing jobs tend to be pretty compact.

In general, Giraph applications follow a Unix philosophy of small tools that do one job and do it well. In Unix, it is common to use pipelines with the next tool operating on the output of the previous one, and so it is with Giraph. Giraph pipelines are strung together with the next Giraph application operating on the graph representation serialized by the previous one.

Out of the box, Giraph comes with a few well-known, useful algorithms. You can get a list of built-in algorithms by giving Giraph the command-line option -la.

The example shown in Listing 5-29 is a complete implementation of one of the quintessential algorithms in graph theory: it finds the shortest paths in a graph (also known as Dijkstra’s algorithm). Here’s what it does: given a graph representation and a vertex, it finds the shortest path between that vertex and every other one in the graph. For example, if you think of your graph as a map with vertices representing cities, edges representing roads, and integer edge labels representing driving distances, then the results are the shortest driving distances between a chosen city and the rest of the cities on the map.

The idea behind this algorithm is fairly straightforward. You start by assigning a driving distance of 0 to a chosen city (after all, you don't have to drive to get there) and a distance of infinity to all the others (the worst-case scenario is that there is no path from your starting city to any of the others). You then proceed, in parallel, to calculate for every vertex the sum of the vertex's value (the current shortest distance to it) plus the distance to each of the vertex's neighbors. You can think of it as each vertex broadcasting its best-known shortest path to all of its neighbors and the neighbors having a chance to update their values based on that information. If such an update offers a shorter distance, the neighbor would be foolish not to consider it the new shortest path. The algorithm converges when no more updates happen on the graph.

Listing 5-29 is the unabridged source lifted directly from the Giraph code base. It is a good example of how even the smallest, simplest graph-processing algorithms can be indispensable in practice. It also gives you an opportunity to review quite a few API calls that you learned about in this chapter.

This code operates on a graph with every vertex having a LongWritable ID and a DoubleWritable distance to the source vertex as its vertex data. All the edges have FloatWritable labels associated with them, representing the distance between two connected vertices. A source vertex is given to the implementation via the SimpleShortestPathsVertex.sourceId command-line option; it is expected to be a LongWritable ID. You measure the distance from this source vertex to all the other vertices in a graph. Initially you assign a distance metric of Double.MAX_VALUE to all the vertices except the starting one. Each vertex expects to receive messages from the neighbors it is connected to, each message announcing the distance from the source vertex to a neighbor plus the distance between that neighbor and the receiving vertex. You then pick the smallest value (because you are interested in the shortest paths), and if it happens to be smaller than the current distance, you assume that a shorter path through one of the neighbors was uncovered; you then send messages to the neighbors announcing that fact. Finally, an unconditional call to voteToHalt() guarantees that the computation continues as long as there are unprocessed messages. This makes sense, because when there are no messages left, it means every vertex has the shortest distance and all that is left is to record the state of the graph.
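The message-passing scheme just described can be condensed into a self-contained simulation in plain Java. All names below are ours, not Listing 5-29's, and the update order is simplified relative to strict superstep semantics (improvements are applied in place within a pass), but it converges to the same distances: the loop keeps running as long as some vertex improved, the analogue of pending messages waking up halted vertices.

```java
// Plain-Java simulation of the BSP shortest-path relaxation scheme.
class ShortestPathsSketch {
    // graph[v] lists vertex v's out-edges as {target, weight} pairs.
    static double[] shortestPaths(int[][][] graph, int source) {
        int n = graph.length;
        double[] dist = new double[n];
        java.util.Arrays.fill(dist, Double.MAX_VALUE);   // "infinity"
        dist[source] = 0.0;                              // no driving needed
        boolean updated = true;
        while (updated) {                                // one pass ~ one superstep
            updated = false;
            for (int v = 0; v < n; v++) {
                if (dist[v] == Double.MAX_VALUE) {
                    continue;                            // unreachable so far
                }
                for (int[] edge : graph[v]) {
                    // My best-known distance plus the edge label...
                    double candidate = dist[v] + edge[1];
                    // ...lets a neighbor improve its own value.
                    if (candidate < dist[edge[0]]) {
                        dist[edge[0]] = candidate;
                        updated = true;                  // a "message" keeps us going
                    }
                }
            }
        }
        return dist;                                     // converged: no updates left
    }
}
```

On a three-city map where the direct road 0→1 costs 4 but the detour 0→2→1 costs 1 + 2, the detour wins, just as the broadcast-and-update description above predicts.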

Summary

This chapter covered these topics:

  • Building your first “Hello World” Giraph application and writing a unit test for it
  • Discovering various ways of specifying graph-definition data
  • Exploring the details of the Giraph graph computation model
  • Manipulating the topology of the graph structure
  • Looking into advanced uses of messaging
  • Taking advantage of Giraph extensions to the simple BSP compute model such as aggregators and master compute

The Giraph framework offers a set of powerful APIs catering to a variety of graph-processing algorithms. One thing that all Giraph implementations have in common is that they must provide an implementation of the compute() method that is called for every vertex at least once. Each implementation of compute() has full access to a variety of utility methods:

  • Every Giraph application uses voteToHalt() when it needs to signal that a vertex is done computing. When all vertices call voteToHalt() and there are no more pending messages, the entire computation stops.
  • You can send messages from any vertex to any other vertex (regardless of whether they are connected) by using methods like sendMessage() and sendMessageToAllEdges(). If the destination vertex doesn’t exist, it is created.
  • On the I/O side of things, you have access to a wide array of built-in input and output formats for parsing various graph representations and an option to roll your own if needed. You saw IntIntNullTextInputFormat and TextDoubleDoubleAdjacencyListVertexInputFormat in this chapter, but many more come with Giraph and allow you to slurp data not just from files but also from HBase, Hive, and other data stores.
  • Another commonly used set of APIs provides ways to query graph topology (getEdgeValue(), getVertexId(), and so on) and also to modify the graph during the application run (addVertexRequest(), addEdge(), and so on).
  • A subset of APIs (supporting aggregators and master compute) goes beyond the basic BSP graph computation model.
  • If the cost of sending a message ever becomes a concern, Giraph offers efficient tools to deal with it by applying combiners to multiple messages and effectively compressing the traffic.

This chapter laid the foundation for the rest of the book in terms of practical usage of Giraph APIs, but you still haven’t seen the implementation details of the Giraph framework and how it uses lower-level mechanisms such as Hadoop MapReduce and Apache ZooKeeper. This is the focus of the next chapter.
