© Raju Kumar Mishra and Sundar Rajan Raman 2019
Raju Kumar Mishra and Sundar Rajan Raman, PySpark SQL Recipes, https://doi.org/10.1007/978-1-4842-4335-0_9

9. GraphFrames

Raju Kumar Mishra1  and Sundar Rajan Raman2
(1)
Bangalore, Karnataka, India
(2)
Chennai, Tamil Nadu, India
 

GraphFrames are an abstraction over DataFrames used to perform graph analytics. Graph analytics stems from the mathematical field of Graph Theory, which represents relationships between entities so that we can perform various analyses on them. You use Graph Theory in everyday life whenever you search with Google: Google's PageRank algorithm, which is based on Graph Theory, tries to identify the websites that are most influential for your search.

While Graph Theory is used across many sciences, computer science in particular solves a great many problems with it. Applications include social networks, travel planning, chip design, and many other fields. In fact, every time you run a Spark job, you are using Graph Theory: Spark represents the lineage of RDD operations as a Directed Acyclic Graph (DAG) and uses it to find an optimized execution plan for your query.

Graphs are a combination of vertices that are connected to each other using edges. While vertices can be thought of as nodes or entities, edges represent the relationship between these entities. Figure 9-1 visualizes a graph of a family.
Figure 9-1

Graphs are a combination of vertices

The diagram in Figure 9-1 represents a small family of four members: the husband (Andrew), the wife (Sierra), a son (Bob), and a daughter (Emily). People are represented as nodes, which are called vertices, and each person is connected to every other. Notice that even within a four-member family there are 12 directed relationships. These relationships are represented by the edges that connect the vertices. Now imagine a social media app, such as LinkedIn, that needs to connect millions of people; the number of edges is enormous. Regular relational databases do not suffice for analytics on this kind of data, because you would need to apply self-joins so many times that you would effectively bring the database down.
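To make the structure concrete, here is a minimal pure-Python sketch (not Spark code) of the family graph as an edge list; the member names come from Figure 9-1:

```python
# A minimal sketch of the family graph in Figure 9-1.
# With 4 fully connected members, there are 4 * 3 = 12 directed edges.
members = ["Andrew", "Sierra", "Bob", "Emily"]
edges = [(a, b) for a in members for b in members if a != b]
print(len(edges))  # 12 directed relationships
```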

Graph Theory solves complex issues like this. Spark provides GraphFrames to represent this graph data. In this chapter, we will learn how to create GraphFrames and apply some of the most used algorithms to solve complex problems.

In this chapter, we are going to discuss the following recipes:
  • Recipe 9-1. Create GraphFrames

  • Recipe 9-2. Apply triangle counting in a GraphFrame

  • Recipe 9-3. Apply the PageRank algorithm

  • Recipe 9-4. Apply the Breadth First algorithm

Recipe 9-1. Create GraphFrames

Problem

You need to create a GraphFrame from a given dataset.

Solution

This solution uses family relationship data, split across the two datasets shown in Figures 9-2 and 9-3.
Figure 9-2

persons.csv

Figure 9-3

relationship.csv

These small datasets are easy to use and understand. Let's load them.
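If the figures are hard to read in your copy, you can recreate both files yourself; the rows below are taken verbatim from the DataFrame outputs shown later in this recipe:

```python
# Recreate persons.csv and relationship.csv so the recipe can be
# followed end to end; the rows match the outputs shown later.
persons = """id,Name,Age
1,Andrew,45
2,Sierra,43
3,Bob,12
4,Emily,10
5,William,35
6,Rachel,32
"""
relationship = """src,dst,relation
1,2,Husband
1,3,Father
1,4,Father
1,5,Friend
1,6,Friend
2,1,Wife
2,3,Mother
2,4,Mother
2,6,Friend
3,1,Son
3,2,Son
4,1,Daughter
4,2,Daughter
5,1,Friend
6,1,Friend
6,2,Friend
"""
with open("persons.csv", "w") as f:
    f.write(persons)
with open("relationship.csv", "w") as f:
    f.write(relationship)
```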

How It Works

Before starting with GraphFrames, you need to install GraphFrames on your machine.

If you have not installed GraphFrames and you start using it, you will receive the following error message.
>>> from graphframes import *
Here is the output:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'graphframes'
You first need to install GraphFrames by launching pyspark with the --packages option.
pyspark --packages graphframes:graphframes:0.6.0-spark2.3-s_2.11
Run this command at your command prompt. PySpark automatically downloads and installs the GraphFrames package. Once it is installed, you can verify it by importing the package, as shown here.
>>> from graphframes import *

If the error message no longer appears, GraphFrames is ready to use.

Let’s now load the persons.csv and relationship.csv data as DataFrames.
>>> personsDf = spark.read.csv('persons.csv',header=True, inferSchema=True)
>>> personsDf.createOrReplaceTempView("persons")
This command loads the data into the personsDf DataFrame and creates a temporary view of it. Let’s see the content of this view.
>>> spark.sql("select * from persons").show()
Here is the output:
+--+-------+---+
|id|   Name|Age|
+--+-------+---+
| 1| Andrew| 45|
| 2| Sierra| 43|
| 3|    Bob| 12|
| 4|  Emily| 10|
| 5|William| 35|
| 6| Rachel| 32|
+--+-------+---+
Now we load the relationship data as well. Here is the code and the output:
>>> relationshipDf = spark.read.csv('relationship.csv',header=True, inferSchema=True)
>>> relationshipDf.createOrReplaceTempView("relationship")
>>> spark.sql("select * from relationship").show()
+---+---+--------+
|src|dst|relation|
+---+---+--------+
|  1|  2| Husband|
|  1|  3|  Father|
|  1|  4|  Father|
|  1|  5|  Friend|
|  1|  6|  Friend|
|  2|  1|    Wife|
|  2|  3|  Mother|
|  2|  4|  Mother|
|  2|  6|  Friend|
|  3|  1|     Son|
|  3|  2|     Son|
|  4|  1|Daughter|
|  4|  2|Daughter|
|  5|  1|  Friend|
|  6|  1|  Friend|
|  6|  2|  Friend|
+---+---+--------+
Now that both datasets are available, we are going to import GraphFrames.
>>> from graphframes import *
>>>
GraphFrames has been imported successfully.
The following command creates your first GraphFrame. A GraphFrame accepts two DataFrames as input: one for vertices and one for edges. GraphFrames enforces a naming convention for the columns, which you need to follow. The rules are as follows:
  • A DataFrame that represents vertices should contain a column named id. Here, personsDf contains a column name id.

  • A DataFrame that represents edges should contain columns named src and dst. Here, relationshipDf contains the columns src and dst.

>>> graph = GraphFrame(personsDf, relationshipDf)
Let’s try to see the type of the graph variable. Here it is:
>>> graph
GraphFrame(v:[id: int, Name: string ... 1 more field], e:[src: int, dst: int ... 1 more field])

So it is a GraphFrame that contains v and e. The v represents vertices and e represents edges.

Now that you have successfully created a GraphFrame, it is important to understand degrees.

Degrees represent the number of edges connected to a vertex. GraphFrames supports inDegrees and outDegrees: inDegrees gives the number of incoming edges to a vertex, and outDegrees gives the number of outgoing edges from it. Understanding this distinction is important. Let's look at the output for an example.

Here you are going to find all the edges connected to Andrew.

The following code computes the degree of every vertex and filters the result on id = 1 (Andrew):
>>> graph.degrees.filter("id = 1").show()
Here is the output. Note that there are 10 entries for Andrew (id=1) in the relationship dataset.
+--+------+
|id|degree|
+--+------+
| 1|    10|
+--+------+
The following code gets the number of incoming links to Andrew. This is obtained by calling the inDegrees method.
>>> graph.inDegrees.filter("id = 1").show()
Here is the output. There are five entries for Andrew in the dst column.
+--+--------+
|id|inDegree|
+--+--------+
| 1|       5|
+--+--------+
The following code shows how to get the number of links going out from Andrew, using the outDegrees method.
>>> graph.outDegrees.filter("id = 1").show()
Here is the output. There are five relationship entries for src column value 1.
+--+---------+
|id|outDegree|
+--+---------+
| 1|        5|
+--+---------+
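As a sanity check, the three numbers above can be recomputed directly from the relationship rows with plain Python (no Spark needed); the degree of a vertex in a directed graph is its in-degree plus its out-degree:

```python
# Cross-check the degree outputs against the relationship rows.
rows = [(1, 2), (1, 3), (1, 4), (1, 5), (1, 6),
        (2, 1), (2, 3), (2, 4), (2, 6),
        (3, 1), (3, 2), (4, 1), (4, 2),
        (5, 1), (6, 1), (6, 2)]
out_deg = sum(1 for src, dst in rows if src == 1)  # edges leaving Andrew
in_deg = sum(1 for src, dst in rows if dst == 1)   # edges entering Andrew
print(in_deg, out_deg, in_deg + out_deg)  # 5 5 10
```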

With this, you have successfully created a GraphFrame from the vertices and edges dataset.

Apache Spark provides multiple built-in graph algorithms. These algorithms are abstracted and exposed as easy-to-use APIs. Once you prepare a GraphFrame, any of the following algorithms can be invoked with a single method call.

GraphFrame provides the following built-in algorithms:
  • Connected components

  • Label propagation

  • PageRank

  • SVD++

  • Shortest Path

  • Strongly connected components

  • Triangle count

Recipe 9-2. Apply Triangle Counting in a GraphFrame

Problem

You need to find the triangle count value for each vertex.

Solution

GraphFrames provides an easy-to-use triangleCount API which, when called on a given GraphFrame, outputs a DataFrame with a count column added to each vertex row. This count identifies how many triangles (sets of three mutually connected vertices) the vertex participates in. Now we will see how to get the triangle count for each vertex. Triangle counting is very helpful in route-finding problems and plays an important role in the PageRank algorithm.
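Before calling the API, it may help to see what is being counted. The following plain-Python sketch (an illustration, not GraphFrames' implementation) counts, for each vertex of the family graph, the triangles it participates in, treating the edges as undirected:

```python
from itertools import combinations

# Triangle counting on the family graph: for each vertex, count the
# unordered triples in which all three pairs are connected.
edges = {(1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (2, 3), (2, 4), (2, 6)}

def connected(a, b):
    # Treat edges as undirected.
    return (a, b) in edges or (b, a) in edges

counts = {v: 0 for v in range(1, 7)}
for a, b, c in combinations(range(1, 7), 3):
    if connected(a, b) and connected(b, c) and connected(a, c):
        for v in (a, b, c):
            counts[v] += 1

print(counts)  # {1: 3, 2: 3, 3: 1, 4: 1, 5: 0, 6: 1}
```

These counts match the triangleCount output shown below: Andrew (1) and Sierra (2) each sit in three triangles, while William (5) sits in none.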

How It Works

The following code is used to find the triangle count for each vertex.
>>> graph.triangleCount().show()
Here is the output:
+-----+---+-------+---+
|count| id|   Name|Age|
+-----+---+-------+---+
|    3|  1| Andrew| 45|
|    1|  6| Rachel| 32|
|    1|  3|    Bob| 12|
|    0|  5|William| 35|
|    1|  4|  Emily| 10|
|    3|  2| Sierra| 43|
+-----+---+-------+---+

A new column, count, is added to the output; it represents the triangle count. The output shows that Andrew and Sierra have the maximum triangle counts, since each participates in three triangles: one with Bob, one with Emily, and one with Rachel. William, whose only connection is Andrew, participates in none. With this, you have successfully created a GraphFrame and applied analytics to it.

You can also register the triangleCount output DataFrame as a table and easily run queries on it. Let's use PySpark SQL to identify all the people with the maximum triangle count.

Using the following code, you can find the people in the family with the highest triangle counts.

After that, you can join it with the Persons DataFrame to be able to view the person’s details.

The following code does exactly that. Although you can achieve this result with a programmatic method in one line of code, the following method uses simple SQL commands. First, capture the output of triangleCount in a DataFrame and register it as a temporary view.
>>> personsTriangleCountDf = graph.triangleCount()
>>> personsTriangleCountDf.createOrReplaceTempView("personsTriangleCount")
>>> maxCountDf = spark.sql("select max(count) as max_count from personsTriangleCount")
>>> maxCountDf.createOrReplaceTempView("personsMaxTriangleCount")
>>> spark.sql("select * from personsTriangleCount P JOIN (select * from personsMaxTriangleCount) M ON (M.max_count = P.count) ").show()
Here is the output:
+-----+---+------+---+---------+
|count| id|  Name|Age|max_count|
+-----+---+------+---+---------+
|    3|  1|Andrew| 45|        3|
|    3|  2|Sierra| 43|        3|
+-----+---+------+---+---------+

Recipe 9-3. Apply the PageRank Algorithm

Problem

You need to apply the PageRank algorithm to find the most influential person in this family.

Solution

The PageRank algorithm was the foundation of Google in its early days. It was originally developed by Google's founders to identify the most important pages on the Internet. It builds on two ideas: the most important pages are the ones linked to by other pages most often, and a link from a highly ranked page counts for more than a link from a lowly ranked one. Google thus represents linked web pages as a graph to identify the important pages for us.

The PageRank algorithm measures the importance of each vertex in a graph. Assume a scenario where one Twitter user has 10 important followers, and each of those followers has multiple followers in turn. That Twitter user gets a higher ranking compared to a Twitter user with 50 “normal” followers. This is to say that the PageRank algorithm considers each important follower a legitimate endorsement of the Twitter user and thereby gives a higher ranking to the user.
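The idea can be sketched in plain Python as a power iteration over the family graph. This is an illustration only, not GraphFrames' internal implementation; the 0.20 reset probability matches the call used later in this recipe, and the ranks are scaled to sum to the number of vertices, as in the GraphFrames output:

```python
# A plain-Python power-iteration sketch of the PageRank idea on the
# family graph (illustration only, not GraphFrames' implementation).
out_edges = {
    1: [2, 3, 4, 5, 6],  # Andrew
    2: [1, 3, 4, 6],     # Sierra
    3: [1, 2],           # Bob
    4: [1, 2],           # Emily
    5: [1],              # William
    6: [1, 2],           # Rachel
}
n = len(out_edges)
reset = 0.20  # same reset probability as the pageRank call below
rank = {v: 1.0 / n for v in out_edges}
for _ in range(10):
    new = {v: reset / n for v in out_edges}
    for src, dsts in out_edges.items():
        # Each vertex spreads its rank evenly over its outgoing edges.
        share = (1 - reset) * rank[src] / len(dsts)
        for dst in dsts:
            new[dst] += share
    rank = new
rank = {v: r * n for v, r in rank.items()}  # scale so ranks sum to n
print(max(rank, key=rank.get))  # vertex 1 (Andrew) ranks highest
```

The resulting ordering, Andrew first, then Sierra, then the three equally ranked vertices, then William, matches the GraphFrames output shown in this recipe.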

How It Works

Let’s see the PageRank algorithm in action. Apache Spark makes it very simple to call the PageRank algorithm.
>>> pageRank = graph.pageRank(resetProbability=0.20, maxIter=10)
Here, we are calling the PageRank algorithm using the pageRank method. It takes two parameters:
  • resetProbability: The probability of resetting to a random vertex (alpha).

  • maxIter: The number of PageRank iterations to run.

Let’s look at pageRank. It is a GraphFrame containing the vertices and edges attributes.
>>> pageRank
GraphFrame(v:[id: int, Name: string ... 2 more fields], e:[src: int, dst: int ... 2 more fields])
Let's look at the vertices attribute:
>>> pageRank.vertices.printSchema()
root
 |-- id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- pagerank: double (nullable = true)

Compared to the original persons schema, a new column called pagerank has been added. Spark adds this column; it holds the PageRank score of each vertex.

Similarly, let’s look at the edges present in this GraphFrame.
>>> pageRank.edges.printSchema()
root
 |-- src: integer (nullable = true)
 |-- dst: integer (nullable = true)
 |-- relation: string (nullable = true)
 |-- weight: double (nullable = true)

As you can see from the schema, a new column weight has been added to the original relationship schema. This column weight indicates the edge weight that contributed to the PageRank score.

Let’s look at the PageRank score for each vertex and the weight for each of the edges.

We are going to order the PageRank in descending order so that we can see the most connected person in the family based on the links with the other family members.

The following code lists all the vertices in descending order.
>>> pageRank.vertices.orderBy("pagerank",ascending=False).show()
Let’s see the output:
+--+-------+---+------------------+
|id|   Name|Age|          pagerank|
+--+-------+---+------------------+
| 1| Andrew| 45| 1.787923121897472|
| 2| Sierra| 43| 1.406016795082752|
| 6| Rachel| 32|0.7723665979473922|
| 4|  Emily| 10|0.7723665979473922|
| 3|    Bob| 12|0.7723665979473922|
| 5|William| 35|0.4889602891776001|
+--+-------+---+------------------+

You can see from this output that Andrew is the most connected person.

Let’s look at the weight contributed by each of the edges. Here we are listing all the edges in descending order so that the maximum weight is listed first.
>>> pageRank.edges.orderBy("weight",ascending=False).show()
Here is the output. You can see that the edge (5, 1, Friend) gets the maximum weight. William's relationship with Andrew gets the maximum weight because it is his only outgoing edge; no one other than Andrew is a friend to William.
+---+---+--------+------+
|src|dst|relation|weight|
+---+---+--------+------+
|  5|  1|  Friend|   1.0|
|  3|  1|     Son|   0.5|
|  4|  1|Daughter|   0.5|
|  4|  2|Daughter|   0.5|
|  6|  1|  Friend|   0.5|
|  3|  2|     Son|   0.5|
|  6|  2|  Friend|   0.5|
|  2|  3|  Mother|  0.25|
|  2|  4|  Mother|  0.25|
|  2|  1|    Wife|  0.25|
|  2|  6|  Friend|  0.25|
|  1|  2| Husband|   0.2|
|  1|  6|  Friend|   0.2|
|  1|  3|  Father|   0.2|
|  1|  4|  Father|   0.2|
|  1|  5|  Friend|   0.2|
+---+---+--------+------+
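The weights in the table above are consistent with each edge receiving 1 divided by the out-degree of its source vertex. This is an observation from the output, not a statement about GraphFrames internals, but it can be checked quickly:

```python
# Observation: the weights above equal 1 / outdegree(src) for each edge.
out_edges = {
    1: [2, 3, 4, 5, 6], 2: [1, 3, 4, 6], 3: [1, 2],
    4: [1, 2], 5: [1], 6: [1, 2],
}
weights = {(src, dst): 1.0 / len(dsts)
           for src, dsts in out_edges.items() for dst in dsts}
print(weights[(5, 1)], weights[(2, 3)], weights[(1, 2)])  # 1.0 0.25 0.2
```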

Now that you understand the PageRank algorithm and have applied it to GraphFrames, let’s move on to the Breadth First algorithm.

Recipe 9-4. Apply the Breadth First Algorithm

Problem

You need to apply the Breadth First algorithm to find the shortest way to connect to a person.

Solution

You may often have noticed LinkedIn telling you how far you are from another user. For example, a user you would like to connect with may be shown as a second-degree or third-degree connection. A second-degree connection means you are two edges away from that user's vertex. This is one way of measuring how far one vertex is from another.

Similarly, when airlines need to identify the shortest route between cities, they need to find the path with the fewest vertices between airports. Of course, there may be additional constraints, such as travel time, whether stops are required along the route, and so on.

Very similar problems exist in every industry. For example, chip-design companies need to identify the shortest circuit paths, telecom companies need to find the shortest paths between routers, and so on.

Breadth First Search (BFS) is one of the fundamental shortest-path algorithms; it helps us identify the shortest path between two vertices.
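The algorithm itself can be sketched in a few lines of plain Python (an illustration of the idea, not GraphFrames' implementation): explore the graph level by level from the start vertex, so the first path that reaches the goal is a shortest one.

```python
from collections import deque

# Breadth-first search over the family graph. Exploring level by level
# guarantees the first path found to the goal is a shortest path.
neighbors = {
    "Andrew": ["Sierra", "Bob", "Emily", "William", "Rachel"],
    "Sierra": ["Andrew", "Bob", "Emily", "Rachel"],
    "Bob": ["Andrew", "Sierra"],
    "Emily": ["Andrew", "Sierra"],
    "William": ["Andrew"],
    "Rachel": ["Andrew", "Sierra"],
}

def bfs(start, goal):
    queue = deque([[start]])  # queue of partial paths
    seen = {start}
    while queue:
        path = queue.popleft()
        for nxt in neighbors.get(path[-1], []):
            if nxt == goal:
                return path + [nxt]
            if nxt not in seen:
                seen.add(nxt)
                queue.append(path + [nxt])
    return None

print(bfs("Bob", "William"))  # ['Bob', 'Andrew', 'William']
```

This matches the result below: Bob reaches William through Andrew.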

We are going to apply this algorithm to the persons dataset to find the shortest path for Bob to connect to William. You can see that only Andrew is connected to William. So, for Bob to be able to connect to William, what is the shortest path? This is what we are going to determine using the following code.

How It Works

GraphFrames provides an API called bfs, which takes a minimum of two parameters:
  • fromExpr: Expression to identify the from vertex.

  • toExpr: Expression to identify the to vertex.

Yes, these can be expressions, which means you can match multiple vertices at once. First, we will see it for a single vertex; after that, we will apply it to multiple vertices.
>>> graph.bfs(
...   fromExpr = "name = 'Bob'",
...   toExpr = "name = 'William'",
...   ).show()

From this code, notice that we are calling the bfs method with two inputs, fromExpr and toExpr, carrying the filters "name = 'Bob'" and "name = 'William'". That is, we are looking for the shortest path between Bob and William.

Here is the output:
+------------+-----------+---------------+--------------+----------------+
|        from|         e0|             v1|            e1|              to|
+------------+-----------+---------------+--------------+----------------+
|[3, Bob, 12]|[3, 1, Son]|[1, Andrew, 45]|[1, 5, Friend]|[5, William, 35]|
+------------+-----------+---------------+--------------+----------------+

From the previous output, you can infer that for Bob to connect to William, he needs to go through Andrew.

Let's try applying this to more than one vertex. We will find how all the people younger than 20 can connect to Rachel.
>>> graph.bfs(
...   fromExpr = "age < 20",
...   toExpr = "name = 'Rachel'",
...   ).show()

In the previous code snippet, we modified the expressions so that we are looking for all people younger than 20 to find ways to connect to Rachel.

Here is the output:
+--------------+----------------+---------------+--------------+---------------+
|          from|              e0|             v1|            e1|             to|
+--------------+----------------+---------------+--------------+---------------+
|  [3, Bob, 12]|     [3, 1, Son]|[1, Andrew, 45]|[1, 6, Friend]|[6, Rachel, 32]|
|  [3, Bob, 12]|     [3, 2, Son]|[2, Sierra, 43]|[2, 6, Friend]|[6, Rachel, 32]|
|[4, Emily, 10]|[4, 1, Daughter]|[1, Andrew, 45]|[1, 6, Friend]|[6, Rachel, 32]|
|[4, Emily, 10]|[4, 2, Daughter]|[2, Sierra, 43]|[2, 6, Friend]|[6, Rachel, 32]|
+--------------+----------------+---------------+--------------+---------------+

Notice that Bob and Emily are both listed in the output. Since Rachel is a friend of both Andrew and Sierra, either of them can serve as the intermediate vertex: Bob and Emily each need to go through Andrew or Sierra to connect to Rachel.

If you want to restrict some of the paths, say you want the kids to reach Rachel only through certain relationships, you can use the edgeFilter parameter to control which edges the search may traverse.

The following code uses the edgeFilter attribute to exclude Son edges while identifying the shortest paths, so only the Daughter paths remain in the results:
>>> graph.bfs(
...   fromExpr = "age < 20",
...   toExpr = "name = 'Rachel'",
...   edgeFilter = "relation != 'Son'"
...   ).show()
Here is the output that shows how Emily can connect to Rachel:
+--------------+----------------+---------------+--------------+---------------+
|          from|              e0|             v1|            e1|             to|
+--------------+----------------+---------------+--------------+---------------+
|[4, Emily, 10]|[4, 1, Daughter]|[1, Andrew, 45]|[1, 6, Friend]|[6, Rachel, 32]|
|[4, Emily, 10]|[4, 2, Daughter]|[2, Sierra, 43]|[2, 6, Friend]|[6, Rachel, 32]|
+--------------+----------------+---------------+--------------+---------------+

With this, you have successfully learned about the Breadth First search algorithm and used it for analysis.
