Graph representation in GraphX

Recall that a property graph is, for us, a directed multigraph with loops that has custom data objects for both vertices and edges. The central entry point of GraphX is the Graph API, which, in simplified form, has the following signature:

abstract class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

So, internally, a graph in GraphX is represented by one RDD encoding the vertices and one encoding the edges. Here, VD is the vertex data type and ED is the edge data type of our property graph. We will discuss both VertexRDD and EdgeRDD in more detail, as they are essential for what follows.

In Spark GraphX, vertices have unique identifiers of the Long type, called VertexId (a type alias for Long). A VertexRDD[VD] is, in fact, just an extension of RDD[(VertexId, VD)], but optimized and equipped with an extensive list of utility functions that we will talk about at length. Thus, the vertices of a graph in GraphX are, simply put, an RDD of pairs of identifiers and vertex data, which goes hand in hand with the intuition developed earlier.

To explain the concept of EdgeRDD, let's first look at what an Edge is in GraphX. In a simplified form, Edge is defined by the following signature:

case class Edge[ED](
  var srcId: VertexId,
  var dstId: VertexId,
  var attr: ED
)

So, an edge is completely determined by a source vertex ID, given by srcId, a target or destination vertex ID, provided as dstId, and an attribute object, attr, of the ED data type. Similar to the preceding vertex RDDs, we can understand EdgeRDD[ED] as an extension of RDD[Edge[ED]]. Thus, edges in GraphX are given by an RDD of edges of the ED type, which again lines up with what we discussed so far.
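To make the Edge signature tangible, here is a minimal sketch that creates a single edge and reads back its three fields. It only assumes that the GraphX library is on the classpath (as in spark-shell); the value names likes, description, and reversed are made up for this illustration.

```scala
import org.apache.spark.graphx.{Edge, VertexId}

// An edge from vertex 1 to vertex 2 that carries a String attribute.
val likes: Edge[String] = Edge(1L, 2L, "likes")

// The three fields from the simplified signature above.
val source: VertexId = likes.srcId
val target: VertexId = likes.dstId
val label: String = likes.attr

// Edge is a case class, so pattern matching and copy are available.
val description: String = likes match {
  case Edge(src, dst, attr) => s"$src -[$attr]-> $dst"
}

// Swap source and destination while keeping the attribute.
val reversed: Edge[String] = likes.copy(srcId = likes.dstId, dstId = likes.srcId)
```

Since an edge only stores vertex IDs, not the vertex data itself, it can be created without any vertex being defined yet.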

We now know that as of Spark 2.1, graphs in GraphX are essentially pairs of vertex and edge RDDs. This is important information, as it allows us, in principle, to apply the full functionality and power of RDDs from Spark core to these graphs. As a word of warning, though, graphs come with a lot of functionality that is optimized for the purpose of graph processing. Whenever you find yourself using basic RDD functionality, see if you can find a specific graph equivalent, which will likely be more performant.
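As a small illustration of this advice, the following sketch computes the number of incoming edges per vertex twice: once with plain RDD operations on the edge RDD, and once with the built-in inDegrees operator. The three-vertex graph is hypothetical and exists only for this comparison; it is built with the Graph constructor that the next paragraphs walk through step by step. The local SparkContext setup is also an assumption for running the snippet outside spark-shell, where sc already exists.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: a local context for experimentation; getOrCreate reuses an existing one.
val sc: SparkContext = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("graphx-indegree-sketch"))

// A hypothetical three-vertex graph, used only for this illustration.
val smallVertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val smallEdges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(2L, 3L, 1)))
val smallGraph: Graph[String, Int] = Graph(smallVertices, smallEdges)

// Plain RDD approach: emit (destination, 1) per edge and sum per key.
val inDegreesViaRdd: Map[VertexId, Int] =
  smallGraph.edges.map(e => (e.dstId, 1)).reduceByKey(_ + _).collect().toMap

// Graph approach: the built-in inDegrees operator returns a VertexRDD[Int].
val inDegreesViaGraph: Map[VertexId, Int] =
  smallGraph.inDegrees.collect().toMap
```

Both variants yield the same result here. Vertices without incoming edges, such as vertex 1, do not appear in either map.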

To give a concrete example, let's construct a graph from scratch, using what we just learned. We assume that you have a Spark context available as sc. We will create a graph with people connected to each other, namely the one from Figure 3 of the previous section, that is, a labelled graph. In the GraphX language we just acquired, to create such a graph, we need both vertex and edge data types to be of the String type. We do this by using parallelize to create vertices as follows:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertices: RDD[(VertexId, String)] = sc.parallelize(
  Array((1L, "Anne"),
    (2L, "Bernie"),
    (3L, "Chris"),
    (4L, "Don"),
    (5L, "Edgar")))

In the same way, we can create edges; note the use of Edge in the following definition:

val edges: RDD[Edge[String]] = sc.parallelize(
  Array(Edge(1L, 2L, "likes"),
    Edge(2L, 3L, "trusts"),
    Edge(3L, 4L, "believes"),
    Edge(4L, 5L, "worships"),
    Edge(1L, 3L, "loves"),
    Edge(4L, 1L, "dislikes")))

Having these two RDDs ready is already sufficient to create a Graph, which is as simple as the following line:

val friendGraph: Graph[String, String] = Graph(vertices, edges)

Note that we explicitly write out the types for all variables, which is just for clarity. We could just leave them out and rely on the Scala compiler to infer them for us. Furthermore, as indicated by the preceding signature, we can access vertices with friendGraph.vertices and edges with friendGraph.edges. Just to give a first glimpse of what is possible, we can now collect all the vertices and print them as follows:

friendGraph.vertices.collect.foreach(println)

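This prints the five pairs of vertex ID and name, one per line, for example (1,Anne). The order of the lines depends on how the vertices are partitioned.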

Note that this does not use any GraphX-specific functionality, just what we already know from RDDs. As another example, let's count all the edges for which the source ID is larger than the target ID. This could be done as follows:

friendGraph.edges.map( e => e.srcId > e.dstId ).filter(_ == true).count

This gives back the expected answer, that is, 1, but has a drawback. Once we call .edges on the graph, we completely lose all the graph structure that we previously had. Assuming that we want to further process a graph with transformed edges, this is not the way to go. In such a case, it is better to use the built-in Graph functionality instead, like the following mapEdges method:

val mappedEdgeGraph: Graph[String, Boolean] =
  friendGraph.mapEdges(e => e.srcId > e.dstId)
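To see what keeping the graph structure buys us, the following sketch rebuilds the example graph, applies mapEdges, and then uses the triplets view of the resulting graph to look up the names at both ends of every edge whose source ID is larger than its target ID. The local SparkContext setup is an assumption for running the snippet outside spark-shell, and the name backwardPairs is invented for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: a local context for experimentation; getOrCreate reuses an existing one.
val sc: SparkContext = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("graphx-mapedges-sketch"))

// The example graph from the text.
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "Anne"), (2L, "Bernie"), (3L, "Chris"), (4L, "Don"), (5L, "Edgar")))
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "likes"), Edge(2L, 3L, "trusts"), Edge(3L, 4L, "believes"),
  Edge(4L, 5L, "worships"), Edge(1L, 3L, "loves"), Edge(4L, 1L, "dislikes")))
val friendGraph: Graph[String, String] = Graph(vertices, edges)

// mapEdges replaces each edge attribute but leaves vertices and topology untouched.
val mappedEdgeGraph: Graph[String, Boolean] =
  friendGraph.mapEdges(e => e.srcId > e.dstId)

// Triplets join each edge with the data of its two endpoints, so vertex
// names remain reachable after the edge transformation.
val backwardPairs: Array[(String, String)] = mappedEdgeGraph.triplets
  .filter(t => t.attr)
  .map(t => (t.srcAttr, t.dstAttr))
  .collect()
```

With the plain edge RDD from the previous example, the vertex names would no longer be reachable without joining back against the vertex RDD by hand.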

Note that the return value in this case is again a graph, but the edge data type is now Boolean, as expected. We will see many more examples of graph processing possibilities in just a bit.

Having seen this example, let's take a step back and discuss why Spark GraphX implements graphs as it does. One reason is that we can effectively leverage both data parallelism and graph parallelism. In the previous chapters, we already encountered how RDDs and data frames in Spark exploit data parallelism by distributing data across partitions and keeping it in memory on each node. So, if we are only concerned with vertices or edges on their own and don't want to study their relationship, working with the vertex and edge RDDs will be very efficient.

In contrast, by graph parallelism we mean operations carried out in parallel relative to notions of the graph. For instance, a graph-parallel task would be to sum the weights of all the inbound edges for each vertex. To carry out this task, we need to work with both the vertex and edge data, which involves multiple RDDs. Doing this efficiently requires a suitable internal representation. GraphX tries to strike a balance between both paradigms, which few alternative frameworks offer.
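The graph-parallel task just mentioned, summing the weights of all inbound edges per vertex, can be sketched with the aggregateMessages operator of Graph. The weighted three-vertex graph below is hypothetical and not part of the running example, and the local SparkContext setup is an assumption for running the snippet outside spark-shell.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Assumption: a local context for experimentation; getOrCreate reuses an existing one.
val sc: SparkContext = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("graphx-aggregate-sketch"))

// A hypothetical graph whose edge attributes are Double weights.
val weightedVertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val weightedEdges: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(1L, 2L, 0.5), Edge(3L, 2L, 1.5), Edge(2L, 3L, 2.0)))
val weightedGraph: Graph[String, Double] = Graph(weightedVertices, weightedEdges)

// Each edge sends its weight to its destination vertex; messages that
// arrive at the same vertex are merged by addition.
val inboundWeight: VertexRDD[Double] = weightedGraph.aggregateMessages[Double](
  ctx => ctx.sendToDst(ctx.attr),
  (a, b) => a + b)

val inboundWeightByVertex: Map[VertexId, Double] = inboundWeight.collect().toMap
```

Vertices that receive no message, here vertex 1, are absent from the result. The send and merge functions are all we specify; how edge and vertex data are brought together is left to GraphX.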
