GraphX

While Graph for Scala may be considered a DSL for graph operations and querying, one should turn to GraphX for scalability. GraphX is built on top of the powerful Spark framework. As an example of Spark/GraphX operations, I'll use the CMU Enron e-mail dataset (about 2 GB). The actual semantic analysis of the e-mail content will not be important to us until the next chapters. The dataset can be downloaded from the CMU site. It contains e-mails from the mailboxes of 150 users, primarily Enron managers, with 517,401 e-mails between them. The e-mails may be considered an indication of a relation (edge) between two people: each e-mail is an edge from a source (From:) vertex to a destination (To:) vertex.

Since GraphX requires the data in RDD format, I'll have to do some preprocessing. Luckily, it is extremely easy with Scala—this is why Scala is the perfect language for semi-structured data. Here is the code:

package org.akozlov.chapter07

import scala.io.Source

import scala.util.hashing.{MurmurHash3 => Hash}
import scala.util.matching.Regex

import java.util.{Date => javaDateTime}

import java.io.File
import net.liftweb.json._
import Extraction._
import Serialization.{read, write}

object EnronEmail {

  val emailRe = """[a-zA-Z0-9_.+-]+@enron.com""".r.unanchored

  def emails(s: String) = {
    for (email <- emailRe findAllIn s) yield email
  }

  def hash(s: String) = {
    java.lang.Integer.MAX_VALUE.toLong + Hash.stringHash(s)
  }

  val messageRe =
    """(?:Message-ID:\s+)(<[A-Za-z0-9_.+-@]+>)(?s)(?:.*?)(?m)
      |(?:Date:\s+)(.*?)$(?:.*?)
      |(?:From:\s+)([a-zA-Z0-9_.+-]+@enron.com)(?:.*?)
      |(?:Subject: )(.*?)$""".stripMargin.r.unanchored

  case class Relation(from: String, fromId: Long, to: String, toId: Long, source: String, messageId: String, date: javaDateTime, subject: String)

  implicit val formats = Serialization.formats(NoTypeHints)

  def getFileTree(f: File): Stream[File] =
    f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree) else Stream.empty)

  def main(args: Array[String]) {
    getFileTree(new File(args(0))).par.map {
      file => {
        "\.$".r findFirstIn file.getName match {
          case Some(x) =>
          try {
            val src = Source.fromFile(file, "us-ascii")
            val message = try src.mkString finally src.close()
            message match {
              case messageRe(messageId, date, from , subject) =>
              val fromLower = from.toLowerCase
              for (to <- emails(message).filter(_ != fromLower).toList.distinct)
              println(write(Relation(fromLower, hash(fromLower), to, hash(to), file.toString, messageId, new javaDateTime(date), subject)))
                case _ =>
            }
          } catch {
            case e: Exception => System.err.println(e)
          }
          case _ =>
        }
      }
    }
  }
}

First, we use the MurmurHash3 class to generate node IDs of the Long type, as one is required for each node in GraphX. The emailRe and messageRe regular expressions are used to extract the required fields from the file contents. Scala allows you to parallelize programs without much work.
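For instance, a quick REPL check shows how the hash function maps an arbitrary string into the non-negative Long range (the address below is hypothetical, purely for illustration):

scala> import scala.util.hashing.{MurmurHash3 => Hash}
scala> // stringHash returns an Int; adding Integer.MAX_VALUE shifts it into the non-negative Long range
scala> def hash(s: String) = java.lang.Integer.MAX_VALUE.toLong + Hash.stringHash(s)
scala> hash("[email protected]")  // hypothetical address; yields a non-negative Long vertex ID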

Note the par call in getFileTree(new File(args(0))).par.map. This makes the loop parallel. While processing the whole Enron dataset can take up to an hour on a 3 GHz processor, adding parallelization reduced it to about 8 minutes on a 32-core Intel Xeon E5-2630 2.4 GHz CPU Linux machine (it took 15 minutes on an Apple MacBook Pro with a 2.3 GHz Intel Core i7).
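The effect of par is easy to reproduce on any multicore machine; here is a minimal, self-contained REPL sketch (the timings in the comments are indicative, not measured on the machines above):

scala> def time[A](block: => A): A = {
     |   val start = System.nanoTime
     |   val result = block
     |   println("Elapsed: " + (System.nanoTime - start) / 1e9 + " s")
     |   result
     | }
time: [A](block: => A)A

scala> time((1 to 8).map { _ => Thread.sleep(100) })       // sequential: roughly 0.8 s
scala> time((1 to 8).par.map { _ => Thread.sleep(100) })   // parallel: a fraction of that, depending on the core count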

Running the code will produce a set of JSON records that can be loaded into Spark (to run it, you'll need to put joda-time and lift-json library jars on the classpath), as follows:

# (mkdir Enron; cd Enron; wget -O - http://www.cs.cmu.edu/~./enron/enron_mail_20150507.tgz | tar xzvf -)
...
# sbt --error "run-main org.akozlov.chapter07.EnronEmail Enron/maildir" > graph.json

# spark-shell --driver-memory 2g --executor-memory 2g
...
scala> val df = sqlContext.read.json("graph.json")
df: org.apache.spark.sql.DataFrame = [date: string, from: string, fromId: bigint, messageId: string, source: string, subject: string, to: string, toId: bigint]

Nice! Spark was able to figure out the fields and types on its own. If Spark was not able to parse all the records, there would be a _corrupt_record field containing the unparsed records (one of them is the [success] line that sbt emits at the end of the output, which can be filtered out with grep -Fv [success]). You can see them with the following command:

scala> df.select("_corrupt_record").collect.foreach(println)
...
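Instead of grep, the unparsed records can also be dropped inside Spark itself; a minimal sketch, assuming the _corrupt_record column is present:

scala> val clean = df.filter("_corrupt_record is null").drop("_corrupt_record")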

The nodes (people) and edges (relations) datasets can be extracted with the following commands:

scala> import org.apache.spark._
...
scala> import org.apache.spark.graphx._
...
scala> import org.apache.spark.rdd.RDD
...
scala> val people: RDD[(VertexId, String)] = df.select("fromId", "from").unionAll(df.select("toId", "to")).na.drop.distinct.map( x => (x.get(0).toString.toLong, x.get(1).toString))
people: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = MapPartitionsRDD[146] at map at <console>:28

scala> val relationships = df.select("fromId", "toId", "messageId", "subject").na.drop.distinct.map( x => Edge(x.get(0).toString.toLong, x.get(1).toString.toLong, (x.get(2).toString, x.get(3).toString)))
relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[(String, String)]] = MapPartitionsRDD[156] at map at <console>:28

scala> val graph = Graph(people, relationships).cache
graph: org.apache.spark.graphx.Graph[String,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@7b59aa7b

Note

Node IDs in GraphX

As we saw in Graph for Scala, specifying the edges is sufficient for defining the nodes and the graph. In Spark/GraphX, nodes need to be extracted explicitly, and each node needs to be associated with an ID of the Long type. While this potentially limits the flexibility and the number of unique nodes, it enhances efficiency. In this particular example, generating the node ID as a hash of the e-mail string was sufficient, as no collisions were detected, but the generation of unique IDs is usually a hard problem to parallelize.
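The no-collision claim is cheap to verify; a hedged sanity check against the people RDD defined earlier:

scala> import scala.util.hashing.{MurmurHash3 => Hash}
scala> def hash(s: String) = java.lang.Integer.MAX_VALUE.toLong + Hash.stringHash(s)
scala> val addrs = people.map(_._2).distinct
scala> addrs.count == addrs.map(hash).distinct.count  // true means no hash collisions among the addresses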

The first GraphX graph is ready! It took a bit more work than Graph for Scala, but now it's totally ready for distributed processing. A few things to note: first, we needed to explicitly convert the fields to Long and String, as the Edge constructor needed help in figuring out the types. Second, Spark might need to optimize the number of partitions (it likely created too many):

scala> graph.vertices.getNumPartitions
res1: Int = 200

scala> graph.edges.getNumPartitions
res2: Int = 200

To repartition, there are two calls: repartition and coalesce. The latter tries to avoid a shuffle, as follows:

scala> val graph = Graph(people.coalesce(6), relationships.coalesce(6))
graph: org.apache.spark.graphx.Graph[String,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@5dc7d016

scala> graph.vertices.getNumPartitions
res10: Int = 6

scala> graph.edges.getNumPartitions
res11: Int = 6
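For completeness, the shuffle-based alternative, repartition, can both grow and shrink the partition count at the cost of a full shuffle (the target of 24 partitions is an arbitrary example):

scala> val relationships24 = relationships.repartition(24)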

However, coalescing down to a few partitions might limit parallelism if one performs computations over a large cluster. Finally, it's a good idea to use the cache method, which pins the data structure in memory:

scala> graph.cache
res12: org.apache.spark.graphx.Graph[String,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@5dc7d016

It took a few more commands to construct a graph in Spark, but four is not too bad. Let's compute some statistics (and show the power of Spark/GraphX) in the following table:

Computing basic statistics on Enron e-mail graph.

Statistics | Spark command | Value for Enron
Total # of relations (pairwise communications) | graph.numEdges | 3,035,021
Number of e-mails (message IDs) | graph.edges.map( e => e.attr._1 ).distinct.count | 371,135
Number of connected pairs | graph.edges.flatMap( e => List((e.srcId, e.dstId), (e.dstId, e.srcId))).distinct.count / 2 | 217,867
Number of one-way communications | graph.edges.flatMap( e => List((e.srcId, e.dstId), (e.dstId, e.srcId))).distinct.count - graph.edges.map( e => (e.srcId, e.dstId)).distinct.count | 193,183
Number of distinct subject lines | graph.edges.map( e => e.attr._2 ).distinct.count | 110,273
Total # of nodes | graph.numVertices | 23,607
Number of destination-only nodes | graph.numVertices - graph.edges.map( e => e.srcId).distinct.count | 17,264
Number of source-only nodes | graph.numVertices - graph.edges.map( e => e.dstId).distinct.count | 611

Who is getting e-mails?

One of the most straightforward ways to estimate people's importance in an organization is to look at the number of connections, that is, the number of incoming and outgoing communications. The GraphX graph has built-in inDegrees and outDegrees methods. To rank the users by the number of incoming e-mails, run:

scala> people.join(graph.inDegrees).sortBy(_._2._2, ascending=false).take(10).foreach(println)
(268746271,([email protected],18523))
(1608171805,([email protected],15867))
(1578042212,([email protected],13878))
(960683221,([email protected],13717))
(3784547591,([email protected],12980))
(1403062842,([email protected],12082))
(2319161027,([email protected],12018))
(969899621,([email protected],10777))
(1362498694,([email protected],10296))
(4151996958,([email protected],10160))

To rank the users by the number of outgoing e-mails, run:

scala> people.join(graph.outDegrees).sortBy(_._2._2, ascending=false).take(10).foreach(println)
(1578042212,([email protected],139786))
(2822677534,([email protected],106442))
(3035779314,([email protected],94666))
(2346362132,([email protected],90570))
(861605621,([email protected],74319))
(14078526,([email protected],58797))
(2058972224,([email protected],58718))
(871077839,([email protected],57559))
(3852770211,([email protected],50106))
(241175230,([email protected],40425))

Let's apply some more complex algorithms to the Enron dataset.

Connected components

Connected components determine whether the graph is naturally partitioned into several parts. In the Enron relationship graph, this would mean that two or more groups communicate mostly among themselves:

scala> val groups = org.apache.spark.graphx.lib.ConnectedComponents.run(graph).vertices.map(_._2).distinct.cache
groups: org.apache.spark.rdd.RDD[org.apache.spark.graphx.VertexId] = MapPartitionsRDD[2404] at distinct at <console>:34

scala> groups.count
res106: Long = 18

scala> people.join(groups.map( x => (x, x))).map(x => (x._1, x._2._1)).sortBy(_._1).collect.foreach(println)
(332133,[email protected])
(81833994,[email protected])
(115247730,[email protected])
(299810291,[email protected])
(718200627,[email protected])
(847455579,[email protected])
(919241773,[email protected])
(1139366119,[email protected])
(1156539970,[email protected])
(1265773423,[email protected])
(1493879606,[email protected])
(1511379835,[email protected])
(2114016426,[email protected])
(2200225669,[email protected])
(2914568776,[email protected])
(2934799198,[email protected])
(2975592118,[email protected])
(3678996795,[email protected])

We see 18 groups. Each of the groups can be counted and extracted by filtering on the component ID. For instance, the group associated with ID 919241773 can be found by running a SQL query on the DataFrame:

scala> df.filter("fromId = 919241773 or toId = 919241773").select("date","from","to","subject","source").collect.foreach(println)
[2000-09-19T18:40:00.000Z,[email protected],[email protected],NO ACTION REQUIRED - TEST,Enron/maildir/dasovich-j/all_documents/1567.]
[2000-09-19T18:40:00.000Z,[email protected],[email protected],NO ACTION REQUIRED - TEST,Enron/maildir/dasovich-j/notes_inbox/504.]

This group is based on a single e-mail sent on September 19, 2000. The e-mail is listed twice only because it ended up in two different folders (and has two distinct message IDs). Only the first group, the largest subgraph, contains more than two e-mail addresses in the organization.

Triangle counting

The triangle counting algorithm is relatively straightforward and can be computed in the following three steps (a naive RDD sketch follows the list):

  1. Compute the set of neighbors for each vertex.
  2. For each edge, compute the intersection of the sets and send the count to both vertices.
  3. Compute the sum at each vertex and divide by two, as each triangle is counted twice.
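To make the steps concrete, here is a minimal, naive sketch written directly against an RDD of canonical (srcId < dstId) vertex pairs. It illustrates the three steps under the assumption that the neighbor map fits in driver memory; it is not GraphX's optimized implementation:

import org.apache.spark.rdd.RDD

def naiveTriangleCount(edges: RDD[(Long, Long)]): RDD[(Long, Int)] = {
  // Step 1: the set of neighbors for each vertex
  val nbrs = edges.flatMap { case (a, b) => Seq((a, b), (b, a)) }
    .groupByKey().mapValues(_.toSet)
  val nbrMap = nbrs.collectAsMap()  // assumption: the neighbor sets fit on the driver
  // Step 2: for each edge, intersect the two neighbor sets and credit both endpoints
  val perVertex = edges.flatMap { case (a, b) =>
    val common = (nbrMap(a) intersect nbrMap(b)).size
    Seq((a, common), (b, common))
  }
  // Step 3: sum at each vertex and divide by two, as each triangle is counted twice
  perVertex.reduceByKey(_ + _).mapValues(_ / 2)
}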

We need to convert the multigraph to an undirected graph with srcId < dstId, which is a precondition for the algorithm:

scala> val unedges = graph.edges.map(e => if (e.srcId < e.dstId) (e.srcId, e.dstId) else (e.dstId, e.srcId)).map( x => Edge(x._1, x._2, 1)).cache
unedges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[87] at map at <console>:48

scala> val ungraph = Graph(people, unedges).partitionBy(org.apache.spark.graphx.PartitionStrategy.EdgePartition1D, 10).cache
ungraph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@77274fff

scala> val triangles = org.apache.spark.graphx.lib.TriangleCount.run(ungraph).cache
triangles: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6aec6da1

scala> people.join(triangles.vertices).map(t => (t._2._2,t._2._1)).sortBy(_._1, ascending=false).take(10).foreach(println)
(31761,[email protected])
(24101,[email protected])
(23522,[email protected])
(21694,[email protected])
(20847,[email protected])
(18460,[email protected])
(17951,[email protected])
(16929,[email protected])
(16390,[email protected])
(16197,[email protected])

While there is no direct relationship between the triangle count and the importance of people in the organization, the people with higher triangle count arguably are more social—even though a clique or a strongly connected component count might be a better measure.

Strongly connected components

In the mathematical theory of directed graphs, a subgraph is said to be strongly connected if every vertex is reachable from every other vertex. It could happen that the whole graph is just one strongly connected component, but at the other end of the spectrum, each vertex could be its own strongly connected component.

If you contract each strongly connected component to a single vertex, you get a new directed graph that is guaranteed to have no cycles, that is, it is acyclic.

The algorithm for SCC detection is already built into GraphX:

scala> val components = org.apache.spark.graphx.lib.StronglyConnectedComponents.run(graph, 100).cache
components: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,(String, String)] = org.apache.spark.graphx.impl.GraphImpl@55913bc7

scala> components.vertices.map(_._2).distinct.count
res2: Long = 17980

scala> people.join(components.vertices.map(_._2).distinct.map( x => (x, x))).map(x => (x._1, x._2._1)).sortBy(_._1).collect.foreach(println)
(332133,[email protected])                                              
(466265,[email protected])
(471258,[email protected])
(497810,[email protected])
(507806,[email protected])
(639614,[email protected])
(896860,[email protected])
(1196652,[email protected])
(1240743,[email protected])
(1480469,[email protected])
(1818533,[email protected])
(2337461,[email protected])
(2918577,[email protected])

There are 17,980 strongly connected components, with an average of only 23,607 / 17,980 ≈ 1.3 users per group.
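As an aside, the contraction mentioned earlier is easy to sketch: map each edge's endpoints to their component IDs and drop the intra-component self-loops. What remains is the acyclic condensation graph (a hedged sketch; it assumes the vertex-to-component map fits on the driver):

scala> val compMap = components.vertices.collectAsMap()  // vertexId -> componentId
scala> val condensed = graph.edges.map(e => (compMap(e.srcId), compMap(e.dstId))).filter { case (a, b) => a != b }.distinct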

PageRank

The PageRank algorithm gives us an estimate of how important a person is by analyzing the links, which are the e-mails in this case. For example, let's run PageRank on the Enron e-mail graph:

scala> val ranks = graph.pageRank(0.001).vertices
ranks: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[955] at RDD at VertexRDD.scala:57

scala> people.join(ranks).map(t => (t._2._2,t._2._1)).sortBy(_._1, ascending=false).take(10).foreach(println)
(32.073722548483325,[email protected])
(29.086568868043248,[email protected])
(28.14656912897315,[email protected])
(26.57894933459292,[email protected])
(25.865486865014493,[email protected])
(23.86746232662471,[email protected])
(22.489814482022275,[email protected])
(21.968039409295585,[email protected])
(20.903053536275547,[email protected])
(20.39124651779771,[email protected])

Ostensibly, these are the go-to people. PageRank tends to emphasize the incoming edges, and Tana Jones returns to the top of the list compared to the 9th place in the triangle counting.

SVD++

SVD++ is a recommendation engine algorithm developed specifically for the Netflix competition by Yehuda Koren and team in 2008—the original paper is still in the public domain and can be found by Googling kdd08koren.pdf. The specific implementation comes from the .NET MyMediaLite library by Zeno Gantner (https://github.com/zenogantner/MyMediaLite), who granted an Apache 2 license to the Apache Foundation. Let's assume we have a set of users (on the left) and items (on the right):

Figure 07-1. A graphical representation of a recommendation problem as a bipartite graph.

The preceding diagram is a graphical representation of the recommendation problem. The nodes on the left represent users; the nodes on the right represent items. User 1 recommends items A and C, while users 2 and 3 recommend only a single item, A. The rest of the edges are missing. The common question is to find the recommendation ranking for the rest of the items; the edges may also have a weight or recommendation strength attached to them. The graph is usually sparse. Such a graph is also often called bipartite, as the edges only go from one set of nodes to the other (a user does not recommend another user).
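Keeping user and item IDs in disjoint ranges, for example negative and positive (the same trick applied with -e.srcId later in this section), the toy graph of Figure 07-1 can be sketched in GraphX as follows; the vertex IDs and unit weights here are hypothetical:

import org.apache.spark.graphx.Edge

// hypothetical IDs: users 1..3 are negated, items A and C get the positive IDs 101 and 103
val ratings = sc.parallelize(Seq(
  Edge(-1L, 101L, 1.0),  // user 1 recommends item A
  Edge(-1L, 103L, 1.0),  // user 1 recommends item C
  Edge(-2L, 101L, 1.0),  // user 2 recommends item A
  Edge(-3L, 101L, 1.0)   // user 3 recommends item A
))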

For a recommendation engine, we typically need two types of nodes: users and items. The recommendations are based on a rating matrix of (user, item, rating) tuples. One implementation of the recommendation algorithm is based on the Singular Value Decomposition (SVD) of the preceding matrix. The final score has four components: the baseline, which is the sum of the average for the whole matrix, the average for the user, and the average for the item, as follows:

$$ b_{ui} = \mu + b_u + b_i $$

Here, $\mu$, $b_u$, and $b_i$ can be understood as the averages for the whole population, the user (among all the user's recommendations), and the item (among all the users). The final part is the dot product of the user and item factor rows:

$$ \hat{r}_{ui} = \mu + b_u + b_i + q_i^T p_u $$

The problem is posed as a minimization problem (refer to Chapter 4, Supervised and Unsupervised Learning):

$$ \min_{b_*,\,q_*,\,p_*} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - \mu - b_u - b_i - q_i^T p_u \right)^2 + \lambda \left( b_u^2 + b_i^2 + \lVert q_i \rVert^2 + \lVert p_u \rVert^2 \right) $$

Here, $\lambda$ is a regularization coefficient, also discussed in Chapter 4, Supervised and Unsupervised Learning. So, each user is associated with a set of numbers $(b_u, p_u)$, and each item with $(b_i, q_i)$. In this particular implementation, the optimal coefficients are found by gradient descent. This is the basis of SVD optimization. In linear algebra, SVD takes an arbitrary $m \times n$ matrix $A$ and represents it as a product of an orthogonal $m \times m$ matrix $U$, a diagonal $m \times n$ matrix $\Sigma$, and an $n \times n$ unitary matrix $V$ (that is, the columns are mutually orthogonal). Arguably, if one takes the $r$ largest entries of the $\Sigma$ matrix, the product is reduced to the product of a very tall $m \times r$ matrix and a wide $r \times n$ matrix, where $r$ is called the rank of the decomposition. If the remaining values are small, the new $(m + n) \times r$ numbers approximate the original $m \times n$ numbers for the relation $A$. If $m$ and $n$ are large to start with (in practical online shopping situations, $m$ is the items and can be in the hundreds of thousands, and $n$ is the users and can be in the hundreds of millions), the savings can be substantial. For example, for $r=10$, $m=100{,}000$, and $n=100{,}000{,}000$, the savings are as follows:

$$ \frac{m \times n}{(m + n) \times r} = \frac{100{,}000 \times 100{,}000{,}000}{(100{,}000 + 100{,}000{,}000) \times 10} \approx 10^4 $$

SVD can also be viewed as PCA for matrices with the column means subtracted. In the Enron case, we can treat senders as users and recipients as items (we'll need to reassign the node IDs), as follows:

scala> val rgraph = graph.partitionBy(org.apache.spark.graphx.PartitionStrategy.EdgePartition1D, 10).mapEdges(e => 1).groupEdges(_+_).cache
rgraph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@2c1a48d6

scala> val redges = rgraph.edges.map( e => Edge(-e.srcId, e.dstId, Math.log(e.attr.toDouble)) ).cache
redges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[57] at map at <console>:36

scala> import org.apache.spark.graphx.lib.SVDPlusPlus
import org.apache.spark.graphx.lib.SVDPlusPlus

scala> implicit val conf = new SVDPlusPlus.Conf(10, 50, 0.0, 10.0, 0.007, 0.007, 0.005, 0.015)
conf: org.apache.spark.graphx.lib.SVDPlusPlus.Conf = org.apache.spark.graphx.lib.SVDPlusPlus$Conf@15cdc117

scala> val (svd, mu) = SVDPlusPlus.run(redges, conf)
svd: org.apache.spark.graphx.Graph[(Array[Double], Array[Double], Double, Double),Double] = org.apache.spark.graphx.impl.GraphImpl@3050363d
mu: Double = 1.3773578970633769

scala> val svdRanks = svd.vertices.filter(_._1 > 0).map(x => (x._2._3, x._1))
svdRanks: org.apache.spark.rdd.RDD[(Double, org.apache.spark.graphx.VertexId)] = MapPartitionsRDD[1517] at map at <console>:31

scala> val svdRanks = svd.vertices.filter(_._1 > 0).map(x => (x._1, x._2._3))
svdRanks: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, Double)] = MapPartitionsRDD[1520] at map at <console>:31

scala> people.join(svdRanks).sortBy(_._2._2, ascending=false).map(x => (x._2._2, x._2._1)).take(10).foreach(println)
(8.864218804309887,[email protected])
(5.935146713012661,[email protected])
(5.740242927715701,[email protected])
(5.441934324464593,[email protected])
(4.910272928389445,[email protected])
(4.701529779800544,[email protected])
(4.4046392452058045,[email protected])
(4.374738019256556,[email protected])
(4.303078586979311,[email protected])
(3.8295412053860867,[email protected])

The svdRanks is the user-independent part of the SVD++ prediction: it ranks the recipients by their baseline popularity, so the distribution lists take priority, as these are usually used for mass e-mailing. To get the user-specific part, we need to provide the user ID:

scala> import com.github.fommil.netlib.BLAS.{getInstance => blas}

scala> def topN(uid: Long, num: Int) = {
     |    val usr = svd.vertices.filter(uid == -_._1).collect()(0)._2
     |    val recs = svd.vertices.filter(_._1 > 0).map( v => (v._1, mu + usr._3 + v._2._3 + blas.ddot(usr._2.length, v._2._1, 1, usr._2, 1)))
     |    people.join(recs).sortBy(_._2._2, ascending=false).map(x => (x._2._2, x._2._1)).take(num)
     | }
topN: (uid: Long, num: Int)Array[(Double, String)]

scala> def top5(x: Long) : Array[(Double, String)] = topN(x, 5)
top5: (x: Long)Array[(Double, String)]

scala> people.join(graph.inDegrees).sortBy(_._2._2, ascending=false).map(x => (x._1, x._2._1)).take(10).toList.map(t => (t._2, top5(t._1).toList)).foreach(println)
([email protected],List((4.866184418005094E66,[email protected]), (3.9246829664352734E66,[email protected]), (3.9246829664352734E66,[email protected]), (3.871029763863491E66,[email protected]), (3.743135924382312E66,[email protected])))
([email protected],List((2.445163626935533E66,[email protected]), (1.9584692804232504E66,[email protected]), (1.9105427465629028E66,[email protected]), (1.9105427465629028E66,[email protected]), (1.8931872324048717E66,[email protected])))
([email protected],List((2.8924566115596135E66,[email protected]), (2.3157345904446663E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2385865127706285E66,[email protected])))
([email protected],List((6.1758464471309754E66,[email protected]), (5.279291610047078E66,[email protected]), (4.967589820856654E66,[email protected]), (4.909283344915057E66,[email protected]), (4.869177440115682E66,[email protected])))
([email protected],List((5.7702834706832735E66,[email protected]), (4.703038082326939E66,[email protected]), (4.703038082326939E66,[email protected]), (4.579565962089777E66,[email protected]), (4.4298763869135494E66,[email protected])))
([email protected],List((9.198688613290757E67,[email protected]), (8.078107057848099E67,[email protected]), (6.922806078209984E67,[email protected]), (6.787266892881456E67,[email protected]), (6.420473603137515E67,[email protected])))
([email protected],List((1.302856119148208E66,[email protected]), (1.0678968544568682E66,[email protected]), (1.031255083546722E66,[email protected]), (1.009319696608474E66,[email protected]), (9.901391892701356E65,[email protected])))
([email protected],List((9.770393472845669E65,[email protected]), (7.97370292724488E65,[email protected]), (7.97370292724488E65,[email protected]), (7.751983820970696E65,[email protected]), (7.500175024539423E65,[email protected])))
([email protected],List((6.856103529420811E65,[email protected]), (5.611272903720188E65,[email protected]), (5.611272903720188E65,[email protected]), (5.436280144720843E65,[email protected]), (5.2621103015001885E65,[email protected])))
([email protected],List((5.0579114162531735E65,[email protected]), (4.136838933824579E65,[email protected]), (4.136838933824579E65,[email protected]), (4.0110663808847004E65,[email protected]), (3.8821438267917902E65,[email protected])))

scala> people.join(graph.outDegrees).sortBy(_._2._2, ascending=false).map(x => (x._1, x._2._1)).take(10).toList.map(t => (t._2, top5(t._1).toList)).foreach(println)
([email protected],List((2.8924566115596135E66,[email protected]), (2.3157345904446663E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2646318970030287E66,[email protected]), (2.2385865127706285E66,[email protected])))
([email protected],List((3.135142195254243E65,[email protected]), (3.135142195254243E65,[email protected]), (2.773512892785554E65,[email protected]), (2.350799070225962E65,[email protected]), (2.2055288158758267E65,[email protected])))
([email protected],List((5.773492048248794E66,[email protected]), (5.067434612038159E66,[email protected]), (4.389028076992449E66,[email protected]), (4.1791711984241975E66,[email protected]), (4.009544764149938E66,[email protected])))
([email protected],List((2.834710591578977E68,[email protected]), (2.488253676819922E68,[email protected]), (2.1516048969715738E68,[email protected]), (2.0405329247770104E68,[email protected]), (1.9877213034021861E68,[email protected])))
([email protected],List((3.453167402163105E64,[email protected]), (3.208849221485621E64,[email protected]), (3.208849221485621E64,[email protected]), (3.0374270093157086E64,[email protected]), (2.886581252384442E64,[email protected])))
([email protected],List((5.1729089729525785E66,[email protected]), (4.220843848723133E66,[email protected]), (4.220843848723133E66,[email protected]), (4.1044435240204605E66,[email protected]), (3.9709951893268635E66,[email protected])))
([email protected],List((2.513139130001457E65,[email protected]), (2.1037756300035247E65,[email protected]), (2.0297519350719265E65,[email protected]), (1.9587139280519927E65,[email protected]), (1.947164483486155E65,[email protected])))
([email protected],List((4.516267307013845E66,[email protected]), (3.653408921875843E66,[email protected]), (3.653408921875843E66,[email protected]), (3.590298037045689E66,[email protected]), (3.471781765250177E66,[email protected])))
([email protected],List((2.0719309635087482E66,[email protected]), (1.732651408857978E66,[email protected]), (1.732651408857978E66,[email protected]), (1.6348480059915056E66,[email protected]), (1.5880693846486309E66,[email protected])))
([email protected],List((5.596589595417286E66,[email protected]), (4.559474243930487E66,[email protected]), (4.559474243930487E66,[email protected]), (4.4421474044331

Here, we computed the top five recommended e-mail-to lists for the top in-degree and out-degree users.

The SVD++ implementation is only 159 lines of Scala code and can serve as the basis for further improvements. SVD++ includes a part based on implicit user feedback and item similarity information. Finally, the winning Netflix solution also took into consideration the fact that user preferences are time-dependent, but this part has not been implemented in GraphX yet.
