Pregel

Pregel is a system developed internally by Google; the accompanying paper is very accessible and available for download at http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/sigmod10/p135-malewicz.pdf. It describes an efficient, iterative graph-parallel compute model that allows one to implement a large class of graph algorithms. GraphX's implementation of Pregel differs slightly from the one described in the paper, but we do not go into the details of these differences here.

In flavor, GraphX's Pregel implementation is very close to aggregateMessages, with a few key differences. Both approaches share the send and merge message mechanics. On top of that, with Pregel we can define a so-called vertex program, vprog, which is executed before sending and transforms the vertex data. Also, we start with a shared initial message on each vertex and can specify how many iterations of the vprog-send-merge cycle we want to execute; that is, iterations are part of the specification.

The apply method of the Pregel implementation is sketched below. Note that it takes two sets of inputs, the first being a quadruple consisting of the graph itself, an initial message, the maximum number of iterations to be executed, and a field called activeDirection. The last argument deserves some more attention. A detail of the Pregel specification we have not talked about yet is that we only send new messages from vertices that have received messages in the previous iteration. This behavior naturally lets algorithms converge in many cases, and it also explains why the third argument is called maxIterations: we might stop earlier than specified. The active direction determines on which side of an edge such an active vertex has to sit for sendMsg to run on that edge; it defaults to Either, but can also be Both, In, or Out:

object Pregel {
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
      (graph: Graph[VD, ED],
       initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED]
}

The second set of arguments to Pregel is the triple we already sketched, namely the vertex program and the functions for sending and merging messages. The only noteworthy difference from before is the signature of sendMsg, which returns an iterator over pairs of vertex ID and message. This does not change much for us. Interestingly, mapReduceTriplets, the predecessor of aggregateMessages, used exactly this kind of iterator-based signature; it was still available, though deprecated, in Spark 1.6 and was removed from the public API in Spark 2.0, leaving only the aggregateMessages signature we discussed previously. Very likely, the signature of Pregel will be changed accordingly as well, but as of 2.1.1 it remains as described.
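To make the vprog-send-merge cycle and the role of active vertices more tangible, the following is a minimal single-machine sketch of such a loop in plain Scala. It is not GraphX's implementation: the names MiniPregel, SimpleEdge, and MiniPregelDemo are hypothetical, sendMsg receives the edge together with both endpoint attributes instead of an EdgeTriplet, and the active-vertex filter only mimics the behavior of EdgeDirection.Either.

```scala
// Simplified, single-machine sketch of a Pregel-style loop. NOT GraphX code.
// MiniPregel, SimpleEdge and MiniPregelDemo are hypothetical names.
object MiniPregel {
  type VertexId = Long
  final case class SimpleEdge(src: VertexId, dst: VertexId)

  def run[VD, A](
      vertices: Map[VertexId, VD],
      edges: Seq[SimpleEdge],
      initialMsg: A,
      maxIterations: Int)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: (SimpleEdge, VD, VD) => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Map[VertexId, VD] = {

    // Every vertex first receives the shared initial message.
    var state: Map[VertexId, VD] =
      vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var active: Set[VertexId] = state.keySet
    var iteration = 0

    while (active.nonEmpty && iteration < maxIterations) {
      // Send: only edges touching an active vertex take part
      // (an assumption that mimics EdgeDirection.Either).
      val sent = edges
        .filter(e => active(e.src) || active(e.dst))
        .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))

      // Merge: combine all messages addressed to the same vertex.
      val inbox: Map[VertexId, A] = sent.groupBy(_._1).map {
        case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg)
      }

      // Vertex program: only vertices that received a message are updated,
      // and only those stay active for the next round.
      state = state ++ inbox.map { case (id, msg) => id -> vprog(id, state(id), msg) }
      active = inbox.keySet
      iteration += 1
    }
    state
  }
}

object MiniPregelDemo {
  import MiniPregel._

  // Toy chain 1 -> 2 -> 3; each vertex should learn the largest upstream value.
  val values: Map[VertexId, Int] = Map(1L -> 5, 2L -> 1, 3L -> 3)
  val chain: Seq[SimpleEdge] = Seq(SimpleEdge(1L, 2L), SimpleEdge(2L, 3L))

  val result: Map[VertexId, Int] =
    MiniPregel.run(values, chain, Int.MinValue, 10)(
      (_, attr, msg) => math.max(attr, msg),
      (e, srcAttr, dstAttr) =>
        if (srcAttr > dstAttr) Iterator((e.dst, srcAttr)) else Iterator.empty,
      (a, b) => math.max(a, b))
}
```

In this toy run the loop stops on its own after three rounds, well before the limit of ten, because the last round produces no messages. This is the early termination that motivates the name maxIterations.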

To illustrate the possibilities of the Pregel API, let's sketch an implementation of an algorithm that computes connected components. This is a slight modification of the implementation currently available in GraphX. We define the ConnectedComponents object with a single method, namely run, which takes any graph and a maximum number of iterations. The core idea of the algorithm is easy enough to explain. For each edge, whenever its source carries a smaller ID than its target, send the source's ID to the target, and vice versa. To aggregate these messages, simply take the minimum of all values received and iterate this procedure long enough so that it runs out of updates. At this point, all vertices within one connected component carry the same ID as vertex data, namely the smallest vertex ID of that component:

import org.apache.spark.graphx._
import scala.reflect.ClassTag

object ConnectedComponents extends Serializable {

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                      maxIterations: Int)
    : Graph[VertexId, ED] = {

    // Discard the original vertex data and label every vertex with its own ID.
    val idGraph: Graph[VertexId, ED] = graph.mapVertices((id, _) => id)

    // Keep the smaller of the current label and the incoming message.
    def vprog(id: VertexId, attr: VertexId, msg: VertexId): VertexId = {
      math.min(attr, msg)
    }

    // Send the smaller label to the endpoint that currently holds the larger one.
    def sendMsg(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }

    // Of several incoming labels, only the smallest one matters.
    def mergeMsg(v1: VertexId, v2: VertexId): VertexId = math.min(v1, v2)

    Pregel(
      graph = idGraph,
      initialMsg = Long.MaxValue,
      maxIterations = maxIterations,
      activeDirection = EdgeDirection.Either)(
      vprog,
      sendMsg,
      mergeMsg)
  }
}

Going step by step, the algorithm works as follows. We first forget all previously available vertex data by defining idGraph. Next, we define the vertex program to emit the minimum of the current vertex data attribute and the current message. This way we can store the minimum vertex ID as vertex data. The sendMsg method propagates the smaller ID for each edge to either source or target, as described before, and mergeMsg again just takes the minimum over IDs. Having these three key methods defined, we can simply run Pregel on the idGraph with maxIterations as specified. Note that we do not care about which direction the messages flow, so we use EdgeDirection.Either. Also, we start with the maximum available Long value as our initial message, which works since we take the minimum over vertex IDs everywhere.
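The propagation of minimum IDs can be traced by hand on a toy example. The following sketch does not use GraphX at all; it simulates the rounds of min-ID propagation in plain Scala on a hypothetical path graph 1 - 2 - 3 - 4 - 5, where IterationDemo, step, and componentsAfter are names made up for this illustration. It only mirrors the idea of the algorithm above, under the assumption that every round updates all vertices synchronously.

```scala
// Plain-Scala illustration of min-ID propagation; not part of GraphX.
// IterationDemo, step and componentsAfter are hypothetical names.
object IterationDemo {
  // Hypothetical path graph 1 - 2 - 3 - 4 - 5, directions ignored.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 4L), (4L, 5L))

  // One synchronous round: both endpoints of every edge adopt the smaller
  // of the two labels they held at the START of the round.
  def step(labels: Map[Long, Long]): Map[Long, Long] =
    edges.foldLeft(labels) { case (acc, (src, dst)) =>
      val smaller = math.min(labels(src), labels(dst))
      acc
        .updated(src, math.min(acc(src), smaller))
        .updated(dst, math.min(acc(dst), smaller))
    }

  // Number of distinct labels left after the given number of rounds.
  def componentsAfter(iterations: Int): Int = {
    val initial: Map[Long, Long] = (1L to 5L).map(id => id -> id).toMap
    Iterator.iterate(initial)(step).drop(iterations).next().values.toSet.size
  }
}
```

On this path the smallest ID travels exactly one hop per round, so componentsAfter reports 5, 4, 3, 2, and 1 distinct labels for 0 to 4 rounds. Stopping too early therefore overestimates the number of components, even though the graph is connected.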

With this in place, we can find connected components on the retweet graph rtGraph from earlier as follows, choosing five iterations as the maximum:

val ccGraph = ConnectedComponents.run(rtGraph, 5)
ccGraph.vertices.map(_._2).distinct.count

Counting the distinct vertex data items of the resulting graph gives us the number of connected components. In this case there is just one component; that is, all tweets in the data set are connected if we ignore directionality. It is interesting to note that we do in fact need five iterations for the algorithm to converge. Running it with fewer iterations, that is, 1, 2, 3, or 4, yields 1771, 172, 56, and 4 connected components, respectively. Since there has to be at least one connected component, we know that further increasing the number of iterations would not change the outcome. However, in general we would rather not specify the number of iterations, unless time or computing power is an issue. By wrapping the preceding run method as follows, we can run this algorithm on a graph alone, without explicitly providing iterations:

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED])
  : Graph[VertexId, ED] = {
  run(graph, Int.MaxValue)
}

Simply add this overloaded run method as an additional method to the ConnectedComponents object. Having seen both aggregateMessages and Pregel, the reader should now be adequately equipped to develop their own graph algorithms. For the retweet graph, we can now simply write the following instead:

val ccGraph = ConnectedComponents.run(rtGraph)
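As a sanity check for a hand-written implementation, one can compare its result against the connectedComponents method that GraphX provides on every graph through GraphOps. The sketch below does this for a hypothetical toy graph with two components instead of the retweet graph; the object name BuiltInCcCheck, the toy edges, and the use of a local Spark session are assumptions made for illustration only.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// BuiltInCcCheck and the toy graph are hypothetical, for illustration only.
object BuiltInCcCheck {
  def componentCount(): Long = {
    // Assumption: a local Spark session is acceptable for this small check.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cc-check")
      .getOrCreate()
    try {
      // Two components: {1, 2, 3} and {4, 5}. The edge attribute is unused.
      val edges = spark.sparkContext.parallelize(
        Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
      val graph = Graph.fromEdges(edges, defaultValue = 0)

      // Built-in routine: labels each vertex with the smallest ID in its component.
      val builtIn = graph.connectedComponents()
      builtIn.vertices.map(_._2).distinct().count()
    } finally {
      spark.stop()
    }
  }
}
```

Inside a Spark shell, where a context already exists, the same comparison for the retweet graph amounts to counting the distinct vertex values of rtGraph.connectedComponents() and of ccGraph and checking that both counts agree.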