CHAPTER 8


Beyond the Basic API

This chapter covers

  • Graph mutations
  • The Aggregator API
  • Vertex coordination
  • Writing modular applications

In the previous chapters, you learned how to use the basic parts of the Giraph programming API to implement various graph algorithms. While the basic API already gives you enough flexibility to build a wealth of useful graph mining algorithms, this chapter covers features of the Giraph API beyond the basic ones that enable you to write more sophisticated applications. The Giraph API is rich with features that allow you to add more functionality to your applications, write algorithms more efficiently, and even make your life easier from a programming perspective.

While we typically think of the input graph as an immutable data structure, this chapter discusses scenarios where you want to alter the input graph by adding or removing vertices and edges. Giraph provides an API that allows you to perform such mutations to the input graph. Examples are used to describe the different ways that you can use this API.

Next, you revisit aggregators, a tool that allows you to compute global statistics across the entire graph. In previous chapters, you saw examples of typical aggregators and learned how to use them inside your applications. Here you look at the Java API that Giraph provides for writing custom aggregators, explained through the implementation of an example aggregator.

Further, one of the most important features in distributed algorithms is coordination. So far you learned how to think in a vertex-centric way, writing programs where vertices communicate with each other through messages in a distributed fashion. Giraph took care of the coordination of the execution of the algorithm. Here, you learn how to intervene in this coordination process when you wish to further customize the execution of your algorithms.

This chapter also covers the Giraph API features that make it easier for you to write sophisticated applications from a programming perspective. You learn how to break down a potentially complex algorithm into modules that perform distinct logical operations, resulting in code that is cleaner and easier to understand. Inversely, you also learn how to combine these modules into different applications, improving code reuse and saving you programming effort.

Graph Mutations

In this section, you explore the ability to mutate the graph structure during the execution of an algorithm. In general, mutating a graph means adding or removing vertices and edges. While so far you have mostly thought of analyzing an input graph, here you see that mutating a graph is often a part of this analysis.

There are different reasons graph mutation might be necessary. In some scenarios, you may simply want to transform an input graph; for instance, as a preprocessing step. You already saw an example of graph transformation in Chapters 3 and 5, where you learned how to convert a graph from directed to undirected. In such cases, graph mutation is a natural requirement.

In other scenarios, the output of your analysis may be a graph that is completely different from your input graph. A common case is when you want to divide an input graph into communities or clusters and subsequently analyze the connections among the communities themselves. Community detection is a common application in social network analysis. In an online social network, you may want to find communities of users with similar profiles and output how the communities themselves connect with each other. In this case, the output of the community detection algorithm is a graph representing the communities, a graph structure totally different from your input graph. This implies that during the execution of your algorithm, somehow you must be able to create a graph; that is, create new vertices and connect them with new edges.

Graph generators make for another application scenario that requires graph mutation. Graph generators are useful tools that allow users to construct synthetic graphs that conform to some model, such as the popular “small-world” model. In fact, the Giraph code already contains implementations of a couple of graph generators. Graph mutations are natural in this case; these tools may start from an empty graph or an initial seed graph and gradually add vertices and edges until they build the final graph.

Before going into the API that allows you to change the graph, it is important to understand what it means to mutate the input graph. At this point, you have to remember that Giraph loads the input graph in-memory and maintains a copy in an internal representation. It is this copy of the graph that Giraph manipulates and can mutate if desired. The original copy of the graph remains intact. For instance, if you had stored your graph in an HDFS directory on your Hadoop cluster, and then executed an algorithm that uses the mutation API, the graph data on HDFS would be exactly the same after the execution.

Of course, it is possible for a user to mutate this internal copy of the graph and output a new copy to some external storage system. In the case of converting the graph from directed to undirected, you would be creating a new copy of the graph in an undirected form. This would be the output of the Giraph job.

Note  It is always possible for a user to modify the original graph from within a vertex computation function. Imagine that the graph is stored in a table store, like HBase. Nothing prevents a user from writing a vertex computation that connects to HBase and modifies the graph. However, such an operation would be outside the scope of Giraph, and Giraph makes no guarantees about the consistency of these changes. For instance, in the event that one of the Giraph workers fails, Giraph guarantees that it will produce a correct result, but it does not guarantee the correctness of such external changes. This is both a capability and a risk that comes with the flexibility of the Giraph programming API. If you choose to take advantage of this flexibility, you should be aware of the risks and take extra care to design your program so that it does not cause any problems, especially if it is used in a production environment.

Now that we have discussed the usefulness of graph mutations, let’s explore the actual API that gives you this ability. While Chapter 5 went through the mutation API briefly, here it is explained in detail through a variety of examples.

The Mutation API

Giraph provides three ways to alter the graph structure during execution: (i) direct mutations on a vertex, (ii) mutation requests, and (iii) mutation through messages. Each of these allows different types of graph mutations and has different semantics with respect to when the mutations take effect. Next, you review the APIs for each of these types of mutation and see their use illustrated.

Direct Mutations

To illustrate the use of the direct mutation API, let’s use an example that you have already seen in Chapter 5. In this example, you want to convert an input graph from a directed to an undirected one. Figure 8-1 shows this transformation.


Figure 8-1. Transforming a directed graph into an undirected one

In Figure 8-1, you see that such a transformation requires you to change the graph structure. In particular, you need to add some extra edges between existing vertices.

The Vertex interface contains a number of methods that allow the vertex computation function to directly change the structure of the vertex—that is, change its edges. Direct mutations have two important properties:

  • They can only change the vertex from which they are called.
  • The effect of such a call is immediate.

Let’s look at the Vertex methods that allow the mutation of its structure:

  • addEdge(Edge<I, E> edge): Adds an edge to the vertex. The ID and value of the edge are specified through the passed Edge object.
  • removeEdges(I targetVertexId): Removes from the vertex all the edges with the specified target ID.
  • setEdges(Iterable<Edge<I, E>> edges): Sets the outgoing edges of the vertex by iterating over the passed Iterable and adding the edges, one by one.

In Listing 8-1, let’s revisit the example code from Chapter 5; the way it uses the mutation API is explained in detail next.

#1 In the first superstep, every vertex sends its own ID to all its neighbors.

#2 In the second superstep, a vertex receives messages from every vertex that has an edge to it.

#3 For every such message the vertex gets, it checks whether the corresponding edge already exists.

#4 If the edge does not exist, the vertex adds it.

Note that the addEdge method takes an implementation of the Edge interface as input. While you can implement your own Edge types, Giraph already provides default implementations that should suffice in most cases. In particular, you can use the create() method of the EdgeFactory utility class to create Edge objects.
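Because Listing 8-1 is not reproduced here, the following sketch shows what such a computation might look like, keyed to the annotations above. The class name and the type parameters (LongWritable vertex IDs, no vertex or edge values) are illustrative assumptions, not the book’s exact listing:

```java
import java.io.IOException;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

// Converts a directed graph to an undirected one using direct mutations.
public class ConvertToUndirected extends BasicComputation<
    LongWritable, NullWritable, NullWritable, LongWritable> {

  @Override
  public void compute(Vertex<LongWritable, NullWritable, NullWritable> vertex,
      Iterable<LongWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
      // #1 Every vertex sends its own ID to all its neighbors.
      sendMessageToAllEdges(vertex, vertex.getId());
    } else {
      // #2 A message means the sender has an edge pointing to this vertex.
      for (LongWritable sourceId : messages) {
        // #3 Check whether the reverse edge already exists.
        boolean found = false;
        for (Edge<LongWritable, NullWritable> edge : vertex.getEdges()) {
          if (edge.getTargetVertexId().equals(sourceId)) {
            found = true;
            break;
          }
        }
        // #4 If not, add it directly; the change is visible immediately.
        if (!found) {
          vertex.addEdge(EdgeFactory.create(new LongWritable(sourceId.get())));
        }
      }
      vertex.voteToHalt();
    }
  }
}
```

Because direct mutations take effect immediately, edges added while processing earlier messages are visible to the duplicate check for later messages in the same superstep.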

Mutation Requests

Next, you are going to look at an example showing why direct mutations might not always be suitable for modifying the graph structure and learn how mutation requests provide more flexibility. Let’s say you want to analyze a Twitter-like social network and explore, not the individual user connections as you have done so far, but how users from different countries are connected. Starting from your original social graph, you want to create a graph where every vertex corresponds to a country, and an edge implies that users from those two countries are connected. This allows you to create a high-level view of the Twitter graph and observe social influence at the country level, as shown in Figure 8-2.


Figure 8-2. Analyzing how users from different countries are connected

It is obvious that while you are analyzing the original graph, you are also creating a new one. In fact, the new graph is the actual output of the algorithm. However, since the vertices of this new graph do not exist in advance, you cannot really use the direct mutations as before. Let’s see how mutation requests solve this problem.

Mutation requests are methods of the Computation interface. Just like direct mutations, you can call these methods from within your vertex computation function. However, unlike direct mutations, a mutation request has the following properties:

  • It can modify any part of the graph, regardless of the vertex that makes the request. For instance, vertex A can request the addition of an edge between vertices B and C.
  • The effect of a mutation request is visible only after the current superstep finishes and right before the next one begins.

Caution  The second property might seem a bit unnatural at first, but it is really meant to simplify your programs. It allows you to separate the logic that handles the creation or removal of vertices and edges from the rest of the program logic. A mutation request has no effect on the remaining computation of the vertex that makes it, even if it modifies that same vertex. Nor does it affect the computation of other vertices that run after the request is made but within the same superstep.

Next, let’s see this API in action.

Listing 8-2 shows the implementation of this algorithm. Although the logic may not be readily apparent at first, each part is explained in a moment.

#1 To every user I am following, I send the name of my own country.

#2 At superstep 1, every message contains the country of a user that follows me.

#3 For every message, create a new edge from the other country to my country.

#4 Each user vertex removes itself from the graph, so that we are left only with country vertices.

In this scenario, the ID of a vertex is of type Text, representing the name of the user, and the value of a vertex is also of type Text, representing the country of the user. The general way the algorithm works should be familiar. Somehow, you need to discover what pairs of connected countries hide inside the followership graph. The natural way to do this is to have every vertex tell its neighbors about its own country. The first superstep of the algorithm implements exactly this logic. Every vertex sends to the vertices it follows its own vertex value—that is, its corresponding country (#1).

In the second superstep, each vertex now knows what countries its followers are from. In other words, it knows what an edge in the new country graph should be. At this point, it can simply request the creation of such an edge from the follower’s country to its own country, using the addEdgeRequest() method (#3). The addEdgeRequest() method makes a request to add an edge at the vertex with the specified source ID. The edge value and destination ID are specified through the passed edge object.

Note  You do not have to explicitly create the vertices that represent the countries. By requesting the creation of an edge between two countries, Giraph automatically creates the corresponding vertices if they do not already exist.

You are not done yet though. By creating these new edges, there are two types of vertices at the end of the algorithm execution: user vertices that comprise the original graph and country vertices comprising the country graph. If you use an output format, you see that the output contains both of these types of vertices. In this case, though, you are only interested in the country graph, so ideally you would like the final graph to contain only the vertices representing the countries.

The mutation request API provides a solution to this. Apart from modifying the edges of a vertex, it also allows you to create or delete vertices. To achieve this, you can make use of the removeVertexRequest() method. At superstep 1, apart from requesting the new edges, you can also remove the vertices that you do not need: each vertex makes a request to remove itself by calling the removeVertexRequest() method and passing its own ID as a parameter.
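Putting the pieces together, a sketch of this computation (in the spirit of Listing 8-2) might look as follows. The class name is hypothetical, and edges are assumed to carry no values (NullWritable):

```java
import java.io.IOException;

import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Builds the country graph with mutation requests; vertex IDs are user
// names, vertex values are country names.
public class CountryGraph
    extends BasicComputation<Text, Text, NullWritable, Text> {

  @Override
  public void compute(Vertex<Text, Text, NullWritable> vertex,
      Iterable<Text> messages) throws IOException {
    if (getSuperstep() == 0) {
      // #1 Tell every user I am following which country I am from.
      sendMessageToAllEdges(vertex, vertex.getValue());
    } else if (getSuperstep() == 1) {
      // #2 Each message carries the country of one of my followers.
      for (Text followerCountry : messages) {
        // #3 Request an edge from the follower's country to my country;
        // Giraph creates the country vertices if they do not exist yet.
        addEdgeRequest(new Text(followerCountry),
            EdgeFactory.create(new Text(vertex.getValue())));
      }
      // #4 Remove this user vertex, leaving only country vertices.
      removeVertexRequest(vertex.getId());
    } else {
      // Newly created country vertices start active; they simply halt.
      vertex.voteToHalt();
    }
  }
}
```

Note that the follower country is copied into a new Text before it is used as a vertex ID, because Giraph may reuse message objects across iterations.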

ALGORITHM DETAIL

The alert reader must have observed that no vertex halts in superstep 0. Imagine what would happen if vertices with no followers halted at superstep 0. They would never receive a message in superstep 1, and therefore, they would not be activated and run the compute() function. By not halting vertices at superstep 0, you allow every vertex to run the compute() function at superstep 1, and thus, allow them to remove themselves from the graph through the removeVertexRequest() method.

Finally, recall that by calling the addEdgeRequest() and removeVertexRequest() methods, a vertex only registers these requests with Giraph. The edges are not created immediately and the vertices are not removed immediately; you have to wait until the next superstep before these changes really happen.

Apart from these two methods that you saw in the example, the Computation interface provides the following methods as well:

  • addVertexRequest(I id, V value, OutEdges<I, E> edges): Makes a request to create a vertex with the specified ID, value, and edges.
  • addVertexRequest(I id, V value): Makes a request to create a vertex with the specified ID and value. The created vertex has no edges.
  • removeVertexRequest(I vertexId): Makes a request to remove the vertex with the specified ID. Nothing happens if the vertex with the specified ID does not exist.
  • addEdgeRequest(I sourceId, Edge<I, E> edge): Makes a request to add an edge at the vertex with the specified source ID. The edge value and destination ID are specified through the passed edge object. Note that this request affects only the vertex with the source ID.
  • removeEdgesRequest(I sourceId, I targetId): Makes a request to remove an edge from the vertex with the passed source ID. In particular, it removes the edge that has the passed target ID. Nothing happens if the edge does not exist. This call affects only the source vertex, not the target vertex.

Although this example did not make use of these methods, they have similar semantics. They allow a modification that gets realized, not immediately, but at the beginning of the next superstep.

Mutation Through Messages

Next, let’s look at a third alternative way to modify the graph in Giraph. Apart from the explicit graph mutation through the previous APIs, Giraph also allows the creation of a vertex implicitly, by sending a message to a vertex that does not exist. Let’s see what this means exactly and how it could be useful.

Let’s see how you can use this way of creating vertices through the previous example. In fact, you are going to enrich it with the calculation of more information. Let’s assume you now want to know, not only which countries are connected to each other, but also the number of users from one country that follow users from another country. This gives you a more informed view of the country relations: you get a sense of the strength of the relationships among countries, and of which country is the most followed in terms of the total number of followers. Figure 8-3 illustrates the result you wish to get by analyzing the original input graph.


Figure 8-3. Counting how many users from one country follow users from another country

Let’s now look at the algorithm that computes these more detailed statistics. Unlike the previous example, here we assume that the edges have a value of type IntWritable that is used to hold the number of followers at the end of the execution. Listing 8-3 shows the implementation.

#1 Tell the users that I am following which country I am from.

#2 At superstep 1, every message contains the country of a user that follows me.

#3 For each country following me, send a message to a destination with ID set to that specific country. The message contains this vertex’s own country name.

#4 As before, each user vertex removes itself from the graph.

#5 At superstep 2, vertices representing countries are created and receive messages.

#6 A message contains a country name and represents the followership of a single user in that country.

#7 For every such country, check whether this vertex already has an edge to it. If not, add a new edge and initialize its value to 1.

#8 If the country already exists, increment its value by one for every message containing the corresponding country name.

ALGORITHM DETAIL

In the initial user graph, the edge value is not relevant and you simply assume that when the graph is constructed with the use of an InputFormat, it is set to 0.

Recall that the Giraph model allows a vertex to send a message simply by specifying the ID of the destination. In most cases, a vertex sends a message to one of its neighbors. It typically chooses an ID by looking at its edges, which are constructed during the loading of the input graph. However, the Giraph API is really more flexible and allows you to use any ID in the sendMessage() method. Since you can specify any ID, you may have wondered what happens if you specify an ID that does not exist in the input graph.

The answer is that Giraph creates the vertex for you. This way, your program can dynamically create a vertex by sending a message to it. In this case, the vertex is initialized with no edges and the vertex value is set to null.

Note  It is your responsibility as a programmer to write code that handles this initialization.

The very first part of this algorithm is the same as before. At superstep 0, every vertex sends its own country to the vertices it follows through a message (#1). Therefore, at superstep 1, every user vertex receives messages containing country names.

Before starting superstep 2, Giraph realizes that there are messages for vertices representing the countries. These vertices do not exist yet. Therefore, Giraph creates them and superstep 2 starts executing as usual. It delivers messages to all vertices and calls the compute() function.

In this case, only the newly created country vertices have messages. Messages contain country names as well. A message that country A receives that contains the country name B means there is a single followership from country A to country B. First, this means that there should be an edge between the vertex representing country A and the vertex representing country B (#7). Second, by counting the messages that contain country B, you know exactly how many users from country A follow users from country B. This count is the value of the edge created.
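A sketch of this computation (in the spirit of Listing 8-3) follows. The class name is hypothetical; it assumes Text vertex IDs and values, IntWritable edge values as stated above, and that vertex.getEdgeValue() returns null when the edge does not exist:

```java
import java.io.IOException;

import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Counts cross-country followerships; country vertices are created
// implicitly by sending messages to IDs that do not exist yet.
public class CountryGraphWithCounts
    extends BasicComputation<Text, Text, IntWritable, Text> {

  @Override
  public void compute(Vertex<Text, Text, IntWritable> vertex,
      Iterable<Text> messages) throws IOException {
    if (getSuperstep() == 0) {
      // #1 Tell the users I am following which country I am from.
      sendMessageToAllEdges(vertex, vertex.getValue());
    } else if (getSuperstep() == 1) {
      // #2 Every message contains the country of a user that follows me.
      for (Text followerCountry : messages) {
        // #3 Send my own country name to the vertex named after the
        // follower's country; Giraph will create that vertex for us.
        sendMessage(new Text(followerCountry), vertex.getValue());
      }
      // #4 Remove this user vertex from the graph.
      removeVertexRequest(vertex.getId());
    } else {
      // #5, #6 Only newly created country vertices have messages here;
      // each message represents a single followership into that country.
      for (Text followedCountry : messages) {
        IntWritable count = vertex.getEdgeValue(followedCountry);
        if (count == null) {
          // #7 No edge to that country yet: add one with value 1.
          vertex.addEdge(EdgeFactory.create(
              new Text(followedCountry), new IntWritable(1)));
        } else {
          // #8 Edge exists: increment its value by one.
          vertex.setEdgeValue(followedCountry,
              new IntWritable(count.get() + 1));
        }
      }
      vertex.voteToHalt();
    }
  }
}
```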

Now you have been offered three different ways to alter the graph structure during the execution of your algorithm. In general, which one you use depends on your algorithm and how creative you get. Sometimes you find that you can implement an algorithm using any of these ways, while in other scenarios one type of mutation is preferable. For instance, when you only need to add edges and each vertex is responsible only for adding its own edges, as in the very first example, direct mutations are probably the way to go because they are faster. If, on the other hand, you also require the creation of vertices, or a vertex is responsible for adding edges to other vertices, mutation requests are the solution. In general, you need to think a bit about the design of your algorithm and choose the option that fits best.

Resolving Mutation Conflicts

Even though this was not apparent in the previous examples, conflicts between mutation requests may occur. For instance, nothing prevents two vertices from making conflicting requests: one vertex requests the addition of an edge with a certain value, while another vertex requests the creation of the same edge with a different value. What should the value of the edge be in this case? Sometimes even more confusing requests may occur. For instance, a program may issue a request to add an edge to a vertex while, at the same time, issuing a request to delete that same vertex.

At first glance, these kinds of requests might seem irrational; you may wonder why somebody would ever write a program like this. But you find that when writing distributed programs, such disagreements among the different computations taking place are unavoidable. In fact, they are part of the design of a distributed algorithm, as long as there is a way to resolve such conflicts. Think about a simple multithreaded program where two threads try to increment the same, shared counter. The result is undefined if the program is not properly designed, and programming languages typically provide synchronization primitives for you to design programs correctly. Next, you see how Giraph allows you to resolve this kind of conflict.

First, let’s go back to the example of the creation of the Twitter country graph, whose code was shown in Listing 8-2. Notice that both the “Mark” vertex and the “Anna” vertex make a request to create an edge for the “Germany” vertex with a target ID equal to “UK”.

Every time a set of changes happens on a vertex, like an edge addition or deletion, Giraph uses a VertexChanges object to represent those changes and a VertexResolver object that takes the VertexChanges as input and defines how to actually handle them. Let’s start by describing the VertexResolver interface in Listing 8-4.

#1 The ID of the vertex to resolve.

#2 The original vertex before the changes are applied.

#3 The set of changes to this vertex.

#4 Indicates whether messages were sent to this vertex in the previous superstep.

The VertexResolver interface has a single method. Giraph calls this method for a vertex and passes it the set of changes that have occurred. When you implement this method, you essentially define what the Vertex should look like after the changes. The VertexChanges interface shown in Listing 8-5 helps you determine the changes requested for the vertex.

#1 Returns the list of vertex additions requested for this vertex ID.

#2 Returns the number of remove requests made for this vertex ID.

#3 Returns the list of edges that were requested to be added to this vertex.

#4 Returns the list of edges that were requested to be removed from this vertex.

Before implementing your own VertexResolver, let’s see what happens if you let Giraph use the default VertexResolver. The default vertex resolver performs the following operations:

  1. If there were any edge removal requests, first apply these removals.
  2. If there was a request to remove the vertex, then remove it. This is achieved by setting the return Vertex object to null.
  3. If there was a request to add the vertex, and it does not exist, then create the vertex.
  4. If the vertex has messages sent to it, and it does not exist, then create the vertex.
  5. If there were requests to add edges to the vertex: if the vertex does not exist, first create it and then add the edges; otherwise, simply add the edges.

The order of this list is important because it defines exactly how Giraph resolves any conflicts by default. This means that if, for instance, there is a request to remove a vertex and at the same time a request to add it, then because the default resolver checks for vertex creation after it performs the deletion, it ends up creating the vertex.

THE DEFAULT VERTEX RESOLVER IMPLEMENTATION

Even though the implementation of the DefaultVertexResolver is not shown here, it is recommended that you look at the source code as an additional example. It gives you more insight into the use of the VertexResolvers.

Now let’s assume that you want to change this default behavior. In particular, let’s handle the case where there are multiple add requests for the same edge a bit differently. As opposed to just adding the edge to the vertex, you would really like to count the number of requests that exist and set this count as the value of the edge. This essentially gives you an alternative way to count the number of users from one country that follow users from another country. In Listing 8-6, let’s look at how you can do this simply by creating a custom VertexResolver.

#1 First, check if there are edge addition requests.

#2 If there are edge additions, and the vertex does not exist already, then create it.

#3 For every edge addition, check if the edge already exists.

#4 If it does not exist, add it and set its value to 1.

#5 If it already exists, just increment its value by 1.

In this example, you assume that you only need to handle edge addition requests. The custom resolver first checks whether there are any such additions (#1). If there are, you need to check whether the source vertex already exists. Recall that in this case, you create vertices representing countries, which do not exist in advance; therefore, if the vertex does not already exist, you need to create it (#2). For every requested edge, you then check whether the edge already exists (#3): the first request creates it with a value of 1 (#4), and every subsequent request for the same edge simply increments its value (#5).
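A sketch of such a resolver follows. It extends Giraph’s DefaultVertexResolver so that the configuration is injected for you; note that in the Giraph codebase the changes object is called VertexChanges. The class name is hypothetical:

```java
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexChanges;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Resolver that counts duplicate edge-addition requests and stores the
// count as the edge value.
public class CountingVertexResolver
    extends DefaultVertexResolver<Text, Text, IntWritable> {

  @Override
  public Vertex<Text, Text, IntWritable> resolve(Text vertexId,
      Vertex<Text, Text, IntWritable> vertex,
      VertexChanges<Text, Text, IntWritable> vertexChanges,
      boolean hasMessages) {
    // #1 Only edge-addition requests are handled in this sketch.
    if (vertexChanges != null && !vertexChanges.getAddedEdgeList().isEmpty()) {
      // #2 Create the vertex if it does not exist yet.
      if (vertex == null) {
        vertex = getConf().createVertex();
        vertex.initialize(new Text(vertexId), new Text());
      }
      for (Edge<Text, IntWritable> edge : vertexChanges.getAddedEdgeList()) {
        // #3 Check whether the edge already exists.
        IntWritable value = vertex.getEdgeValue(edge.getTargetVertexId());
        if (value == null) {
          // #4 First request for this edge: add it with value 1.
          vertex.addEdge(EdgeFactory.create(
              new Text(edge.getTargetVertexId()), new IntWritable(1)));
        } else {
          // #5 Repeated request: increment the edge value.
          vertex.setEdgeValue(edge.getTargetVertexId(),
              new IntWritable(value.get() + 1));
        }
      }
    }
    // Returning the vertex (or null) defines its state after resolution.
    return vertex;
  }
}
```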

Now that you have implemented your custom resolver, the only thing that is missing is to tell Giraph to use this particular implementation of the vertex resolver in place of the default one. As always, you do this through the familiar command-line custom arguments. You just need to add the following to your command line: -ca giraph.vertexResolverClass=MyVertexResolver.

Overall, mutations are one of the advanced features that can prove very useful once you start thinking about more sophisticated applications. Next, another advanced feature is discussed: how to write your own aggregators.

The Aggregator API

Aggregators are a very useful and easy-to-use tool that allows you to compute global statistics across the entire graph. You have already seen a handful of examples that use aggregators in Chapters 3 and 5. Giraph provides a set of common aggregator functions (like sum or max) that you can use in your applications, and at the same time it allows you to implement your own aggregator functions. This section explains the aggregator interface in more detail and shows how to write new aggregators.

Giraph provides two ways for you to implement custom aggregators. The first is to implement the Aggregator interface, which gives you the most flexibility in how you implement an aggregator. Listing 8-7 shows the interface methods.

#1 Aggregates the input value to the current value of the aggregator.

#2 Creates the initial value of the aggregator before any aggregation occurs.

#3 Returns the current value of the aggregator.

#4 Sets the current value of the aggregator.

#5 Resets the value of the aggregator.

To understand the implementation of an aggregator, let’s first discuss how Giraph uses this interface. The createInitialValue() method is called by Giraph before it starts aggregating any values. Giraph then calls the aggregate() method whenever a vertex wants to add a value to an aggregator. Your implementation of the aggregator is responsible for maintaining the appropriate data structure for this aggregation to happen. For instance, if you are implementing a sum aggregator, your implementation should maintain a partial sum to which values are added.
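For reference, the Aggregator interface (in org.apache.giraph.aggregators) has the following shape; the comments map to the annotations above:

```java
import org.apache.hadoop.io.Writable;

// The Giraph aggregator interface; A is the type of the aggregated value.
public interface Aggregator<A extends Writable> {
  void aggregate(A value);          // #1 fold a new value into the aggregate
  A createInitialValue();           // #2 the neutral starting value
  A getAggregatedValue();           // #3 read the current value
  void setAggregatedValue(A value); // #4 overwrite the current value
  void reset();                     // #5 return to the initial value
}
```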

Apart from the Aggregator interface, Giraph also provides an abstract class that implements part of the Aggregator interface and covers the most common aggregator functionality. Most of the time, extending this abstract class covers your needs. Let’s look at the details of the BasicAggregator and then an example that extends it to implement a new aggregator (see Listing 8-8).

#1 The internal value of the aggregator. It holds the current partial aggregate.

#2 The default constructor sets the internal value to its initial value.

#3 Returns the current value of the aggregator.

#4 Sets the internal value of the aggregator directly.

#5 Resets the value of the aggregator to its initial value.
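A condensed sketch of the structure of BasicAggregator, keyed to the annotations above, looks roughly like this (the real class lives in org.apache.giraph.aggregators; aggregate() and createInitialValue() are left abstract for subclasses):

```java
import org.apache.hadoop.io.Writable;

// Skeleton of Giraph's BasicAggregator: it manages the internal value,
// leaving only aggregate() and createInitialValue() to the subclass.
public abstract class BasicAggregator<A extends Writable>
    implements Aggregator<A> {

  private A value; // #1 the current partial aggregate

  public BasicAggregator() {
    value = createInitialValue(); // #2 start from the initial value
  }

  @Override
  public A getAggregatedValue() {
    return value; // #3
  }

  @Override
  public void setAggregatedValue(A value) {
    this.value = value; // #4
  }

  @Override
  public void reset() {
    value = createInitialValue(); // #5
  }
}
```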

The only data structure that the BasicAggregator maintains is an internal value of the same type as the values you aggregate. This is essentially the current partial value of the aggregator. When you extend the BasicAggregator, your only responsibility is to define the initial value through the createInitialValue() method and how to fold a new value into the current internal value through the aggregate() method.

Let’s look at this with an example. You have already seen the logic of writing a max or a sum aggregator; here you see how to write an aggregator that implements a boolean OR function. You may use such an aggregator to detect whether any of the vertices of the graph meets a condition. For instance, imagine a social graph where each vertex is labeled with the age of the user. You also assume that the age information may sometimes be missing, in which case you stop the analysis and print a message. So, you really want to know whether any of the vertices—at least one—is missing the age information. A boolean OR aggregator implements exactly this “at least one” logic. In other words, the value of the aggregator should be the boolean value true if there is at least one vertex that added the value true to the aggregator, and false otherwise. Listing 8-9 shows the implementation.

#1 Initialize the value of the aggregator to false.

#2 Get the current value of the aggregator.

#3 Perform a logical OR operation between the current aggregator value and the aggregated value.

#4 Set the result of the operation as the new value of the aggregator.

THE COMMUTATIVE AND ASSOCIATIVE PROPERTY OF THE OR OPERATION

In Chapter 3 you saw that for Giraph to be able to perform aggregations in an efficient way, the operation must be commutative and associative. It is easy to see that the logical OR operator (||) is indeed both commutative since:

a || b = b || a

and associative since:

(a || b) || c = a || (b || c)

The implementation is quite straightforward. Obviously, if no vertex adds a value to the aggregator, the result should be false. Therefore, the createInitialValue() initializes the value of the aggregator to the boolean value false using the Writable implementation of the boolean type. Next, through the aggregate method, you essentially perform a logical OR operation between the current aggregator value (#2) and the newly aggregated value (#3). The result replaces the value of the aggregator (#4).
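Following the annotations above, such a boolean OR aggregator can be written as below. This mirrors the BooleanOrAggregator that ships with Giraph, so in practice you can use the built-in class directly:

```java
import org.apache.giraph.aggregators.BasicAggregator;
import org.apache.hadoop.io.BooleanWritable;

// Aggregator that computes the logical OR of all aggregated values.
public class BooleanOrAggregator extends BasicAggregator<BooleanWritable> {

  @Override
  public BooleanWritable createInitialValue() {
    // #1 With no contributions, the result is false.
    return new BooleanWritable(false);
  }

  @Override
  public void aggregate(BooleanWritable value) {
    // #2, #3, #4 OR the new value into the current aggregator value.
    getAggregatedValue().set(getAggregatedValue().get() || value.get());
  }
}
```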

Giraph already provides a rich set of aggregator implementations for the most common operations. As always, the more customized and sophisticated your applications become, the more you want to go beyond the basic functionality and implement your own. This API offers a simple way to implement useful aggregators that Giraph can compute for you in a scalable and efficient manner, without you having to worry about what happens under the hood. In the next section, you come across aggregators again, this time looking at how they can help with the coordination of a distributed program.

Centralized Algorithm Coordination

So far you have learned that the basic principle of the Giraph API is “vertex-centric” programming, which requires you to think from the perspective of a vertex in the graph. Accordingly, you write a compute function that each vertex executes. This leads to algorithms that are naturally distributed, where each vertex does not need to know about the state of the rest of the graph and computes only for itself.

But in some cases, you find that to implement your algorithm, you need the vertices to coordinate among themselves. This coordination may involve sharing some kind of information that is common across the entire graph or performing some centralized computation that affects the entire algorithm. In fact, you already saw such an example in Chapter 5, where you had to compute the most popular user in Twitter through the use of aggregators. An aggregator offers a form of centralized coordination and computation, since all vertices collectively contribute to the value of a single aggregator, which is then made available back to all vertices. Here, we talk about the MasterCompute, another way to coordinate vertices.

In general, apart from the distributed computations that you execute through the familiar compute() function on a per-vertex basis, you may also need to take some action that depends on or affects the graph and the computation as a whole. As an example, consider that halting your application depends on the value of an aggregator, and not on each vertex individually. Fortunately, Giraph offers the MasterCompute, a mechanism designed exactly for this. You have already seen some examples of how the MasterCompute can be of use, but here it is explained in more detail and you are given more examples of what you can achieve with it.

The MasterCompute provides essentially a centralized location where you can perform actions, such as reading and setting aggregator values, getting statistics about the entire graph, or even stopping the entire job execution. In practice, you can think of the MasterCompute as a piece of application logic that executes only once after the end of each superstep. (See Figure 8-4.)


Figure 8-4. The MasterCompute centralized point of computation. The compute() method executes once before each superstep. Aggregator values are passed from the MasterCompute to the vertices

Listing 8-10 shows the methods of the MasterCompute abstract class, which are explained shortly, with an example.

#1 Called before execution starts.

#2 Called exactly once for the entire graph after each superstep finishes.
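As the listing is not reproduced here, the following skeleton sketches these methods as you might subclass them against the Giraph 1.x API (the class name is an assumption). Note that MasterCompute also implements Writable, so its state must be serializable; the convenience class DefaultMasterCompute provides empty implementations of these methods for you:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.master.MasterCompute;

public class MyMasterCompute extends MasterCompute {

  @Override
  public void initialize() throws InstantiationException,
      IllegalAccessException {
    // #1 Called once before execution starts;
    // typically used to register aggregators.
  }

  @Override
  public void compute() {
    // #2 Called exactly once for the entire graph between supersteps.
  }

  // MasterCompute is also a Writable; state to persist goes here.
  @Override
  public void readFields(DataInput in) throws IOException { }

  @Override
  public void write(DataOutput out) throws IOException { }
}
```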

The initialize() method of the MasterCompute is called once before the execution of the algorithm starts; it is typically used to set up your MasterCompute implementation. As you will see in a bit, this is where you usually set up the aggregators your application uses. The compute() method is the workhorse of the MasterCompute. Giraph calls this method exactly once before the execution of every superstep; that is, before it calls the compute() method on the vertices. More specifically, the first time Giraph calls the compute() method of the MasterCompute is right before the execution of superstep 0. Next, let’s look at a few examples of what you can do within the compute function of the MasterCompute.

Halting the Computation

Deciding when to stop the execution is one of the core aspects of a distributed graph algorithm. Think about the PageRank algorithm that you saw in Chapter 4. The PageRank algorithm changes the rank of a web page in each iteration. The decision to stop may depend on the number of iterations that you have performed so far, or on how much the web page ranking has changed since the last iteration. In any case, you need a way to instruct Giraph to stop the computation.

Until now, you have learned that for a Giraph computation to stop, two conditions must hold: every vertex has called the voteToHalt() method, and no messages were sent in the current superstep. These conditions depend entirely on your vertex computation. In fact, in Chapter 4 you saw many examples of algorithms and how they implement this halting condition. But you may need to make a decision about halting the computation based on some aggregate information about your graph and your algorithm execution, which does not depend on individual vertices. For example, you may want such a decision to depend on the value of an aggregator. In the case of PageRank, you may want to stop if the aggregate difference of the rankings has not changed much since the last iteration; in other words, PageRank has converged. This type of decision cannot be made by each vertex individually, but it must be made by some centralized entity. As you have already guessed, the MasterCompute is the place to do so.

Giraph gives you an extra way to terminate the computation at a centralized point of control through the MasterCompute API. Recall that the compute() method of the MasterCompute class is called after the end of a superstep and before the beginning of the next. From within the compute() method of the MasterCompute, you can call the haltComputation() method, which terminates the execution. Note that once this method is called, the execution terminates even if vertices are still sending messages or not all vertices have called the voteToHalt() method. In other words, a call to haltComputation() from the MasterCompute preempts the usual termination condition.

Let’s look at an example of how you could use it. A typical way to decide the termination of an algorithm is based on the number of supersteps executed so far. The PageRank algorithm that you saw in Chapter 4 is such an example. There you put the logic for halting inside the vertex computation itself; here you see how you can do this from inside MasterCompute. One benefit you get from this is that you keep your main computation logic cleaner. Listing 8-11 shows how to do this from inside the MasterCompute implementation.

#1 Terminate computation based on the superstep number.
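A hedged sketch of such a MasterCompute implementation follows (the class name is an assumption; DefaultMasterCompute is Giraph’s convenience base class that stubs out the serialization methods):

```java
import org.apache.giraph.master.DefaultMasterCompute;

public class SuperstepLimitMasterCompute extends DefaultMasterCompute {

  @Override
  public void compute() {
    // #1 Terminate computation based on the superstep number.
    // getSuperstep() returns the superstep about to execute,
    // so superstep 10 itself never runs.
    if (getSuperstep() == 10) {
      haltComputation();
    }
  }
}
```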

In general, you can implement arbitrary logic inside the compute() method of the MasterCompute. This logic executes in between supersteps. In this example, you simply check whether you are in superstep 10 and take appropriate action by halting the computation. Otherwise, the compute() method does not need to do anything else.

Recall that when you call getSuperstep() from within MasterCompute, it returns the number of the superstep that it is about to execute. In the previous example, the compute() method calls haltComputation() right after the execution of superstep 9 and before the execution of superstep 10. Therefore, superstep 10 will never execute. This is a small detail that you need to be aware of to avoid programming your termination condition in the wrong way.

Using Aggregators for Coordination

You have already seen how useful aggregators are for computing global statistics. Apart from global statistics, there are other useful things you can use aggregators for, and coordination is one of them. Let’s discuss a concrete example where coordination is necessary. Think about the recommendation algorithms you saw in Chapter 4, used to predict recommendations between users and items. These algorithms iteratively refine a machine learning model that tries to minimize the prediction error. The prediction error is calculated as the aggregate error across all vertices and is computed with the use of an aggregator. These algorithms typically stop execution when the aggregate prediction error has dropped below a specific threshold. One way you can use aggregators in conjunction with the MasterCompute is to coordinate the halting of the execution.

More specifically, you can think of an aggregator as a global variable that all vertices can write and read. You can use aggregators to collect information from the vertices to a centralized location, which is the MasterCompute, and from there pass this information back to all the vertices during the execution of the algorithm and have them make decisions according to this information. In this example, you are passing the value of the aggregate error to the vertices, and the vertices take actions based on this value. In other words, aggregators are the way for the master to communicate with the vertices. In particular, the values of the aggregators are broadcast to the workers before the vertex compute() method is called, and collected by the master before the master compute() method is called. Listing 8-12 shows this example.

#1 Termination is based on the aggregator prediction error falling below a threshold.

#2 When this condition holds, terminate the execution.
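The listing might look like the following sketch (the class name and the threshold value are assumptions; DoubleSumAggregator is one of Giraph’s built-in aggregators, registered here under the “error” name used in the text):

```java
import org.apache.giraph.aggregators.DoubleSumAggregator;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.hadoop.io.DoubleWritable;

public class ErrorThresholdMasterCompute extends DefaultMasterCompute {
  // Hypothetical convergence threshold.
  private static final double THRESHOLD = 0.01;

  @Override
  public void initialize() throws InstantiationException,
      IllegalAccessException {
    // Vertices sum their prediction errors into the "error" aggregator.
    registerAggregator("error", DoubleSumAggregator.class);
  }

  @Override
  public void compute() {
    // #1 Termination is based on the aggregate prediction error
    // falling below a threshold.
    DoubleWritable error = getAggregatedValue("error");
    // #2 When this condition holds, terminate the execution.
    if (getSuperstep() > 0 && error.get() < THRESHOLD) {
      haltComputation();
    }
  }
}
```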

In this example, you assume that vertices use the “error” aggregator to sum the total prediction error. After each superstep, the aggregate error is made available to the MasterCompute, and you can read it from within the compute() method to decide the termination of the execution.

As mentioned, apart from reading the value of an aggregator, inside the compute() method of the MasterCompute you can also set the value of an aggregator and “broadcast” its value to every vertex in the graph. The value of the aggregator is available to every vertex during the next superstep execution. For instance, imagine that in the previous example, after each superstep, you want to compute the average value of the error across all vertices and then communicate it back to the vertices. Each vertex then uses the average error as feedback to improve its prediction.

One way to do this is to first compute the sum of the prediction error across all vertices. You have already seen how to do this using the “error” aggregator. After this, from within the MasterCompute, you can easily compute the average by dividing the aggregate error by the number of vertices in the graph, and then broadcast this to all vertices. Listing 8-13 illustrates this process.

#1 Get the aggregate error across all vertices.

#2 Compute the average error by dividing by the number of vertices.

#3 Broadcast the average error to all vertices through a new aggregator.
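A hedged sketch of this implementation follows (the class name is an assumption; DoubleOverwriteAggregator is a built-in Giraph aggregator whose value is simply replaced on each set, which suits a broadcast-only “avg.error” aggregator):

```java
import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
import org.apache.giraph.aggregators.DoubleSumAggregator;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.hadoop.io.DoubleWritable;

public class AverageErrorMasterCompute extends DefaultMasterCompute {

  @Override
  public void initialize() throws InstantiationException,
      IllegalAccessException {
    registerAggregator("error", DoubleSumAggregator.class);
    registerAggregator("avg.error", DoubleOverwriteAggregator.class);
  }

  @Override
  public void compute() {
    // #1 Get the aggregate error across all vertices.
    DoubleWritable error = getAggregatedValue("error");
    // #2 Compute the average error by dividing by the number of vertices.
    double avg = error.get() / getTotalNumVertices();
    // #3 Broadcast the average error to all vertices through a new aggregator.
    setAggregatedValue("avg.error", new DoubleWritable(avg));
  }
}
```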

In this MasterCompute implementation, you first read the value of the “error” aggregator that holds the sum of the prediction errors across all vertices (#1). After this, using the getTotalNumVertices() method, you can easily compute the average error (#2). Finally, using the setAggregatedValue() method of the MasterCompute, you set the value of the “avg.error” aggregator to the average value you just computed (#3). The value of this aggregator is available to all vertices in the next superstep.

In general, the MasterCompute is a very useful tool for coordination among vertices. In this case, it enabled functionality that was not possible using only the distributed vertex-centric programming interface. In particular, it allowed you to collect information from the vertices by aggregating it and making it available to the whole graph. In the following section, you find out about another distinct way to use the MasterCompute—that is, how to compose complex algorithms from simpler blocks.

Writing Modular Applications

So far you have explored those facilities of the Giraph API that allow you to add more functionality to your applications or write more efficient algorithms. In this section, you look at a feature of the Giraph API that helps you write better applications from a programming perspective. Specifically, you will look at how Giraph allows you to compose potentially complex algorithms from simpler logical blocks, improving code readability and code reuse. Through examples, you learn how to recognize algorithms that can potentially be decomposed into simpler ones and how to use the Giraph API to simplify their development.

Structuring an Algorithm into Phases

Let’s start by discussing the type of algorithms that fall under this category and could benefit from composability. These are programming patterns that appear in many algorithms and are easy to recognize. Note, though, that this is by no means an exhaustive list. They are only meant as common cases that help you understand the composable API so that you can be more creative with it as new scenarios appear.

One common programming pattern that you may have observed in the Giraph algorithms you have seen so far is that the logic inside your algorithm may depend on the superstep that is executing. This is the basic pattern you use to break complex Giraph algorithms down into simpler ones. Let’s revisit one of the algorithms you saw in Chapter 3 that converts a graph from a directed one to an undirected one. In other words, this algorithm ensures that if an edge from vertex A to vertex B exists in the original graph, then the output graph contains an edge from B to A as well. Several algorithms operate on undirected graphs, so this is a common preprocessing step. In Chapter 3, you saw the pseudocode for this algorithm; here you are shown the actual code. (See Listing 8-14.)

#1 Send this vertex’s ID to all its neighbors.

#2 I got a message with an ID from a vertex that points to me.

#3 If there is no edge to that destination ID, then add it.
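A hedged reconstruction of this listing follows (the class name is an assumption; the vertex ID is an IntWritable and the vertex and edge values are NullWritable, matching the setup described later in the chapter):

```java
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class ConvertToUndirected extends BasicComputation<
    IntWritable, NullWritable, NullWritable, IntWritable> {

  @Override
  public void compute(
      Vertex<IntWritable, NullWritable, NullWritable> vertex,
      Iterable<IntWritable> messages) {
    if (getSuperstep() == 0) {
      // #1 Send this vertex's ID to all its neighbors.
      sendMessageToAllEdges(vertex, vertex.getId());
    } else {
      for (IntWritable id : messages) {
        // #2 I got a message with an ID from a vertex that points to me.
        if (vertex.getEdgeValue(id) == null) {
          // #3 If there is no edge to that destination ID, then add it.
          vertex.addEdge(EdgeFactory.create(new IntWritable(id.get())));
        }
      }
    }
    vertex.voteToHalt();
  }
}
```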

In this algorithm, during superstep 0, every vertex sends a message that contains its own vertex ID to all the vertices that it has edges to. This way, during superstep 1, a vertex A receives a message from vertex B if B has an edge to A. If vertex A does not have an edge to B, it can then add it. Figure 8-5 illustrates these two steps.


Figure 8-5. The steps to convert a directed graph to an undirected one

This is a very representative example of algorithms that benefit from the ability to compose an algorithm from simpler blocks, for the following reasons. First, you can immediately observe that it fits the pattern described earlier; the computation logic depends on the superstep that is executing. In particular, there are two clearly separated phases: one that is executed during superstep 0, and one that is executed during superstep 1.

Second, converting a graph to an undirected one is usually a preprocessing step that is secondary to the main logic of your application. This means that you would have to include this conversion logic inside your computation function along with your main logic. For instance, assume that you want to run the Connected Components algorithm that you saw in Chapter 4 on an undirected graph. In this case, before you execute the main logic of the algorithm, you have to ensure that the graph is converted to an undirected one by applying the conversion logic.

To illustrate this, let’s first look at how you would implement the original connected components algorithm. Chapter 4 described this algorithm with pseudocode; Listing 8-15 shows the actual code.

#1 In superstep 0, every vertex sends its ID to all its neighbors.

#2 Each vertex maintains the maximum ID seen so far, which represents the ID of the component the vertex belongs to.

#3 In each subsequent superstep, a vertex receives the IDs of all its neighbors.

#4 A vertex finds the maximum ID among its own and the IDs of its neighbors.

#5 If the max ID is greater than the one stored currently in the vertex value, update the value and propagate the new max ID to all the neighbors.
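A hedged sketch of the connected components listing follows, matching the callouts above (the class name is an assumption; the vertex value holds the component ID):

```java
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class ConnectedComponents extends BasicComputation<
    IntWritable, IntWritable, NullWritable, IntWritable> {

  @Override
  public void compute(
      Vertex<IntWritable, IntWritable, NullWritable> vertex,
      Iterable<IntWritable> messages) {
    if (getSuperstep() == 0) {
      // #1 In superstep 0, every vertex sends its ID to all its neighbors.
      vertex.setValue(new IntWritable(vertex.getId().get()));
      sendMessageToAllEdges(vertex, vertex.getValue());
    } else {
      // #2 The vertex value holds the maximum ID seen so far,
      // which represents the ID of the component the vertex belongs to.
      int maxId = vertex.getValue().get();
      // #3 In each subsequent superstep, a vertex receives
      // the IDs of all its neighbors.
      for (IntWritable message : messages) {
        // #4 Find the maximum ID among its own and its neighbors' IDs.
        maxId = Math.max(maxId, message.get());
      }
      // #5 If the max ID is greater than the current vertex value,
      // update the value and propagate the new max ID to all neighbors.
      if (maxId > vertex.getValue().get()) {
        vertex.setValue(new IntWritable(maxId));
        sendMessageToAllEdges(vertex, vertex.getValue());
      }
    }
    vertex.voteToHalt();
  }
}
```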

Now that you have the logic for the connected components, let’s look at how to modify it to perform the same algorithm, but after converting a directed graph to an undirected one (see Listing 8-16).

#1 Graph conversion logic is applied in the first two supersteps.

#2 Main algorithm logic is applied in subsequent supersteps.
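The combined, monolithic computation might look like the following sketch (class name assumed; the conversion logic occupies the first two supersteps, and the component logic is shifted by two supersteps):

```java
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class ConvertAndFindComponents extends BasicComputation<
    IntWritable, IntWritable, NullWritable, IntWritable> {

  @Override
  public void compute(
      Vertex<IntWritable, IntWritable, NullWritable> vertex,
      Iterable<IntWritable> messages) {
    long superstep = getSuperstep();
    if (superstep == 0) {
      // #1 Graph conversion logic, phase 1: announce this vertex's ID.
      sendMessageToAllEdges(vertex, vertex.getId());
    } else if (superstep == 1) {
      // #1 Graph conversion logic, phase 2: add missing reverse edges.
      for (IntWritable id : messages) {
        if (vertex.getEdgeValue(id) == null) {
          vertex.addEdge(EdgeFactory.create(new IntWritable(id.get())));
        }
      }
      // Kick off the main algorithm on the now-undirected graph.
      vertex.setValue(new IntWritable(vertex.getId().get()));
      sendMessageToAllEdges(vertex, vertex.getValue());
    } else {
      // #2 Main algorithm logic in subsequent supersteps:
      // connected components, shifted by two supersteps.
      int maxId = vertex.getValue().get();
      for (IntWritable message : messages) {
        maxId = Math.max(maxId, message.get());
      }
      if (maxId > vertex.getValue().get()) {
        vertex.setValue(new IntWritable(maxId));
        sendMessageToAllEdges(vertex, vertex.getValue());
      }
    }
    vertex.voteToHalt();
  }
}
```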

You can clearly see that this algorithm mixes the two logics: the graph conversion and the connected components algorithm. Again, it uses the superstep number to distinguish between phases. It uses the first two supersteps to convert the graph, and the supersteps afterward to apply the logic of the connected components.

Even though this is a perfectly correct way to structure such an algorithm, you may find that it can lead to unnecessarily complicated code, as it pollutes your computation with code that is not directly related to the core algorithm logic. If your algorithm becomes more complex, you end up placing many different logical blocks inside the same computation. For instance, just as you have a graph preprocessing step here, in some scenarios you may also have a post-processing phase that has to be executed in the supersteps following the main application logic, adding to the complexity of your application. To make matters worse, when you write unit tests for your algorithm, you now have to account for test cases that exercise the conversion logic as well, even though they are not directly related to your main logic.

Third, it is obvious that this piece of logic, converting a graph to an undirected one, may repeat itself across different algorithms that must operate on undirected graphs. In fact, you could just copy the logic executed during supersteps 0 and 1 to different algorithms. But this is not good programming practice, for several reasons. For instance, it forces you to modify your main algorithm so that it applies the main logic only for superstep 2 and after. Apart from this, it makes code reuse and code maintenance harder. For instance, if you happen to find a bug in the conversion logic, you have to fix it in all the algorithms that use this logic. Instead, once you start building a library of algorithms, it would be nice if you could just reuse such common blocks of logic without having to code them every time. Next, you look at how to actually get past all of these problems.

The Composable API

Hopefully, by now you are convinced that writing algorithms can get complicated from a programming perspective. Let’s now look at how Giraph can make this task easier for you. At a high level, Giraph gives you the ability to specify different computation classes for different supersteps and decide which computation executes in what superstep in an easy way. This way, you get to separate the logic into different computation classes, but at the same time you can combine them into the execution of a single algorithm. Let’s look at an example of how you can do this, followed by an explanation of all the benefits that it gives you.

Specifically, let’s use the example of the graph conversion into undirected. As a first step, break down the logic for the graph conversion into two distinct computations. The first one is responsible for sending the ID of the vertex to all of its neighbors. This is the operation that you used to perform in superstep 0 in your original, monolithic application. The second one is responsible for receiving IDs as messages and adding the corresponding edges if they do not exist. This is the operation that you used to perform in superstep 1 in the original algorithm. You can see these two computations in Listing 8-17.

#1 Send vertex ID to all neighbors.
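The first computation might be sketched as follows (the PropagateId class name comes from the text later in this section):

```java
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class PropagateId extends BasicComputation<
    IntWritable, NullWritable, NullWritable, IntWritable> {

  @Override
  public void compute(
      Vertex<IntWritable, NullWritable, NullWritable> vertex,
      Iterable<IntWritable> messages) {
    // #1 Send vertex ID to all neighbors, then halt.
    sendMessageToAllEdges(vertex, vertex.getId());
    vertex.voteToHalt();
  }
}
```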

In this example, you assume that your input graph has a vertex ID of type integer and no associated vertex or edge values. Therefore, you declare those to be of type NullWritable. In this first computation, each vertex simply sends its ID to all its neighbors using the sendMessageToAllEdges() API and then halts. The second computation is shown in Listing 8-18.
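The second computation might be sketched as follows (the ReverseEdges class name comes from the text later in this section; each vertex adds a reverse edge for every incoming ID without a matching edge):

```java
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class ReverseEdges extends BasicComputation<
    IntWritable, NullWritable, NullWritable, IntWritable> {

  @Override
  public void compute(
      Vertex<IntWritable, NullWritable, NullWritable> vertex,
      Iterable<IntWritable> messages) {
    // Each message carries the ID of a vertex that points to this one;
    // if no edge back to that ID exists, add one.
    for (IntWritable id : messages) {
      if (vertex.getEdgeValue(id) == null) {
        vertex.addEdge(EdgeFactory.create(new IntWritable(id.get())));
      }
    }
    vertex.voteToHalt();
  }
}
```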

At this point, you have two different computations that do not seem related whatsoever. That is because so far you have been thinking about algorithms in the context of a single computation class. In a moment, you are going to change this, and discover how you can glue them together in a very easy manner.

Putting together different computation classes consists mainly of deciding which computation class is used at which superstep; in other words, coordinating the use of the computation classes. The word coordination must have already hinted to you that you are going to do this through the MasterCompute. Indeed, one of the operations you can do in a MasterCompute implementation is setting the computation to be used at each superstep. You apply the logic of choosing the right function inside the compute() method of the MasterCompute. Recall that Giraph calls the compute method of your MasterCompute implementation right before the execution of a superstep. Inside this method, you have the chance to set the computation used in the next superstep. Listing 8-19 shows an example of how to do this.

#1 Get the number of the superstep to be executed next.

#2 If we are about to execute superstep 0, we set the computation to PropagateId.

#3 If we are about to execute superstep 1, we set the computation to ReverseEdges.
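A hedged sketch of this MasterCompute follows (the class name is an assumption; PropagateId and ReverseEdges are the two computation classes introduced above):

```java
import org.apache.giraph.master.DefaultMasterCompute;

public class ConversionMasterCompute extends DefaultMasterCompute {

  @Override
  public void compute() {
    // #1 Get the number of the superstep to be executed next.
    long superstep = getSuperstep();
    if (superstep == 0) {
      // #2 About to execute superstep 0: set the computation to PropagateId.
      setComputation(PropagateId.class);
    } else if (superstep == 1) {
      // #3 About to execute superstep 1: set the computation to ReverseEdges.
      setComputation(ReverseEdges.class);
    }
  }
}
```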

Let’s take a step-by-step look at the operations in this method. You first get the number of the superstep that Giraph is about to execute. Based on this, you choose the right computation. In particular, if Giraph is about to execute superstep 0, you want to perform the first phase of the conversion, which is having each vertex send its IDs to its neighbors. This is the logic that the PropagateId computation class implements. You then use the setComputation() API of the MasterCompute to indicate this to Giraph. Notice that you pass as an argument to this method an object of type Class that represents the specific computation class you want to execute. After the first superstep is executed, Giraph calls the compute() method of MasterCompute again. This time, you want to perform the second phase of the conversion, where vertices receive messages from phase one, and add the corresponding edges. This is computation class ReverseEdges. Therefore, if you are about to execute superstep 1, you want to set the computation class to ReverseEdges.class, using again the setComputation() method.

That is it. With the MasterCompute, you have easily coordinated the execution of the algorithm through a simple call to the setComputation() method. As you have already seen in Chapter 5, all you have to do to run this application is to specify the MasterCompute implementation when you run the Giraph job through the -mc command-line option. Giraph takes care of the rest, ensuring that the right computation is used for each superstep.

But let’s look back to see what you have achieved with this separation of the functionality in different blocks. First, it is obvious that you did not have to encode the choice of the superstep in your application logic, leaving it clean and intact. From a code readability point of view, the two separated computations encode only what they intend to do and are much simpler and easier to understand.

Apart from this, it was previously said that it would be nice to be able to easily reuse code across applications. In particular, let’s now revisit the scenario where you want to execute the connected components algorithm on an undirected graph. You already have code that converts a graph to an undirected one, and you already have code that implements the Connected Components algorithm. All you are going to do is use the composable API to glue them together in the MasterCompute, with no modification to the main logic. The implementation of the MasterCompute class is shown in Listing 8-20.
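A hedged sketch of such a MasterCompute follows (the class names ConnectedComponents and ComponentsMasterCompute are assumptions; note that if the component logic keys its initialization off superstep 0, that initialization would need to be adjusted or split into its own computation class, since here the main algorithm starts at superstep 2):

```java
import org.apache.giraph.master.DefaultMasterCompute;

public class ComponentsMasterCompute extends DefaultMasterCompute {

  @Override
  public void compute() {
    long superstep = getSuperstep();
    if (superstep == 0) {
      setComputation(PropagateId.class);         // conversion, phase 1
    } else if (superstep == 1) {
      setComputation(ReverseEdges.class);        // conversion, phase 2
    } else if (superstep == 2) {
      setComputation(ConnectedComponents.class); // main algorithm
    }
    // No need to set the computation in later supersteps:
    // Giraph keeps using the last computation class that was set.
  }
}
```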

As you may have observed, what you have done is moved the coordination and the selection of the logic to execute at each superstep from the main computation to a part of your code that is specifically intended for coordination—that is, the MasterCompute. Again, as a result, you did not have to modify the logic of the Connected Components algorithm at all. You just took three individual pieces of logic and put them together to form a new application. You only had to instruct Giraph to use the graph conversion logic for the first two supersteps, and then your main computation for all the subsequent supersteps. In a similar way, instead of the connected components algorithm, you could use these first two computations in combination with any other algorithm, simplifying code reuse and making it easy to build new applications.

Note  While you can set the computation class at every superstep from inside the MasterCompute, if you do not set it explicitly, Giraph reuses the computation class set during the last superstep.

Summary

Even though the basic Giraph API is flexible enough to allow you to express a wide range of algorithms, Giraph provides tools that give you more capabilities. These make it possible to express new programming patterns, such as centralized coordination, making your algorithms more efficient, and possibly making your life easier from a programming perspective.

  • The graph mutation API allows an algorithm to modify the graph during execution. This is often a natural requirement of applications that must change the graph, but it may also come in handy for algorithms that require the temporary creation of logical vertices.
  • Giraph provides three ways to perform mutations: (i) direct mutations on a Vertex object, (ii) mutation requests, and (iii) mutations through messages.
  • When using mutation requests, conflicts may occur. Use a VertexResolver to determine how you want the conflicts to be resolved.
  • Aggregators allow you to compute global statistics, and Giraph already provides a rich set of common operations. At the same time, you can use the simple aggregator API to implement custom ones.
  • Aggregators and the MasterCompute allow the expression of algorithms that require centralized coordination.
  • Use the MasterCompute compute() method to implement your centralized coordination logic.
  • Algorithms can quickly get complicated, but they are usually structured in phases that depend on the superstep executed. Using the composable API can simplify programming, enable code reuse, and result in code that is easier to maintain.

At this point, you have already started using the more advanced features of Giraph. This should allow you to write more sophisticated applications. In the following chapter, you continue to look at these advanced features of Giraph, including how to take control of the parallelization in Giraph to write more efficient applications.
