CHAPTER 3


The Giraph Programming Model

This chapter covers

  • Giraph design goals for graph processing
  • The vertex-centric API
  • Using combiners to minimize communication
  • Aggregations through aggregators
  • The bulk synchronous parallel model

This chapter digs into the nature of graphs and graph algorithms, and how graph algorithms can be implemented and computed with Giraph. You learn how graph problems are inherently recursive and why graph algorithms therefore are usually solved iteratively. You see how Giraph is designed for iterative graph computations and explore the vertex-centric programming API and paradigm of Giraph. You then look at examples of simple algorithms to get acquainted with the model. The chapter concludes by opening the hood of Giraph to examine the underlying distributed engine that makes iterative computations so fast and simple.

Simplifying Large-Scale Graph Processing

Traditionally, graph algorithms have been designed following the model of sequential programming we are all accustomed to. The graph is represented in main memory with native data structures such as a matrix or lists. The algorithms assume a global view of the graph and a single thread of execution. Both the data structures and the execution logic are tailored to the solution. This approach has a number of drawbacks. First, a new graph problem brings a new graph algorithm, and probably with it a new approach and model redesigned from scratch. A tailored, ad hoc solution allows for fine-grained optimizations, but it requires practitioners to partially reinvent the wheel every time they implement a new algorithm in a system. Second, algorithms need to be specifically designed or modified to run in a parallel, distributed system. Again, this allows for fine-grained optimizations that exploit a particular platform, but it requires reinventing the wheel every time a new algorithm is implemented for a parallel, distributed system. Plus, it is nontrivial to parallelize a graph algorithm, because graph computations are unstructured and interleaved.

Giraph tackles both problems by providing a programming model that has been designed with graph algorithms in mind and that at the same time hides the complexity of programming a parallel, distributed system. Both characteristics minimize the effort of implementing a graph algorithm that works at large scale. The following two sections cover these characteristics.

Hiding the Complexity of Parallel, Distributed Computing

Giraph offers you much more than a library for executing graph computations; it offers a programming model and an API that let you focus on the semantics of the specific algorithm you are designing, without having to worry about how the graph is stored in memory, the way the algorithm is executed, or how the computation is distributed across machines with tolerance to faults. Basically, both the in-memory representation of the graph and the execution engine are general to any graph algorithm that can be expressed with Giraph. Practically, you have to write a user-defined function (UDF) that is executed iteratively on each vertex by the Giraph runtime across the processing units. This UDF is agnostic to the way data is shared across these units and the way code is executed concurrently. In other words, no locking or concurrency coordination is required on your side. As far as you know, your UDF is executed sequentially on a single unit. Figure 3-1 shows this conceptual stacked organization.


Figure 3-1. Conceptual organization of an application in Giraph

Important  According to the Giraph programming model, you have to develop a UDF that is executed iteratively on the vertices of the graph. You can remain agnostic about the way the graph is represented in memory, the way the function is executed in parallel across the distributed system, and the way fault tolerance is guaranteed.

The UDF defines how each vertex manages the messages it receives to update its value, and what messages it sends to what other vertices. Because vertices share data through messages, no locking is required. Also, because each vertex is executed at most once during each iteration, there is no need for explicit synchronization by the user. This means you have to focus only on how to express an algorithm from the perspective of being a vertex that can exchange messages with other vertices in a number of iterations. This is why the programming model is usually referred to as a vertex-centric paradigm. Although it is more restrictive, this model guides you toward developing algorithms that can reach massive parallelization. Chapter 4 develops this idea further.

Programming through a Graph-Specific Model Based on Iterations

The previous chapter presented graphs and how they can be used to model data in different domains. Now, let’s have a quick look again at graphs, focusing on how they shape the programming model used to express graph algorithms. Graphs are characterized by a few concepts, making them very simple to understand. You have “just” a set of vertices with a set of edges connecting them, potentially with direction, label, and weight. There is nothing more to it. With a combination of these concepts, you can model pretty much anything you can think of. But simplicity comes at a price. The problem is that the information about each vertex is contained not only in its adjacent vertices and in the labels and weights assigned to the edges that connect them, but also in vertices that are farther away. Information about each vertex is often distributed all over the graph, making graphs complex and expensive to manage.

Let’s look at an example of how you can gain more information about a vertex by looking further than its direct neighbors. In Figure 3-2, the leftmost circle represents the simplest graph of all (except, perhaps, an empty one). It consists of a single vertex. What does that graph tell you? As is, not much. But if you add a label to the vertex and connect it to a bunch of other vertices with labeled edges, you get a better picture. You realize that the graph contains information about Mark. He works at Banana Inc., he is in a relationship with Anna, and he lives in Berlin. Now look at the graph on the right. It adds a vertex representing Germany and connects it to the vertex representing Berlin, with an edge labeled “capital of”.


Figure 3-2. The definition of a vertex through its neighbors

The interesting thing here is that by adding a vertex to the graph and connecting it to a vertex other than Mark, you find out more about Mark: that Mark lives in Germany. Moreover, assuming for this example that Mark is not telecommuting, you can infer that Banana Inc. has an office in Berlin. The more information is contained in the neighbors, either direct or multiple hops away, the more you can deduce about a vertex.

In other words, the information about a vertex depends on the information about the vertices in the neighborhood. Naturally, those vertices also depend on their neighborhood. This introduces a recursion in the information about each vertex, where each vertex depends on other vertices. That is why graphs are tough. Each vertex depends on its neighbors. Mark is defined by the vertices Banana Inc., Anna, and Berlin, but Berlin is defined by vertex Germany, making Mark also defined indirectly by vertex Germany.

Let’s look at another example. It is commonly said that in order to judge a person, you have to look at her friends. But you may also say that to judge her friends, you have to look at their friends. As before, you end up having to look at the entire graph. Graph problems are often defined, in one form or another, through dependencies between vertices and their neighbors. Unfolding these dependencies is often what makes graph computations complex and expensive.

Important  The information about a vertex depends on the information about its neighbors. This makes graph problems recursive.

Fortunately, you know that these types of problems can be solved iteratively. Iteration after iteration, an iterative algorithm unfolds these dependencies one level at a time. For this reason, graph algorithms are often expressed as iterative algorithms, where some of the vertices are visited (once or multiple times) during each iteration. As the computation proceeds and intermediate states are computed, information about each vertex is updated in the face of updated intermediate state until the final results are computed. This is why the Giraph programming model is designed to express iterative algorithms and the Giraph execution engine is optimized for iterative computations. You have to think in terms of iterative algorithms.

Important  Graph problems can be solved nicely through iterative algorithms. This is why Giraph is designed and optimized for iterative computations.

Let’s look at an example to clarify these concepts. Figure 3-3 shows a social graph in which a number of people are connected by a friendship relationship. Imagine that they want to find out who is the oldest person in the graph, but they can only communicate between friends. You can assign to each vertex a value that you initialize with the age of the person, and define the problem recursively by defining the value of each vertex as the largest of its own value and the values of its neighbors. This definition is recursive because each vertex value depends on the values of the neighbors. It works because the oldest person will affect the value of their friends, which in turn will affect the value of their friends, so that in the end the value of each individual depends on the age of the oldest person in the graph.


Figure 3-3. A social network of four individuals connected by a friendship relationship

But how do you solve this problem? You organize the computation in a series of iterations, where during each iteration each vertex sets its own value to the largest of its current value and the values of its neighbors. Iteration after iteration, the age of the oldest person flows through the graph until it reaches all the vertices. If at any iteration no vertex updates its value, the computation has reached its final iteration, and each vertex holds in its value the age of the oldest person in the graph. Figure 3-4 shows the execution of this example. Note how Carla’s age reaches first Anna, then John, and ultimately Mark, iteration after iteration. Also, initially the Mark vertex updates its value based on John’s and later updates it again, after the John vertex updates its own value. The next section looks at how this maps to what is usually referred to as the vertex-centric approach of Giraph.
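Before mapping this onto Giraph, it may help to see the iteration written out as plain Python. The following is a sketch, not Giraph code: the graph matches the friendship chain described above, but the ages are illustrative (with Carla as the oldest).

```python
# Plain-Python sketch of the iterative max-value algorithm: each vertex
# repeatedly adopts the largest value among its own and its neighbors',
# until an iteration changes nothing.
def propagate_max(neighbors, values):
    """neighbors: dict vertex -> list of adjacent vertices
       values:    dict vertex -> initial value (a person's age)"""
    values = dict(values)
    changed, iterations = True, 0
    while changed:
        changed = False
        snapshot = dict(values)          # every vertex reads the previous iteration's values
        for v, nbrs in neighbors.items():
            best = max([snapshot[v]] + [snapshot[n] for n in nbrs])
            if best > values[v]:
                values[v] = best
                changed = True
        iterations += 1
    return values, iterations

# The social graph of Figure 3-3 (ages are illustrative).
neighbors = {"Mark": ["John"], "John": ["Mark", "Anna"],
             "Anna": ["John", "Carla"], "Carla": ["Anna"]}
ages = {"Mark": 27, "John": 30, "Anna": 32, "Carla": 75}
final, rounds = propagate_max(neighbors, ages)
```

Note how Carla’s age needs three changing iterations to travel the chain to Mark, plus one final iteration that detects no change.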


Figure 3-4. Execution of the algorithm that finds the oldest age (maximum value) in the graph

A Vertex-centric Perspective

According to the Giraph programming model and API, you have to put yourself in the shoes of a vertex that has a value and that can exchange messages with other vertices over a number of iterations. This section presents how this works. To simplify the presentation and fit in as many examples as possible, we use pseudo-code instead of Java (the language in which Giraph is written and programmed). Starting in Chapter 5, after all the concepts are clear and you are familiar with the paradigm, the book’s examples use Java code with the actual Giraph API. The two APIs (pseudo-code and Java) match perfectly.

THE USE OF PSEUDO-CODE IN THIS BOOK

This chapter and the next use a pseudo-code language to present the Giraph API and the implementation of the algorithms. This allows you to focus on the programming model without thinking about the particularities of the (Java) language. Also, because the pseudo-code is much less verbose than Java, these chapters can cover much more material.

The language is heavily inspired by Python, so if you know that language you won’t have problems understanding the code. Most of the API uses the same naming as in the official Java API so you can easily move from the content learned here to the content present in Part 2 and Part 3, which present Java code.

The Giraph Data Model

This section presents the Giraph data model: the way a graph is represented. Look back at the example in Figure 3-4. Each person was represented with a vertex, and two vertices were connected if the people knew each other. This is how you build a social graph. You can use the name of a person as the identifier—for example, with a string. Each person also knows the friends they are connected to, so the vertex has outgoing edges. Keep in mind that with certain graphs, like street maps, you can also have values assigned to the edges. Vertices with their identifiers, and edges with their values, represent the graph. For the specific problem of finding the highest age, you needed to store an integer value with each person. This value was modified each time a vertex discovered a higher age. Vertex IDs, outgoing edges with values, and vertex values are the elements of the Giraph data model. Figure 3-5 summarizes these elements.


Figure 3-5. The Giraph data model, with vertices, IDs, values, and edges

A bit more rigorously, according to Giraph, a graph is a set of vertices and edges, and each vertex is defined as follows:

  • It has a unique ID, defined by a type (an integer, a string, and so on).
  • It has a value, also defined by a type (a double, an integer, and so on).
  • It has a number of outgoing edges that point to other vertices.
  • Each edge can have a value, also defined by a type (an integer, a double, and so on).

The first important thing to notice from this list is that the data model is a directed graph and edges are assigned to their source vertex. In principle, vertices are aware only of their outgoing edges, and if an algorithm needs to know the incoming ones, it must discover them as part of the algorithm. The section “Converting a Directed Graph to Undirected” presents an algorithm to do this. A second important thing to notice is that for each element (vertex ID and value, edge value), you have to define a type. This type can be either a primitive type, like an integer or a double, or a composite type, like a class.

Although vertex IDs depend on the graph—for example, a web graph has vertex IDs characterized by URLs (hence strings)—vertex values are often dependent on the algorithm. For example, shortest-distances algorithms define vertex values as integers for unweighted graphs and as doubles for weighted graphs, PageRank defines vertex values as doubles, and recommendation algorithms often define vertex values as vectors of floats. Chapter 4 presents all these algorithms and their implementation in Giraph. For each algorithm you write, you must decide which data type fits the vertex value.

Edge values, on the other hand, lie somewhere in between. For certain algorithms, you use no edge values; for others, you use values to model the label (if any) attached to the edge in the input graph; and for others, you use values to model the weight. For still other algorithms, you use a totally new edge value type that has nothing to do with the actual graph; the algorithm may use it to store intermediate results. Vertex values tend to change during the computation, because they are part of the intermediate results, but edge values tend to stay the same. But again, this is not a rule. If you look back at the example in Figure 3-4, the graph would be represented with string IDs for names, integer values for ages, and no edge values.

In Giraph, each vertex object is an instance of the Vertex class. The interface of the Vertex class, presented in Listing 3-1, lets you access the vertex value, access the edges and their values, and add and remove edges. For now, ignore the voteToHalt() method, which is presented in the next section. Giraph comes with a default implementation of vertices and edges, so you do not need to implement them yourself as part of your application (unless you want some specific behavior). You only need to define the types of the vertex ID, the vertex value, and the edge value.

#1 Returns the ID of the vertex. The return type depends on the type of the ID.

#2 Returns the value of the vertex. The return type depends on the type of the vertex value.

#3 Sets the value of the vertex. The type of the parameter depends on the type of the value.

#4 Returns all the outgoing edges for the vertex in the form of an iterable of Edge objects.

#5 Returns the number of outgoing edges for the vertex

#6 Returns the value of the first edge connecting to the target vertex, if any. The return type depends on the type of the edge value.

#7 Sets the value of the first edge connecting to the target vertex, if any. The type of the parameter depends on the type of the value.

#8 Returns the values associated with all the edges connecting to a specific vertex. This method is useful when managing multigraphs.

#9 Makes the vertex vote to halt

#10 Adds an edge to the vertex

#11 Removes all edges pointing to the target vertex
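The annotations above can be made concrete with a small sketch. The following Python class is a simplified, hypothetical rendering of the Vertex interface (the real API is Java, and edges are reduced to plain target/value pairs); it is meant only to show the shape of the methods, not to reproduce the actual implementation.

```python
# A minimal Python sketch of the Vertex interface described above.
class Vertex:
    def __init__(self, vertex_id, value, edges=None):
        self._id = vertex_id
        self._value = value
        self._edges = list(edges or [])   # list of (target_id, edge_value) pairs
        self.halted = False

    def getId(self): return self._id                       # #1
    def getValue(self): return self._value                 # #2
    def setValue(self, value): self._value = value         # #3
    def getEdges(self): return iter(self._edges)           # #4
    def getNumEdges(self): return len(self._edges)         # #5
    def getEdgeValue(self, target):                        # #6 first matching edge, if any
        for t, val in self._edges:
            if t == target:
                return val
        return None
    def setEdgeValue(self, target, value):                 # #7 first matching edge, if any
        for i, (t, _) in enumerate(self._edges):
            if t == target:
                self._edges[i] = (t, value)
                return
    def getAllEdgeValues(self, target):                    # #8 useful for multigraphs
        return [val for t, val in self._edges if t == target]
    def voteToHalt(self): self.halted = True               # #9
    def addEdge(self, target, value=None):                 # #10
        self._edges.append((target, value))
    def removeEdges(self, target):                         # #11 all edges to target
        self._edges = [(t, v) for t, v in self._edges if t != target]
```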

The Edge class is even simpler. It has only three methods: one to get the ID of the other endpoint, one to get the value, and one to set the value. The interface is presented in Listing 3-2.

#1 Returns the ID of the target vertex. The return type depends on the type of the ID.

#2 Returns the value attached to the edge. The return type depends on the type of the edge value.

#3 Sets the value of the edge. The type of the parameter depends on the type of the edge value.
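Likewise, a minimal Python sketch of the Edge interface described by these annotations (again a simplified stand-in, not the actual Java API):

```python
# A minimal Python sketch of the Edge interface described above.
class Edge:
    def __init__(self, target_id, value=None):
        self._target = target_id
        self._value = value
    def getTargetVertexId(self): return self._target   # #1 the other endpoint
    def getValue(self): return self._value             # #2
    def setValue(self, value): self._value = value     # #3
```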

Now that you have seen how the graph is represented in Giraph and how you can access it programmatically, you are ready to look at how to express an algorithm.

A Computation Based on Messages and Supersteps

Once you have defined the way your graph looks through the types of the vertex ID, the vertex value, and the edge value, Giraph needs you to write a UDF called compute. As mentioned earlier, Giraph requires you to “think like a vertex.” All this logic is put in the compute() method. But before you dig into the API, let’s look at the way the computation is organized.

A Giraph computation is organized in a series of supersteps. They are called that because a superstep is composed of steps, as you see later in this chapter. Intuitively, you can think of a superstep as an iteration of an algorithm; this is not always the case but often is. At each superstep, a vertex can send messages to other vertices, access its vertex value and its edges, and vote to halt. Sent messages are delivered to the destination vertex at the beginning of the next superstep. Every vertex can be in either the active state or the inactive state. Only active vertices are computed during each superstep (once). At the beginning of the computation, every vertex is in the active state, and it can switch to the inactive state by voting to halt. A vertex votes to halt because it decides that from its local perspective, its work is done, and the computation may conclude. A vertex that is in the inactive state is not executed during a superstep unless it has received a message. The delivery of a message switches a vertex back from the inactive to the active state. A Giraph computation is over when all vertices have voted to halt and no message needs to be delivered. The diagram in Figure 3-6 illustrates the way a vertex can change state between active and inactive.


Figure 3-6. Diagram of transitions between vertex states

A Giraph computation is said to be synchronous. A superstep is concluded when all active vertices have been computed and all messages have been delivered. Giraph will not compute the next superstep until all vertices have been computed. Because active vertices have to wait for the other vertices to be computed during the current superstep before their next superstep can be computed, the computation is referred to as synchronous and the waiting phase is called the synchronization barrier.

A Giraph computation is distributed and parallelized by spreading vertices, with their edges, across a number of processing units—for example, machines or CPU cores. During each iteration, each unit is responsible for executing the compute() method on the vertices assigned to it. Each unit is also responsible for delivering the sent messages to the units responsible for the vertices that should receive the messages. This means the more units involved in the computation, the more vertices can be executed in parallel. But the more units you have, the more communication is produced. Chapter 6 is dedicated to the architecture of Giraph and dives into the details. Figure 3-7 shows the computation of a vertex that receives three messages (5, 7, and 20), chooses the largest one (20), updates its value (from 2 to 20), and sends the value to its neighbors. Note that this chapter shows the synchronization barrier and illustrates the messages being sent with their own dashed arrows. To keep the figures leaner, later figures in the book do not include the synchronization barrier, and messages are drawn directly on the edges over which they are sent.
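The mechanics described above (active and inactive vertices, message delivery, the synchronization barrier) can be illustrated with a single-process simulation. Everything here is a hypothetical sketch: real Giraph spreads the vertices across workers and delivers messages over the network, but the barrier semantics are the same, since messages produced in superstep S are only visible in superstep S+1.

```python
from collections import defaultdict

def run_supersteps(vertices, compute, max_supersteps=30):
    """vertices: dict of id -> state; compute(vid, state, messages, send)
       returns True when the vertex votes to halt."""
    active = set(vertices)
    inbox = defaultdict(list)
    superstep = 0
    while (active or inbox) and superstep < max_supersteps:
        outbox = defaultdict(list)
        send = lambda target, msg: outbox[target].append(msg)
        active |= set(inbox)                  # a delivered message reactivates a vertex
        for vid in sorted(active):
            if compute(vid, vertices[vid], inbox.get(vid, []), send):
                active.discard(vid)           # vote to halt
        inbox = outbox                        # barrier: deliver in the next superstep
        superstep += 1
    return superstep
```

The computation ends exactly as described in the text: when every vertex has voted to halt and no message is waiting to be delivered.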


Figure 3-7. Computation of a vertex that receives values through messages and propagates the largest

In practice, to write an algorithm, you have to implement the compute() method of a class called BasicComputation. At each superstep, Giraph calls the compute() method on all active vertices, delivering the messages sent to that vertex (if any) during the previous superstep. The signature of the compute() method is compute(vertex, messages), where the first parameter is the vertex that is being computed and the second parameter is a container with the messages sent to that vertex (you can think of it for now as a list of messages). The BasicComputation class must be defined with the same three types as the vertex it operates on. However, in addition to these three types, you also need to define a fourth type: the message type. Giraph needs this information to know how to store and deliver messages. Listing 3-3 presents the most relevant methods of the BasicComputation class.

#1 The method to implement, which is called by the Giraph runtime

#2 Returns the current superstep

#3 Returns the total number of vertices in the graph

#4 Returns the total number of edges in the graph

#5 Sends a message to the target vertex

#6 Sends a message to the endpoints of all the outgoing edges of a vertex

#7 Requests the addition of a vertex to the graph

#8 Requests the removal of a vertex from the graph
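To make the shape of this API visible, here is a Python sketch of the BasicComputation methods listed above. The real class is Java and generic in the four types; in this stand-in the graph-wide counts are plain constructor arguments, messages are collected in a local outbox, and `vertex.neighbors` is a hypothetical attribute standing in for iterating the vertex’s outgoing edges.

```python
# A Python sketch of the BasicComputation methods described above.
class BasicComputation:
    def __init__(self, num_vertices=0, num_edges=0):
        self._superstep = 0
        self._num_vertices = num_vertices
        self._num_edges = num_edges
        self.outbox = []        # (target_id, message) pairs for the next superstep
        self.requests = []      # pending graph-mutation requests

    def compute(self, vertex, messages):               # #1 implemented by the user
        raise NotImplementedError
    def getSuperstep(self):                            # #2
        return self._superstep
    def getTotalNumVertices(self):                     # #3
        return self._num_vertices
    def getTotalNumEdges(self):                        # #4
        return self._num_edges
    def sendMessage(self, target_id, message):         # #5
        self.outbox.append((target_id, message))
    def sendMessageToAllEdges(self, vertex, message):  # #6
        for target_id in vertex.neighbors:             # hypothetical neighbors list
            self.sendMessage(target_id, message)
    def addVertexRequest(self, vertex_id, value=None): # #7
        self.requests.append(("add", vertex_id, value))
    def removeVertexRequest(self, vertex_id):          # #8
        self.requests.append(("remove", vertex_id))
```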

This is all you need to know to program a basic graph algorithm in Giraph. The rest of this chapter presents the other parts of the basic API, but for now let’s focus on how to implement the example in Figure 3-4 using what you have seen so far. Listing 3-4 presents the compute() method that implements the algorithm.

#1 Identify the largest value across those sent as a message.

#2 The value is larger than the value discovered so far by this vertex, so update and propagate.

#3 Vote to halt.
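The three annotated steps can be sketched as runnable Python. The compute() function below follows the logic of the annotations; the V class, the harness, and the ages are illustrative stand-ins for the Giraph runtime, not actual Giraph API (in Giraph, sending would go through sendMessageToAllEdges() and halting through vertex.voteToHalt()).

```python
# A runnable sketch of the MaxValue logic with a tiny single-process harness.
class V:
    def __init__(self, value, neighbors):
        self.value, self.neighbors = value, neighbors
        self.halted, self.superstep = False, 0

def compute(vertex, messages, send_to_all_neighbors):
    if vertex.superstep == 0:
        send_to_all_neighbors(vertex.value)          # kick-start the propagation
    else:
        max_value = max(messages)                    # #1 largest value received
        if max_value > vertex.value:                 # #2 larger value discovered:
            vertex.value = max_value                 #    update and propagate
            send_to_all_neighbors(vertex.value)
    vertex.halted = True                             # #3 vote to halt

def run_max(graph):
    inbox = {vid: [] for vid in graph}
    superstep = 0
    while True:
        active = [vid for vid in graph
                  if not graph[vid].halted or inbox[vid]]
        if not active:
            break
        outbox = {vid: [] for vid in graph}
        for vid in active:
            v = graph[vid]
            v.superstep = superstep
            send = lambda msg, v=v: [outbox[n].append(msg) for n in v.neighbors]
            compute(v, inbox[vid], send)
        inbox = outbox                               # synchronization barrier
        superstep += 1
    return {vid: v.value for vid, v in graph.items()}
```

A halted vertex is recomputed only when a message arrives for it, exactly as in the state diagram of Figure 3-6.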

Figure 3-8 illustrates the execution of the algorithm on the graph from Figure 3-3. As you can see, the largest value propagates quickly through the graph. When vertices discover new, larger values, they are updated and propagated until all the vertices have discovered the largest value.


Figure 3-8. Computation of the MaxValue algorithm in Giraph

Now that you have seen the simple Giraph API, the next section explores how you can make your algorithms more efficient through combiners.

THE BULK SYNCHRONOUS PARALLEL (BSP) MODEL

Some of the terminology in Giraph is borrowed from the bulk synchronous parallel (BSP) model, which inspired Pregel and Giraph. If you are interested in getting to know more about the BSP model, a section is dedicated to it at the end of this chapter. Keep in mind, however, that you do not need to know how the BSP model works to play with Giraph.

Reducing Messages with a Combiner

Messages play a very important role in Giraph, because they allow vertices to share information. Also, because Giraph uses messages instead of shared memory, graph computations can be parallelized without (expensive) concurrency primitives. Still, exchanging messages has a cost and can impact the total runtime of a computation. Can you reduce the number of messages?

A combiner is a function that combines messages sent to a vertex. Combining messages allows Giraph to send less data between processing units. A combiner is very simple: it combines two messages into one. The messages it combines were sent to the same vertex during the current superstep. Importantly, there are no guarantees about how many times the combiner will be called, or whether it will be called at all. Basically, the only assumption you can make is that the messages passed to the combiner are all destined for the same vertex.

Because a combiner receives a partial collection of messages and can be called multiple times, it must apply a function that is commutative and associative. Listing 3-5 shows the interface of the MessageCombiner class. Clearly, because a combiner is defined on a specific type of message, an algorithm must use a combiner that matches the type of messages the vertices send.

COMMUTATIVE AND ASSOCIATIVE FUNCTIONS

A commutative function produces the same result regardless of the order in which it is applied on the elements. For example, sum is commutative:

1 + 2 = 2 + 1 or, more extensively, 1 + 2 + 3 + 4 = 3 + 2 + 4 + 1, and so on.

An associative function can be applied to subgroups of the input elements and produce the same result. For example, sum is also associative:

1 + 2 + 3 + 4 = (1 + 2) + (3 + 4) = 1 + (2 + 3) + 4, and so on.

#1 Returns the combination of the two messages

For the MaxValue algorithm, a viable combiner is one that returns the larger of the two values. Listing 3-6 shows the pseudo-code for this combiner.

#1 Returns the largest value
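A Python sketch of this pair may help: the MessageCombiner interface of Listing 3-5 and the MaxValue combiner of Listing 3-6 (simplified stand-ins for the Java API). Because Giraph may apply combine() any number of times, in any order, to any subgroup of a vertex’s incoming messages, the function must be commutative and associative, and max() is both.

```python
# Sketch of the combiner interface and the MaxValue combiner.
class MessageCombiner:
    def combine(self, message1, message2):   # #1 returns the combination
        raise NotImplementedError

class MaxValueCombiner(MessageCombiner):
    def combine(self, message1, message2):
        return max(message1, message2)       # keep only the larger value
```

However the messages 5, 7, and 20 from Figure 3-7 are paired up, combining them always leaves the single message 20.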

Now, let’s look at the effect of using the combiner on the example computation for the largest value (on a different graph). Figure 3-9 shows one superstep. On the left, messages are delivered as produced. On the right, however, Giraph applies the combiner twice, reducing the number of messages sent to that vertex to one-third of the original number. Note also that the second combiner round might not have happened, because Giraph does not give any guarantees. In that case, the number of messages would be reduced only to two-thirds of the original number.


Figure 3-9. The effect of using the MaxValue combiner on the messages

The interesting thing about the combiner is that the compute() method cannot make any assumptions about whether a combiner was executed. For this reason, the combining logic of the combiner is often performed by the compute() method as well. Note how Listing 3-4 computes a max() function on the input messages in the line maxValue = max(messages). This is the same function as the combiner. The combiner is executed on some of the messages before you enter the compute() method, but the result is the same.

Combiners are very useful for minimizing the use of resources by Giraph. Often you can apply a simple combiner, but not all algorithms can have one. Keep this in mind when you design your own algorithm.

Computing Global Functions with Aggregators

Computing the maximum value or other aggregations on the values associated with vertices can be expressed as a graph algorithm. But it would be easier if you could compute these aggregations without having to propagate messages across vertices in a number of supersteps. This is what aggregators were introduced for. Aggregators allow you to think of aggregations as global functions to which vertices can send values. During each superstep, these global functions aggregate the values, and the results are available to the vertices during the following superstep.

Aggregators compute global functions, but they are executed in parallel across processing units, and they are scalable. Aggregators, like combiners, require the function to be commutative and associative. The interface of an aggregator is presented in Listing 3-7.

#1 Aggregates the value
#2 Returns the aggregated value
#3 Sets the aggregated value to the parameter
#4 Resets the aggregated value to a neutral value

Let’s go back to the example. Instead of having vertices propagate their values, you can make vertices send their values to the aggregator during the first superstep and then vote to halt. This algorithm would execute only one superstep and finish the computation. Listing 3-8 presents the code for the MaxValueAggregator class. Note that for this kind of algorithm, Giraph probably is not the right tool for the job, because it does not exploit the structure of the graph. Still, algorithms can often use aggregators as part of their graph computations; Chapter 4 presents two such cases.

#1 Local variable where the aggregated value is stored
#2 Updates the local value to the new value, if larger
#3 Returns the aggregated value
#4 Sets the value to the new value, used (for example) for initialization
#5 Resets the value to the neutral value –Inf (neutral to the max() function)
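The Aggregator interface of Listing 3-7 and the MaxValueAggregator of Listing 3-8 can be sketched together in Python (a simplified stand-in for the Java API). As with combiners, the aggregation function must be commutative and associative, so max() qualifies and -Inf is its neutral value.

```python
# Sketch of the aggregator interface and the MaxValue aggregator.
class Aggregator:
    def aggregate(self, value): raise NotImplementedError           # #1
    def getAggregatedValue(self): raise NotImplementedError         # #2
    def setAggregatedValue(self, value): raise NotImplementedError  # #3
    def reset(self): raise NotImplementedError                      # #4

class MaxValueAggregator(Aggregator):
    def __init__(self):
        self.value = float("-inf")           # local aggregated value
    def aggregate(self, value):
        self.value = max(self.value, value)  # keep the new value if larger
    def getAggregatedValue(self):
        return self.value
    def setAggregatedValue(self, value):
        self.value = value                   # used, for example, for initialization
    def reset(self):
        self.value = float("-inf")           # neutral value for max()
```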

Listing 3-9 shows the pseudo-code for a trivial MaxValue algorithm that uses an aggregator. For simplicity, the code assumes you have a reference to an aggregator object called maxValueAggregator. In Giraph, however, you can use multiple aggregators at the same time (each distinguished by a string name), and they need to be declared and initialized before they are used. Presenting this complete API here would require introducing parts of the API that are not relevant to understanding how aggregators work and how to use them. Chapter 5 includes the complete API in its presentation of the Java API.

#1 Aggregates the vertex value through the MaxValue aggregator
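A runnable Python sketch of this one-superstep algorithm follows. The maxValueAggregator name comes from the text above; the condensed aggregator class and the ages are illustrative stand-ins, and the return value plays the role of voteToHalt().

```python
# Sketch of the aggregator-based MaxValue algorithm: every vertex sends its
# value to the aggregator in the first superstep and votes to halt, so the
# computation finishes after a single superstep.
class MaxValueAggregator:                    # condensed stand-in for Listing 3-8
    def __init__(self): self.value = float("-inf")
    def aggregate(self, v): self.value = max(self.value, v)
    def getAggregatedValue(self): return self.value

maxValueAggregator = MaxValueAggregator()

def compute(vertex_value, messages):
    maxValueAggregator.aggregate(vertex_value)   # #1 aggregate the vertex value
    return True                                  # vote to halt

for age in (27, 30, 32, 75):                     # ages are illustrative
    compute(age, [])
```

Note that no messages flow between vertices at all, which is exactly why this version does not exploit the structure of the graph.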

Figure 3-10 shows how the new computation is organized. Note how the aggregator is computed autonomously for each value, because the function is associative and commutative.


Figure 3-10. Computation of MaxValue through an aggregator

You need to know one last thing about aggregators before you move on. You might have noticed the reset() method in the Aggregator class. This method exists because there are two types of aggregators: regular and persistent. The value of a regular aggregator is reset to the initial value at each superstep, whereas the value of a persistent aggregator lives through the application. Note that a call to getAggregatedValue() returns the value computed during the previous superstep, not the current one. The Giraph runtime uses the reset() method to reset the value of a regular aggregator; hence, it should reinitialize the aggregator to a neutral value, like 0 for a sum aggregator or -Infinity for a MaxValue aggregator.

Imagine a simple application in which, during each superstep, vertices send a value of 1 to an aggregator that sums all these values. If the aggregator is regular, then during each superstep (except the first one), the aggregator contains a number equal to the total number of vertices. If instead the aggregator is persistent, the aggregated value is the number of vertices times the superstep number. So, if you had four vertices in a computation of three supersteps, at the end of the computation a regular sum aggregator would have a value of 4, and a persistent sum aggregator would have a value of 12.
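This arithmetic can be checked with a tiny simulation sketch (not Giraph code): the only difference between the two flavors is whether the value is reset at the start of each superstep.

```python
# Sketch of a regular vs. persistent sum aggregator over several supersteps.
def simulate_sum_aggregator(num_vertices, num_supersteps, persistent):
    total = 0
    for _ in range(num_supersteps):
        if not persistent:
            total = 0                 # reset() at the start of each superstep
        for _ in range(num_vertices):
            total += 1                # each vertex aggregates the value 1
    return total
```

With four vertices and three supersteps, the regular aggregator ends at 4 and the persistent one at 12, matching the example above.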

This chapter has presented the basic API and assumed that the graph was already loaded and initialized in memory and that the final superstep would be the last part of the computation. The next section looks at what happens before the first superstep and after the last superstep, to conclude your tour of a Giraph computation.

The Anatomy of a Giraph Computation

This chapter’s presentation of the API has assumed that the graph was already loaded into memory, the values were initialized, and the last superstep would be the last phase of the computation. Let’s look at a more complete overview. Figure 3-11 shows the anatomy of a Giraph computation.


Figure 3-11. The different phases and steps of a Giraph computation executed by each processing unit

The computation starts with Giraph loading the graph into memory across the processing units. In Giraph, this loading phase is considered superstep -1. The graph is usually read from persistent storage, like a filesystem or a database (both most likely distributed). Loading happens in parallel, when possible, and requires converting the data read from storage into the internal representation of the graph. As Chapter 7 explains, Giraph provides an API that allows you to define the format of the persistent data and how it should be parsed to load the graph. The loading phase loads all the vertices and their edges and initializes the vertex and edge values. Once the graph is loaded, the computation can start.

The computation phase consists of one or more supersteps. Each superstep is divided into two steps:

  1. The processing units iterate over the active vertices and call the compute() method with the corresponding messages, if any. As the vertices produce messages, they are sent to the corresponding processing unit, depending on the destination vertex. If a combiner is defined, it can be applied to messages either before they leave the source or when they reach the destination. Once a processing unit has finished computing its vertices and sending the remaining messages, it waits at the synchronization barrier for the other units to finish.
  2. The processing units conclude the computation of the aggregators, if any, and merge the local aggregations. Local computations that involve aggregators can be performed while the vertices are being computed, because aggregation functions are commutative and associative; the local aggregations are not necessarily computed after all vertices have been computed. When the processing units have finished with the aggregations, they move on to the next superstep, if the computation is not over.

The offloading phase is the last phase of the computation. At this point, the processing units have in memory the vertices with their values and the edges with their values. This data represents the results (often, if the algorithm does not change the graph, only the vertex values contain the end results). The processing units go through this data and write it back to persistent storage. The entire computation is now over, and the results can be used.
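The superstep loop described above can be sketched as a toy single-machine engine. Everything here (the function names, the compute signature, the activity bookkeeping) is a simplified stand-in for Giraph's distributed implementation, useful only to see the message flow and the barrier between supersteps.

```python
def run_supersteps(values, edges, compute, max_supersteps=30):
    # values: {vertex: value}; edges: {vertex: [out-neighbors]}.
    # compute returns (new_value, voted_to_halt) and appends
    # (destination, message) pairs to `sent`.
    inbox = {v: [] for v in values}
    active = set(values)                      # all vertices start active
    for superstep in range(max_supersteps):
        to_compute = {v for v in values if v in active or inbox[v]}
        if not to_compute:
            break                             # all halted, no messages left
        outbox = {v: [] for v in values}
        for v in to_compute:
            sent = []
            values[v], halted = compute(superstep, values[v], inbox[v],
                                        edges[v], sent)
            for dst, msg in sent:
                outbox[dst].append(msg)
            if halted:
                active.discard(v)
            else:
                active.add(v)
        inbox = outbox                        # the synchronization barrier
    return values


def max_compute(superstep, value, messages, out_edges, sent):
    # The MaxValue algorithm from this chapter: propagate the largest
    # value seen so far, then vote to halt and wait for messages.
    new_value = max([value] + messages)
    if superstep == 0 or new_value > value:
        for dst in out_edges:
            sent.append((dst, new_value))
    return new_value, True
```

Running max_compute on a small ring graph converges to the global maximum on every vertex, as in the MaxValue example earlier in the chapter.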

DIFFERENCE BETWEEN THE BSP MODEL AND GIRAPH

Traditionally, in the BSP model, a superstep is divided into three steps: the local computation, the communication, and the synchronization barrier. These steps are executed one after the other: only when a unit has finished computing its state locally does it start sending data.

Giraph computes a superstep differently: it overlaps local computation and communication. Processing units begin exchanging messages as soon as they are produced, instead of waiting for the computation of the vertices to finish. This helps spread network usage over a longer period of time, which is particularly helpful in Hadoop clusters that run multiple jobs, of which Giraph is only one, so that Giraph does not saturate the network.

You have seen the basic Giraph API and how the computation is organized. Part 2 and Part 3 of the book present the Giraph Java API. Before you move to Chapter 4, which presents Giraph implementations of algorithms for real-world graph analytics, let's spend the next two sections looking at some simple algorithms to make you more familiar with what you have seen so far.

Computing In-Out-Degrees

To acquaint you with the Giraph programming model, this section presents a simple graph algorithm that could be considered the “Hello World” of graph computations (if you are familiar with Hadoop MapReduce, this example is like the “word counting” of Giraph).

As of December 31, 2012, Facebook was reported to have 1.06 billion monthly active users and 150 billion friend connections, which in graph terms means 1.06 billion vertices and 150 billion undirected edges. How many connections does a user have on Facebook or on Twitter? Figure 3-12 shows two social networks: one with symmetrical friend relationships (like Facebook) and one with asymmetrical follower relationships (like Twitter). Both graphs have three vertices and three edges. In the first case, the average number of friends per user is two; in the second, the average number of followers per user is one. This is interesting because the ratio between the number of vertices and the number of edges is the same in both graphs. The difference is that the Twitter graph is directed, and counting only the incoming edges (the followers) gives a different result. If you instead count all the edges that "touch" a vertex, regardless of direction, the average is two in both cases.


Figure 3-12. Vertex degrees in directed and undirected graphs

The number of edges that touch a vertex is called the degree of that vertex. For directed graphs, a vertex has an in-degree (the number of incoming edges), an out-degree (the number of outgoing edges), and an in-out-degree (the sum of both). If you consider the in-out-degree, then the degree of the vertices in a directed graph is computed the same way as for an undirected graph. If you were to convert the undirected Facebook graph to a directed one, where each undirected friend relationship is replaced by two directed edges with opposite directions, the graph would contain six edges.

The question is, how can you compute in-out-degrees with the Giraph vertex-centric programming model? The fact that each vertex only knows about its outgoing edges requires some exchange of information between each vertex and its neighbors. To discover its incoming edges, a vertex can take advantage of its messaging capabilities and inform its neighbors of its existence through the outgoing edges. Figure 3-13 shows a directed graph with seven vertices, for which you want to compute the in-out-degrees (you can think of it as a graph of Twitter followership).


Figure 3-13. A Twitter-like network of users and follower relationships

The algorithm works as follows:

  1. At the beginning, all vertices are active and have their value initialized to 0.
  2. During superstep 0, each vertex computes the number of outgoing edges and sets its value to this number. If you wanted to compute the out-degree only, this would be enough, and the vertices would contain the out-degree in their value. However, the incoming edges are still unknown to each vertex, so each vertex sends a message through its outgoing edges and votes to halt. When all the vertices have finished computing their degree and have sent the messages, the following superstep begins.
  3. Vertices with incoming edges are woken up by the messages sent to them during the previous superstep. Each vertex has received a number of messages equal to the number of vertices pointing to it with an incoming edge. Each vertex now counts the number of messages, effectively computing its in-degree, adds it to its out-degree, and votes to halt. At this point, all the vertices have their in-out-degree in their value, and the computation is over.

Figure 3-14 shows the flow of the computation for the graph presented in Figure 3-13.


Figure 3-14. Computing the in-out-degree in Giraph

The computation of vertex degrees on the Facebook graph is trivial: it boils down to counting the number of edges. When an undirected graph is modeled through a directed one, as in Giraph, each incoming edge corresponds to an outgoing edge. Hence, a vertex can just count the number of edges it has, without having to depend on incoming messages. Listing 3-10 shows the pseudo-code for the algorithm.

#1 Initializes the vertex value to the out-degree, and propagates it
#2 Counts the incoming messages as the number of incoming edges
#3 Sums in-degree and out-degree to compute the in-out-degree, and sets the vertex value to it
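The two supersteps of Listing 3-10 can be simulated in a few lines. This is a single-machine Python sketch of the message flow, not Giraph code; the function name and graph representation are illustrative.

```python
def in_out_degrees(edges):
    # edges: {vertex: [out-neighbors]} for a directed graph.
    # Superstep 0: each vertex sets its value to its out-degree and
    # sends a message along every outgoing edge, then votes to halt.
    values = {v: len(out) for v, out in edges.items()}
    inbox = {v: [] for v in edges}
    for v, out in edges.items():
        for dst in out:
            inbox[dst].append(v)          # message content is irrelevant
    # Superstep 1: vertices woken up by messages add the number of
    # messages received (their in-degree) to their value.
    for v, msgs in inbox.items():
        values[v] += len(msgs)
    return values
```

For example, in a three-vertex graph where A follows B and C, and B follows C, every vertex ends up with an in-out-degree of 2.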

In a few lines, you have developed an application that computes the in-out-degree of each vertex across hundreds or thousands of machines without having to worry about concurrency or parallel and distributed computing. With all this discussion about directed and undirected graphs, let’s look at an algorithm to convert a directed graph to a (logically) undirected graph.

Converting a Directed Graph to Undirected

Certain algorithms require graphs to be undirected. Because Giraph supports only directed graphs, you have to work with a logically undirected graph. Here, logically means that each edge in the graph has a corresponding edge in the opposite direction; because every edge can be traversed both ways, direction carries no information.

Take Figure 3-15 as an example. The original directed graph is converted to a logically undirected graph by adding an edge in the opposite direction wherever one is not already present. This simple conversion strategy loses some edge information, but don't worry for now; you look into that in a moment. In the vertex-centric API, an edge can only be added by the vertex that will be its source.


Figure 3-15. The representation of a directed graph through a logically undirected graph

The algorithm works in two supersteps, as follows. Note that it does not use vertex values, because it works only on the topology of the graph:

  1. During superstep 0, each vertex sends its vertex ID to the neighbors.
  2. During superstep 1, each vertex receives the IDs of the vertices that are the sources of its incoming edges. That is, the vertex discovers its incoming edges. For each of these IDs, the vertex checks whether it already has an edge in the opposite direction; if not, it creates an edge toward that vertex. During both supersteps, vertices vote to halt after computing.

Listing 3-11 presents the pseudo-code for this algorithm.

#1 Initially propagates the ID to all the neighbors
#2 Adds an edge if an edge toward the target does not exist already
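The algorithm of Listing 3-11 can be sketched on a single machine as follows (illustrative Python, not the Giraph API): superstep 0 delivers each vertex's ID along its outgoing edges, and superstep 1 adds the missing reverse edges.

```python
def to_undirected(edges):
    # edges: {vertex: [out-neighbors]} for a directed graph.
    inbox = {v: [] for v in edges}
    for v, out in edges.items():          # superstep 0: propagate IDs
        for dst in out:
            inbox[dst].append(v)
    result = {v: list(out) for v, out in edges.items()}
    for v, sources in inbox.items():      # superstep 1: add an edge back
        for src in sources:               # if one is not already present
            if src not in result[v]:
                result[v].append(src)
    return result
```

After conversion, every edge has a counterpart in the opposite direction, so the graph is logically undirected.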

In some cases, you may want to use the edge value to store information about whether the original graph contained an edge in only one direction. For example, you might assign an edge value of 2 when the original graph had two corresponding edges, and a value of 1 when only one edge was present. Figure 3-16, for example, shows the graph from Figure 3-15 converted according to this new strategy, assuming the input graph has a value of 1 initially assigned to each edge. The pseudo-code for this strategy is presented in Listing 3-12.


Figure 3-16. The representation of a directed graph through a weighted logically undirected graph

#1 Initially propagates the ID to all the neighbors
#2 Adds an edge if an edge toward the target does not exist already
#3 Updates the weight to 2 if an edge toward the target already exists

Again, with a few lines, you can express an algorithm that can convert a large graph across many machines. Figure 3-17 shows the flow of the algorithm.


Figure 3-17. The computation in Giraph of the graph conversion to weighted logically undirected

You have seen how to write iterative graph algorithms with the Giraph API and how the distributed engine executes the algorithm across a number of processing units. Before this chapter concludes, let’s open the hood of Giraph and look at the computational model that inspired Giraph.

Understanding the Bulk Synchronous Parallel Model

This section looks at the model that inspired Giraph. As a Giraph user, you see many terms in the API that are borrowed from the BSP model, such as superstep and synchronization barrier. As a Giraph developer, if you ever need to extend Giraph, you will find many references to the BSP model in its internals, and you need a good understanding of how the underlying computational model works. This section teaches you how Giraph works under the hood, but keep in mind that you don't need to think in BSP terms when you program Giraph. These concepts are either borrowed by the Giraph model or hidden from you, so you can program Giraph using just the concepts you have learned up to now.

Imagine that you have a list of 20 positive integers, and you want to find the largest value. On a sequential machine, this requires going through the entire list, keeping the current largest value in a variable and comparing it with each element in turn. At the end of the computation, this variable contains the largest value in the list.

With ten machines, can you parallelize this computation? Yes, you can. You assign two numbers to each machine, and you let each machine find the largest value among those assigned to that machine. The problem you need to solve at this point is how to compare the 20 values assigned to the 10 machines to find the largest among them all (also in parallel, of course). Also, you want to avoid every machine sending its value to all the other machines. Toward this end, you can organize the computation in a tree:

  1. Machine 1 sends its value to machine 0, which compares the two values and saves the largest. Machine 3 sends its value to machine 2, which compares the two values and saves the largest, and so on. Note that this step happens in parallel. You now have five machines with five values that need to be compared.
  2. Again, you organize the process hierarchically. So, machine 2 sends its value to machine 0, which compares the two values and picks the largest, machine 6 sends its value to machine 4, which compares the results and picks the largest, and machine 8, which is alone, sends its value to machine 4 as well (which clearly has to compare three values this round).
  3. You have one step to go: machine 4 sends its value to machine 0, which compares the two remaining values and picks the overall largest value.

Figure 3-18 presents this computation.


Figure 3-18. The organization of a parallel MaxValue computation across ten machines

This is one way of organizing a computation hierarchically to solve a problem in parallel. Not all problems need to be organized hierarchically (note that with this scheme, all the machines are busy only during the first step), but this example is simple enough to serve the purpose. Other algorithms would have each machine compute its part of the subproblem and communicate the result to some other machine. The BSP model generalizes these approaches into an abstract machine for computing parallel algorithms.
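The tree-based reduction can be sketched as follows. This version merges machines pairwise in every round, a slight simplification of the exact machine numbering described in the steps above, but the principle is identical: the number of participating machines roughly halves each round.

```python
def tree_max(per_machine_values):
    # Each machine first finds the maximum among its locally assigned values.
    current = [max(values) for values in per_machine_values]
    # Each round, machines are merged pairwise: the right machine of each
    # pair sends its value to the left one, roughly halving the count.
    while len(current) > 1:
        current = [max(current[i:i + 2]) for i in range(0, len(current), 2)]
    return current[0]
```

With 10 machines holding 2 values each, the reduction takes a handful of rounds instead of a single machine scanning all 20 values.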

In the BSP model, you have n processing units that can communicate through a medium such as a network or a bus. You divide the input across the processing units, and each processing unit computes an intermediate solution to its subproblem locally. When the processing units have finished, they exchange the intermediate results according to the semantics of the algorithm. When a processing unit has finished computing its subproblem and sending its intermediate results, it waits for the others to finish as well. When all processing units have finished, they go on with the next iteration, computing their subproblem based on their previously computed state and the messages they have received. Each iteration is called a superstep, and the waiting phase is called the synchronization barrier; many Giraph concepts are borrowed from the BSP model. Figure 3-19 illustrates the conceptual organization of the BSP model.


Figure 3-19. The organization of a BSP computation across five processing units

At this point, you should have noticed the correspondence between the BSP model and the Giraph model presented in this chapter. The graph is split across the processing units, and the intermediate results exchanged during the communication phase are the messages produced by the assigned vertices. Each processing unit keeps its local state in memory, represented by the assigned vertices with their values and the messages to be processed. Figure 3-20 shows the mapping between the BSP model and the Giraph model.


Figure 3-20. The mapping of the BSP model to Giraph

As mentioned earlier, it is not necessary to think about a BSP machine when designing graph algorithms. Giraph builds on top of it so you can forget the underlying abstraction. Still, it can help you understand how Giraph was designed for iterative computations that are executed in a parallel, distributed system.

Summary

Designing and executing graph algorithms and a system to process them at scale is difficult. Giraph provides an intuitive programming paradigm that simplifies writing scalable graph algorithms, hiding the complexity of parallel, distributed systems. In this chapter, you learned the following:

  • In graph algorithms, the computation of each vertex frequently depends on the computation of vertices nearby. For this reason, graph algorithms are often iterative.
  • A platform for the processing of graph algorithms must support fast, iterative computations and possibly hide the complexity of distributed, concurrent programming.
  • Giraph provides a simple vertex-centric API that requires you to “think like a vertex” that can exchange messages with other vertices.
  • You can combine messages to minimize the number of messages transmitted between supersteps.
  • You can use aggregators to compute aggregation functions across the values of the graph vertices. Aggregators are very scalable in Giraph.
  • Computing vertex degrees and converting graphs from directed to undirected requires only a few lines of code in Giraph.
  • The bulk synchronous parallel model defines parallel computations executed across a number of processing units. Giraph builds on top of this abstraction.

You have seen how to fit Giraph into a system architecture and how to program Giraph. You are now ready to look at more examples of algorithms and how to implement them in Giraph. The next chapter is dedicated to writing scalable graph algorithms with the Giraph programming model; toward this end, it presents the implementation of some of the algorithms from Chapter 2.
