Developing a real-time ML pipeline from streaming data

According to the Spark Streaming programming guide at http://spark.apache.org/docs/latest/streaming-programming-guide.html, Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results, also in batches. To make this process easier, Spark Streaming provides a high-level abstraction called a discretized stream, or DStream.

The DStream represents a continuous stream of data coming from real-time sources such as Twitter, Kafka, Flume, Kinesis, or sensors. DStreams can either be created directly from these sources or derived by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs, which means the familiar RDD abstraction is reused for processing the stream.
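
To make this concrete, here is a minimal sketch of creating and consuming a DStream. It uses a plain TCP text source on localhost port 9999, which is only a placeholder for illustration and not part of the Twitter example that follows; the imports are the same as those listed in Step 1 below:

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("DStreamSketch"); 
// a 1-second batch interval: each second of input becomes one RDD in the DStream 
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000)); 
// a DStream backed by a socket source; each batch is an RDD of text lines 
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999); 
lines.print();            // print a few elements of every batch 
jssc.start();             // start receiving and processing 
jssc.awaitTermination();  // block until the streaming context is stopped 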

As already discussed in Chapter 6, Building Scalable Machine Learning Pipelines, topic modeling automatically infers the topics discussed in a collection of documents as hidden (latent) structure. It is commonly used in Natural Language Processing (NLP) and text mining tasks. The inferred topics can be used to analyze, summarize, and organize the documents, or for featurization and dimensionality reduction in later stages of a machine learning (ML) pipeline. The most popular topic modeling algorithms are Latent Dirichlet Allocation (LDA) and Probabilistic Latent Semantic Analysis (pLSA). Previously, we discussed how to apply the LDA algorithm to a static dataset that is already available. However, applying topic modeling to real-time streaming data is even more attractive, because it reveals the current trends on social media platforms such as Twitter, LinkedIn, or Facebook.

However, due to the limited API facilities offered by Facebook and LinkedIn, it is difficult to collect real-time data from those platforms. Spark, on the other hand, provides APIs for accessing data from Twitter, Kafka, Flume, and Kinesis. The workflow for developing a near real-time ML application from streaming data is presented in Figure 9:


Figure 9: Real-time predictive ML model development from streaming data using Spark.

In this section, we will show you how to develop a real-time ML pipeline that handles streaming data. More specifically, we will walk step by step through topic modeling on Twitter streaming data. The workflow has two steps: Twitter data collection and topic modeling using LDA.

Real-time tweet data collection from Twitter

Spark provides APIs to access real-time tweets from the Twitter timeline. The tweets can be further filtered using keywords or hashtags. Alternatively, tweets can be downloaded from a particular user's Twitter timeline. However, before accessing the tweet data, you have to create a sample Twitter application on Twitter and generate four credentials: consumerKey, consumerSecret, accessToken, and accessTokenSecret.

Tip

Note that Twitter (and some other) driver support was removed during the Spark upgrade from 1.6.2 to 2.0.0. This means that streaming data collection from the less commonly used connectors, including Twitter, Akka, MQTT, and ZeroMQ, is no longer supported in Spark 2.0.0. Therefore, it's not possible to develop a Twitter data-collection application using Spark 2.0.0, and Spark 1.6.1 will be used for the Twitter data-collection demonstration in this section. Readers are advised to create a Maven project in Eclipse using the provided Maven-friendly pom.xml file.

After authenticating your Spark ML application to collect data from Twitter, you have to define the JavaStreamingContext by specifying the SparkConf and the batch duration for collecting tweets. After that, the tweet data can be received as a DStream (discretized stream) through Spark's TwitterUtils API once you invoke the start() method on the JavaStreamingContext object.

Upon starting to receive the tweets, you can save the tweet data on your local machine, HDFS, or any other applicable filesystem. The streaming continues until the streaming context is terminated; awaitTermination() blocks the driver until that happens.

The received tweets can also be pre-processed or cleaned using the foreachRDD design pattern and then saved to your desired location, as sketched below. Due to page limitations, we keep the discussion brief here.
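
For instance, here is a minimal sketch of such cleaning, assuming a JavaDStream<Status> named tweets like the one created in Step 6 below. It maps the raw status objects to cleaned text (dropping URLs and collapsing whitespace), which can then be saved using the same foreachRDD pattern shown in Step 7:

JavaDStream<String> cleanedTweets = tweets.map(new Function<Status, String>() { 
      @Override 
      public String call(Status status) throws Exception { 
        // keep only the tweet text, drop URLs, and collapse whitespace 
        return status.getText() 
                     .replaceAll("https?://\\S+", "") 
                     .replaceAll("\\s+", " ") 
                     .trim(); 
      } 
    }); 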

Tip

Interested readers should follow the API guidelines for Spark Streaming on the following Spark web page: http://spark.apache.org/docs/latest/streaming-programming-guide.html.

Tweet collection using TwitterUtils API of Spark

In this sub-section, we will first show you how to collect real-time tweet data from Twitter using the TwitterUtils API. The same tweet data will then be used for topic modeling in the next sub-section.

Step 1: Load required packages and APIs

Here is the code to load the required packages:

import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.twitter.TwitterUtils; 
import twitter4j.Status;  

Step 2: Setting the Logger level

For setting the Logger level, we use the following code:

Logger.getLogger("org").setLevel(Level.OFF); 
Logger.getLogger("akka").setLevel(Level.OFF); 
Logger.getLogger("org.apache.spark").setLevel(Level.WARN); 
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);  

Step 3: Spark streaming environment setting

Here is the code for setting up the Spark Streaming environment:

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("TwitterExample"); 
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000)); 

Step 4: Setting the authentication for accessing Twitter data

Get the authentication values from your sample Twitter application by visiting the following URL: https://apps.twitter.com/:

String consumerKey = "VQINrM6ZcNqaCAawA6IN4xRTP"; 
String consumerSecret = "F2OsVEuJypOZSAoNDFrWgoCHyNJNXbTr8T3yEbp9cWEYjTctye"; 
String accessToken = "475468363-IfRcZnbkEVPRw6bwXovMnw1FsbxuetvEF2JvbAvD"; 
String accessTokenSecret = "vU7VtzZVyugUHO7ddeTvucu1wRrCZqFTPJUW8VAe6xgyf"; 

Note that the values shown here are sample values for the four secret keys; replace them with your own keys accordingly. Now we need to set the system properties for these four keys using twitter4j.oauth:

System.setProperty("twitter4j.oauth.consumerKey", consumerKey); 
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret); 
System.setProperty("twitter4j.oauth.accessToken", accessToken); 
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret); 

Step 5: Enable the check pointing

The following code shows how to enable the check pointing:

jssc.checkpoint("src/main/resources/twitterdata/"); 

Metadata checkpointing is primarily needed for recovery from driver failures, whereas data (RDD) checkpointing is necessary even for basic functioning if stateful transformations are used. For more details, please visit the following Spark web page: http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing.
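
As an illustration only (it is not part of the pipeline developed in this chapter), the following sketch shows a stateful transformation that relies on the checkpoint directory enabled above: a running tweet count per user. It assumes the tweets DStream created in Step 6 below, and it uses Guava's Optional together with the extra imports shown in the comments, as expected by the updateStateByKey() signature of the Spark 1.6.x Java API:

// additional imports needed for this sketch: 
// import java.util.List; 
// import com.google.common.base.Optional; 
// import org.apache.spark.api.java.function.Function2; 
// import org.apache.spark.api.java.function.PairFunction; 
// import org.apache.spark.streaming.api.java.JavaPairDStream; 
// import scala.Tuple2; 

// one (screenName, 1) pair per tweet in each micro-batch 
JavaPairDStream<String, Integer> perUser = tweets.mapToPair( 
    new PairFunction<Status, String, Integer>() { 
      @Override 
      public Tuple2<String, Integer> call(Status s) throws Exception { 
        return new Tuple2<String, Integer>(s.getUser().getScreenName(), 1); 
      } 
    }); 

// running count per user across batches; this state is what gets checkpointed 
JavaPairDStream<String, Integer> runningCounts = perUser.updateStateByKey( 
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { 
      @Override 
      public Optional<Integer> call(List<Integer> newCounts, Optional<Integer> state) { 
        int sum = state.or(0); 
        for (Integer c : newCounts) { 
          sum += c; 
        } 
        return Optional.of(sum); 
      } 
    }); 
runningCounts.print(); 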

Step 6: Start accepting the stream of tweets as a discretized stream

Let's collect only 100 tweets for simplicity, but you can collect as many as you want:

JavaDStream<Status> tweets = TwitterUtils.createStream(jssc); 
final String outputDirectory="src/main/resources/twitterdata/"; 
final long numTweetsToCollect = 100; 
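
As mentioned earlier, the stream can also be restricted to tweets matching particular keywords or hashtags at the source. A variant of the same call is sketched below; the filter terms are arbitrary examples rather than values used in the rest of this demonstration:

String[] filters = { "#spark", "machine learning" };  // hypothetical keywords/hashtags 
JavaDStream<Status> filteredTweets = TwitterUtils.createStream(jssc, filters); 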

Step 7: Filter tweets and save them as regular text files

The code for filtering and saving the tweets is shown here:

tweets.foreachRDD(new Function<JavaRDD<Status>, Void>() { 
      // running total of tweets collected across all micro-batches 
      public long numTweetsCollected = 0; 
      @Override 
      public Void call(JavaRDD<Status> status) throws Exception { 
        long count = status.count(); 
        if (count > 0) {  // only save non-empty batches 
          status.saveAsTextFile(outputDirectory + "/tweets_" + System.currentTimeMillis()); 
          numTweetsCollected += count; 
          if (numTweetsCollected >= numTweetsToCollect) { 
            System.exit(0);  // stop the application once enough tweets have been collected 
          } 
        } 
        return null; 
      } 
    }); 

Here we process each micro-batch of tweets with the foreachRDD output operation, saving only non-empty batches, that is, batches whose status count is at least 1. Once the number of collected tweets reaches the target, we exit the application. The tweets are saved as text files in the output directory.
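
Note that the Function<JavaRDD<Status>, Void> overload of foreachRDD() used above is deprecated as of Spark 1.6 in favor of a VoidFunction-based overload. To the best of our knowledge, an equivalent form (requiring import org.apache.spark.api.java.function.VoidFunction) would look like the following sketch:

tweets.foreachRDD(new VoidFunction<JavaRDD<Status>>() { 
      public long numTweetsCollected = 0; 
      @Override 
      public void call(JavaRDD<Status> status) throws Exception { 
        long count = status.count(); 
        if (count > 0) { 
          status.saveAsTextFile(outputDirectory + "/tweets_" + System.currentTimeMillis()); 
          numTweetsCollected += count; 
          if (numTweetsCollected >= numTweetsToCollect) { 
            System.exit(0); 
          } 
        } 
      } 
    }); 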

Step 8: Controlling the streaming switch

The code for controlling the streaming switch is shown here:

jssc.start(); 
jssc.awaitTermination();  

Eventually, we will use these tweet texts for the topic modeling in the next step. If you recall the topic modeling in Chapter 6, Building Scalable Machine Learning Pipelines, we saw the term weights, topic names, and term indices; however, we also need the actual terms. In the next step, we will show how to retrieve those terms, which depends heavily on the vocabulary that has to be created for that purpose.

Topic modeling using Spark

In this sub-section, we present a semi-automated technique for topic modeling using Spark. The following steps cover the whole process, from reading the data to printing the topics along with their term weights.

Step 1: Load necessary packages and APIs

Here is the code to load the necessary packages:

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.Map; 
import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFlatMapFunction; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.ml.feature.StopWordsRemover; 
import org.apache.spark.mllib.clustering.LDA; 
import org.apache.spark.mllib.clustering.LDAModel; 
import org.apache.spark.mllib.linalg.Vector; 
import org.apache.spark.mllib.linalg.Vectors; 
import org.apache.spark.sql.SQLContext; 
import scala.Tuple2; 

Step 2: Configure the Spark environment

Here is the code to configure Spark:

private transient static SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("TopicModelingLDA"); 
private transient static JavaSparkContext jsc = new JavaSparkContext(sparkConf); 
private transient static SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc); 

Step 3: Setting the logging level

Here is the code to set the logging level:

Logger.getLogger("org").setLevel(Level.OFF); 
Logger.getLogger("akka").setLevel(Level.OFF); 
Logger.getLogger("org.apache.spark").setLevel(Level.WARN); 
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF); 

Note

Note that setting the logging level as shown previously is optional.

Step 4: Create a Java RDD and cache it in memory

Create a Java RDD from the tweet data collected in the previous step and cache it in memory:

JavaRDD<String> data = jsc.wholeTextFiles("src/main/resources/test/*.txt") 
        .map(new Function<Tuple2<String, String>, String>() { 
          @Override 
          public String call(Tuple2<String, String> v1) throws Exception { 
            // wholeTextFiles() yields (file path, file content) pairs; keep only the content 
            return v1._2; 
          } 
        }).cache(); 

Step 5: Tokenize the terms

Load the list of stop words provided by Spark and tokenize the text, keeping only terms that satisfy three constraints: the term is at least four characters long, it is not a stop word, and it contains only letters; each kept term is converted to lowercase. Note that we discussed stop words in Chapter 6, Building Scalable Machine Learning Pipelines:

public static String[] stopwords = new StopWordsRemover().getStopWords(); 
JavaRDD<String[]> tokenized = data.map(new Function<String, String[]>() { 
      @Override 
      public String[] call(String v1) throws Exception { 
        ArrayList<String> list = new ArrayList<>(); 
        for (String s : v1.split("\\s")) { 
          // keep terms longer than three characters that are not stop words and contain only letters 
          if (s.length() > 3 && !isStopWord(s) && isOnlyLetter(s)) 
            list.add(s.toLowerCase()); 
        } 
        return list.toArray(new String[0]); 
      } 
    }); 

Step 6: Prepare the term counts

Prepare the term counts by applying the same four constraints: terms are at least four characters long, are not stop words, contain only letters, and are converted to lowercase:

JavaPairRDD<String, Integer> termCounts = data 
        .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { 
          @Override 
          public Iterable<Tuple2<String, Integer>> call(String t) throws Exception {     
            ArrayList<Tuple2<String, Integer>> tc = new ArrayList<>();   
            for (String s : t.split("\\s")) { 
 
              if (s.length() > 3 && !isStopWord(s) && isOnlyLetter(s)) 
                tc.add(new Tuple2<String, Integer>(s.toLowerCase(), 1)); 
            } 
            return tc; 
          } 
        }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
          @Override 
          public Integer call(Integer v1, Integer v2) throws Exception {     
            return v1 + v2; 
          } 
        }); 

Note that isStopWord() and isOnlyLetter() are two user-defined methods that will be shown at the end of this sub-section.

Step 7: Sort the term counts

Sort the term counts by applying the mapToPair(), sortByKey(), and mapToPair() transformations:

JavaPairRDD<String, Integer> termCountsSorted = termCounts 
        .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { 
          @Override 
          public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception { 
            return t.swap(); 
          } 
        }).sortByKey().mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { 
          @Override 
          public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception { 
            return t.swap(); 
          } 
        }); 

Step 8: Create the vocabulary

Create a vocabulary RDD by mapping the sorted term counts. Finally, print the key-value pairs:

JavaRDD<String> vocabArray = termCountsSorted.map(new Function<Tuple2<String, Integer>, String>() { 
      @Override 
      public String call(Tuple2<String, Integer> v1) throws Exception { 
        return v1._1; 
      } 
    }); 
final Map<String, Long> vocab = vocabArray.zipWithIndex().collectAsMap(); 
    for (Map.Entry<String, Long> entry : vocab.entrySet()) { 
      System.out.println(entry.getKey() + "/" + entry.getValue()); 
    } 

Let's see a screenshot of the vocabulary terms and their indices shown in Figure 10:


Figure 10: Vocabulary terms and their indices.

Step 9: Create the document matrix from the tokenized words/terms

Create the document matrix as a JavaPairRDD from the tokenized terms by mapping them against the vocabulary we created in the previous step. After that, cache the RDD in memory for faster processing:

JavaPairRDD<Long, Vector> documents = JavaPairRDD 
        .fromJavaRDD(tokenized.zipWithIndex().map(new Function<Tuple2<String[], Long>, Tuple2<Long, Vector>>() { 
          @Override 
          public Tuple2<Long, Vector> call(Tuple2<String[], Long> v1) throws Exception { 
            String[] tokens = v1._1; 
            Map<Integer, Double> counts = new HashMap<>(); 

            // count the occurrences of each vocabulary term in this document 
            for (String s : tokens) { 
              if (vocab.containsKey(s)) { 
                int idx = vocab.get(s).intValue(); 
                if (counts.containsKey(idx)) { 
                  counts.put(idx, counts.get(idx) + 1.0); 
                } else { 
                  counts.put(idx, 1.0); 
                } 
              } 
            } 

            // convert the counts into the (index, value) pairs expected by Vectors.sparse() 
            ArrayList<Tuple2<Integer, Double>> ll = new ArrayList<>(); 
            for (Map.Entry<Integer, Double> entry : counts.entrySet()) { 
              ll.add(new Tuple2<Integer, Double>(entry.getKey(), entry.getValue())); 
            } 

            return new Tuple2<Long, Vector>(v1._2, Vectors.sparse(vocab.size(), ll)); 
          } 
        })).cache(); 

Step 10: Train the LDA model

Train the LDA model using the document matrix from step 9, and describe each of the four topics by its top 10 terms for simplicity.

Note that here we have used Latent Dirichlet Allocation (LDA), which is one of the most popular topic modeling algorithms commonly used for text mining. We could use more robust topic modeling algorithms such as Probabilistic Latent Semantic Analysis (pLSA), the Pachinko Allocation Model (PAM), or the Hierarchical Dirichlet Process (HDP). However, pLSA suffers from overfitting.

On the other hand, both HDP and PAM are more complex algorithms, typically used for advanced text mining tasks such as mining topics from high-dimensional or unstructured text documents. Moreover, to date, Spark has implemented only one topic modeling algorithm, namely LDA. Therefore, we use LDA here:

LDAModel ldaModel = new LDA().setK(4).setMaxIterations(10).run(documents); 
Tuple2<int[], double[]>[] topicDesces = ldaModel.describeTopics(10); 
int topicCount = topicDesces.length; 

Note that to keep the topic generation simple, we have set the number of topics to 4 and run LDA for only 10 iterations. Another reason is that in the next section we want to show how to connect these four topics through their common terms. Readers are encouraged to change these values based on their own requirements; a few of the other optional settings are sketched below.
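
For reference, here is a sketch of the same training call with a few of the optional settings that MLlib's LDA exposes. The optimizer choice, prior values, iteration count, and seed below are illustrative assumptions to be tuned for your own corpus, not settings used to produce the results in this chapter:

LDAModel tunedModel = new LDA() 
    .setK(4)                      // number of topics 
    .setMaxIterations(50)         // more iterations than the quick demonstration above 
    .setOptimizer("online")       // online variational Bayes instead of the default EM 
    .setDocConcentration(1.1)     // symmetric document-topic prior (alpha) 
    .setTopicConcentration(1.1)   // topic-term prior (beta) 
    .setSeed(12345L)              // fixed seed for reproducibility 
    .run(documents); 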

Step 11: Get the topic terms, indices, term weights, and total weight for each topic

Get these statistics using the vocabulary created in step 8 and the topic descriptions obtained in step 10:

for (int t = 0; t < topicCount; t++) { 
      Tuple2<int[], double[]> topic = topicDesces[t]; 
      System.out.println("      Topic: " + t); 
      int[] indices = topic._1(); 
      double[] values = topic._2(); 
      double sum = 0.0d; 
      int wordCount = indices.length; 
      System.out.println("Terms |	Index |	Weight"); 
      System.out.println("------------------------"); 
      for (int w = 0; w < wordCount; w++) { 
        double prob = values[w]; 
        int vocabIndex = indices[w]; 
        String vocabKey = ""; 
        for (Map.Entry<String, Long> entry : vocab.entrySet()) { 
          if (entry.getValue() == vocabIndex) { 
            vocabKey = entry.getKey(); 
            break; 
          } 
        } 
        System.out.format("%s \t %d \t %f \n", vocabKey, vocabIndex, prob); 
        sum += prob; 
      } 
      System.out.println("--------------------"); 
      System.out.println("Sum:= " + sum); 
      System.out.println(); 
    } 
  } 

If you look at the preceding code segment carefully, vocabKey is the corresponding topic term, vocabIndex is its index in the vocabulary, and prob is the term weight for that term within the topic. The print statements are used to format the output. Now let's see the output that describes the four topics in Figure 11:


Figure 11: Describing four topics.
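
As a small design note on the step 11 code: the innermost loop scans the whole vocabulary map once for every term index, which is wasteful for larger vocabularies. A simple alternative is to build a reverse index-to-term array once and look each term up directly; indexToTerm below is a hypothetical helper introduced only for this sketch:

String[] indexToTerm = new String[vocab.size()]; 
for (Map.Entry<String, Long> entry : vocab.entrySet()) { 
  indexToTerm[entry.getValue().intValue()] = entry.getKey(); 
} 
// ...and then, inside the topic loop, the vocabulary scan can be replaced with: 
// String vocabKey = indexToTerm[vocabIndex]; 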

As mentioned in step 6, here is how the isStopWord() method is implemented. Just use the following code:

public static boolean isStopWord(String word) { 
    for (String s : stopwords) { 
      if (word.equals(s))   
        return true; 
    } 
    return false; 
  } 

And the isOnlyLetter() method is as follows:

public static boolean isOnlyLetter(String word) { 
    for (Character ch : word.toCharArray()) { 
      if (!Character.isLetter(ch)) 
        return false; 
    } 
    return true; 
  } 

In the next section, we will introduce how to parse and handle large-scale graph data using the GraphX API of Spark to find the connected components in the topic data we obtained in Figure 11.
