Aggregating messages

First, we discuss the aggregateMessages method that GraphX graphs come with. The basic idea is to pass messages along edges in parallel across the whole graph, aggregate these messages suitably and store the result for further processing. Let's have a closer look at how aggregateMessages is defined:

def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,
    mergeMsg: (Msg, Msg) => Msg,
    tripletFields: TripletFields = TripletFields.All
): VertexRDD[Msg]

As you can see, to implement an aggregateMessages algorithm we need to specify a message type Msg and provide two functions, sendMsg and mergeMsg, plus an optional tripletFields argument, which we will explain next. You may notice that there are two types that we haven't encountered before, namely EdgeContext and TripletFields. Simply put, an edge context is an extension of the EdgeTriplet type we have already seen, that is, an edge plus all information about adjacent vertices; the only difference is that an edge context can additionally send messages to the source and target vertex through the following two methods:

def sendToSrc(msg: A): Unit
def sendToDst(msg: A): Unit
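
For instance, a sendMsg function can signal both endpoints of every edge. The following sketch (using the friendGraph we revisit shortly) counts how often each vertex appears as either endpoint, that is, its total degree:

val degVertexRdd: VertexRDD[Int] = friendGraph.aggregateMessages[Int](
  sendMsg = ec => { ec.sendToSrc(1); ec.sendToDst(1) },
  mergeMsg = (msg1, msg2) => msg1 + msg2
)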

The tripletFields argument lets us declare which fields of the EdgeContext the computation actually reads, so that GraphX can avoid shipping unused vertex attributes; it defaults to all available fields. In what follows, we will simply use this default for tripletFields and focus on sendMsg and mergeMsg only. As indicated in the introduction to this topic, sendMsg is used to pass messages along edges, mergeMsg aggregates them, and the result of this operation is stored in a vertex RDD of type Msg. To make this more concrete, consider the following example, an alternative way to compute the in-degree of every vertex of our little friend graph from earlier:

val inDegVertexRdd: VertexRDD[Int] = friendGraph.aggregateMessages[Int](
  sendMsg = ec => ec.sendToDst(1),
  mergeMsg = (msg1, msg2) => msg1 + msg2
)
assert(inDegVertexRdd.collect.deep == friendGraph.inDegrees.collect.deep)

In this example, sending a message is defined by taking an edge context and using its sendToDst method to send an integer message, namely the number one, to the target vertex. What this means is that, for each edge in parallel, we send a one to the vertex this edge points to. This way, vertices receive messages that we need to merge. The mergeMsg function should be understood the same way as reduce for RDDs in general: we specify how two messages are merged, and this recipe is used to collapse all messages into one. In the example at hand, we just sum up all messages, which by definition yields the in-degree of each vertex. We confirm this by asserting equality of the arrays we get from collecting both inDegVertexRdd and friendGraph.inDegrees on the master.
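
Messages need not be constants; they can be computed from the edge context's attributes. As a slightly richer illustration, the following sketch builds a hypothetical toy graph whose vertices carry an age (assuming a SparkContext sc is in scope) and finds the age of the oldest follower of each vertex by sending the source's attribute and merging with the maximum:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Hypothetical toy graph: each vertex carries an age, each edge points
// from follower to followee.
val ages: RDD[(VertexId, Int)] =
  sc.parallelize(Seq((1L, 23), (2L, 31), (3L, 52)))
val follows: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(3L, 2L, 0), Edge(1L, 3L, 0)))
val followerGraph: Graph[Int, Int] = Graph(ages, follows)

// Send each follower's age to the followee and keep only the maximum.
val oldestFollower: VertexRDD[Int] = followerGraph.aggregateMessages[Int](
  sendMsg = ec => ec.sendToDst(ec.srcAttr),
  mergeMsg = (a, b) => math.max(a, b)
)

Here mergeMsg plays the role of max instead of sum, but the mechanics are identical to the in-degree example.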

Note that the return value of aggregateMessages is a vertex RDD, not a graph. So, to use this mechanism iteratively, we need to generate a new graph object in each iteration (one way to do so is sketched below), which is not ideal. Since Spark is especially strong with iterative algorithms, due to keeping partition data in memory, and since many interesting graph algorithms are in fact iterative, we next discuss the slightly more complicated, but extremely powerful, Pregel API.
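
To make the last point concrete, here is a minimal sketch of such an iteration step: joining the aggregated vertex RDD back onto the graph with outerJoinVertices produces a brand-new Graph object, in this case one whose vertex attribute is the in-degree we just computed (vertices that received no messages default to 0):

// Fold the aggregated messages back into a graph for the next iteration.
val nextGraph: Graph[Int, Int] =
  friendGraph.outerJoinVertices(inDegVertexRdd) {
    (vertexId, oldAttr, inDegOpt) => inDegOpt.getOrElse(0)
  }

The Pregel API, discussed next, manages this kind of repeated regeneration and the associated caching for us.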
