Coding our first Spark Streaming job

In this section, we will code and execute our first Spark Streaming job in Scala. We will also simulate the streaming data by creating a temporary stream.

Creating a stream producer

Perform the following steps to create a stream producer which continuously reads the input data provided by the user from the console and then further submits that data to a socket:

  1. Open your Spark-Examples project and create a new package named chapter.nine containing a Java class named StreamProducer.java.
  2. Next, edit StreamProducer.java and add the following piece of code:
    package chapter.nine;
    
    import java.io.*;
    import java.net.*;
    
    public class StreamProducer {
    
      public static void main(String[] args) {
    
        if (args == null || args.length < 1) {
          System.out.println("Usage - java chapter.nine.StreamProducer <port#>");
          System.exit(0);
        }
        System.out.println("Defining new Socket on " + args[0]);
        try (ServerSocket soc = new ServerSocket(Integer.parseInt(args[0]))) {
    
          System.out.println("Waiting for Incoming Connection on " + args[0]);
          Socket clientSocket = soc.accept();
          System.out.println("Connection Received");
          OutputStream outputStream = clientSocket.getOutputStream();
          PrintWriter out = new PrintWriter(outputStream, true);
          BufferedReader read = new BufferedReader(new InputStreamReader(System.in));
          // Keep reading data from the console in an infinite loop and send it over to the socket.
          while (true) {
            System.out.println("Waiting for user to input some data");
            String data = read.readLine();
            System.out.println("Data received and now writing it to Socket");
            out.println(data);
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

And we are done with our stream producer. It is pretty simple and straightforward. First, it opens a server socket so that a client can connect, and then it waits indefinitely for input from the user. As soon as it receives a line of input, it immediately sends it to the connected client. The client can be any consumer; in our case, it will be the Spark Streaming job that we create in the next section.
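
Before wiring in Spark, you can sanity-check the producer with any plain socket client. The following is a minimal Scala sketch, purely for illustration (the ProducerTestClient name is ours and not part of the project), that connects to the producer and echoes every line it sends:

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    
    //Minimal test client: connects to the stream producer on the given port
    //and prints every line it receives.
    object ProducerTestClient {
      def main(args: Array[String]) {
        val socket = new Socket("localhost", args(0).toInt)
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
        var line = reader.readLine()
        while (line != null) {
          println("Received: " + line)
          line = reader.readLine()
        }
        socket.close()
      }
    }

Start the producer in one console and this client in another (with the same port); every line typed into the producer should show up prefixed with "Received:".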

Let's move to the next section where we will create our streaming job in Scala and Java for accepting and transforming the data generated by our stream producer.

Writing our Spark Streaming job in Scala

Perform the following steps for writing our first Spark Streaming job in Scala:

  1. Create a new Scala object named ScalaFirstSparkStreamingJob in the chapter.nine package of our Spark-Examples project (file ScalaFirstSparkStreamingJob.scala).
  2. Next, edit ScalaFirstSparkStreamingJob.scala and add the following piece of code:
    package chapter.nine
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming._
    import org.apache.spark.storage.StorageLevel._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream
    
    object ScalaFirstSparkStreamingJob {
    
      def main(args: Array[String]) {
    
        println("Creating Spark Configuration")
        //Create an Object of Spark Configuration
        val conf = new SparkConf()
        //Set the logical and user defined Name of this Application
        conf.setAppName("Our First Spark Streaming Application in Scala")
    
        println("Retrieving Streaming Context from Spark Conf")
        //Retrieve the Streaming Context from the SparkConf Object.
        //The second parameter is the time interval at which the streaming data will be divided into batches.
        val streamCtx = new StreamingContext(conf, Seconds(2))
    
        //Define the type of stream. Here we use a TCP socket as a text stream:
        //it keeps watching for incoming data from a specific machine (localhost) and port (provided as an argument).
        //Once the data is retrieved, it is kept in memory and, in case memory is not sufficient, stored on disk.
        //The data is then read and converted into a DStream.
        val lines = streamCtx.socketTextStream("localhost", args(0).toInt, MEMORY_AND_DISK_SER_2)
    
        //Apply split() to each element of the DStream, which generates multiple new records
        //from each record in the source stream, and then use flatMap to flatten
        //all of those records into a new DStream.
        val words = lines.flatMap(x => x.split(" "))
    
        //Now we will count these words by applying map().
        //map() applies a given function to each element of the DStream,
        //here pairing each word with the count 1.
        val pairs = words.map(word => (word, 1))
    
        //Next, aggregate the values of each key by applying the given function.
        val wordCounts = pairs.reduceByKey(_ + _)
    
        printValues(wordCounts, streamCtx)
        //Most important statement, which will initiate the Streaming Context
        streamCtx.start()
        //Wait till the execution is completed.
        streamCtx.awaitTermination()
    
      }
    
      /**
       * Simple print function, for printing all elements of the RDD
       */
      def printValues(stream: DStream[(String, Int)], streamCtx: StreamingContext) {
        stream.foreachRDD(foreachFunc)
        def foreachFunc = (rdd: RDD[(String, Int)]) => {
          val array = rdd.collect()
          println("---------Start Printing Results----------")
          for (res <- array) {
            println(res)
          }
          println("---------Finished Printing Results--------")
        }
      }
    
    }

And we're done with the coding of our first Spark Streaming job. Our job is again pretty simple and straightforward. It receives some random text from our stream producer and simply counts the occurrence of unique words and finally prints the same on the driver console. We will soon execute our job but before that let's move to the next section where we will code the same job in Java too.
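
If you would like to see the word-count logic in isolation before running it against a live stream, the same flatMap/map/reduceByKey chain can be tried on a plain RDD in the Spark shell; the sample sentences below are made-up data, purely for illustration:

    //In the Spark shell, sc is the pre-built SparkContext.
    val lines = sc.parallelize(Seq("spark streaming with spark", "streaming jobs"))
    
    val words = lines.flatMap(x => x.split(" "))   //spark, streaming, with, spark, streaming, jobs
    val pairs = words.map(word => (word, 1))       //(spark,1), (streaming,1), (with,1), ...
    val wordCounts = pairs.reduceByKey(_ + _)
    
    wordCounts.collect().foreach(println)
    //Prints (order may vary): (spark,2), (streaming,2), (with,1), (jobs,1)

The streaming job applies exactly the same chain, only on each 2-second batch of the DStream instead of a static RDD.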

Note

Follow the comments provided in the code to understand the business logic and other operations. The same style is used further in the chapter and book.

Writing our Spark Streaming job in Java

Perform the following steps for writing our first Spark Streaming job in Java:

  1. Create a new Java class named JavaFirstSparkStreamingJob in the chapter.nine package of our Spark-Examples project (file JavaFirstSparkStreamingJob.java).
  2. Next, edit JavaFirstSparkStreamingJob.java and add the following piece of code:
    package chapter.nine;
    
    import java.util.Arrays;
    
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.*;
    
    import scala.Tuple2;
    
    public class JavaFirstSparkStreamingJob {
    
      public static void main(String[] args) {
    
        System.out.println("Creating Spark Configuration");
        // Create an Object of Spark Configuration
        SparkConf conf = new SparkConf();
        // Set the logical and user defined Name of this Application
        conf.setAppName("Our First Spark Streaming Application in Java");
    
        System.out.println("Retrieving Streaming Context from Spark Conf");
        // Retrieve the Streaming Context from the SparkConf Object.
        // The second parameter is the time interval at which the streaming data
        // will be divided into batches.
        JavaStreamingContext streamCtx = new JavaStreamingContext(conf,
            Durations.seconds(2));
    
        // Define the type of stream. Here we use a TCP socket as a text stream:
        // it keeps watching for incoming data from a specific machine (localhost)
        // and port (provided as an argument).
        // Once the data is retrieved, it is kept in memory and, in case memory
        // is not sufficient, stored on disk.
        // The data is then read and converted into a DStream.
        JavaReceiverInputDStream<String> lines = streamCtx.socketTextStream(
            "localhost", Integer.parseInt(args[0]),
            StorageLevel.MEMORY_AND_DISK_SER_2());
    
        // Apply split() to all elements of the JavaReceiverInputDStream,
        // which generates multiple new records from each record in the source stream,
        // and then use flatMap to flatten all of those records into a new JavaDStream.
        JavaDStream<String> words = lines
            .flatMap(new FlatMapFunction<String, String>() {
              @Override
              public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
              }
            });
    
        // Now, we will count these words by applying mapToPair().
        // mapToPair() applies a given function to each element of the stream
        // and returns a Scala Tuple with the "word" as key and "1" as value.
        JavaPairDStream<String, Integer> pairs = words
            .mapToPair(new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s)
                  throws Exception {
                return new Tuple2<String, Integer>(s, 1);
              }
            });
    
        // Next, we aggregate the values of each key by applying the given function.
        JavaPairDStream<String, Integer> wordCounts = pairs
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
              @Override
              public Integer call(Integer i1, Integer i2)
                  throws Exception {
                return i1 + i2;
              }
            });
    
        // Lastly, we print the first 10 (word, count) pairs.
        // We could also implement a custom print method for printing all values,
        // as we did in the Scala example.
        wordCounts.print(10);
        // Most important statement which will initiate the Streaming Context
        streamCtx.start();
        // Wait till the execution is completed.
        streamCtx.awaitTermination();
    
      }
    }

And we are done with our first Spark Streaming job in Java. It performs the same function as our Scala job: it receives some random text from our stream producer, counts the occurrences of the unique words, and finally prints them on the driver console. Let's move forward and see some action by executing our first streaming job.

Executing our Spark Streaming job

In this section, we will execute/launch our first streaming job and will analyze the output on the console. Perform the following steps for launching our first Spark Streaming job:

  1. Compile the Eclipse project Spark-Examples and export it as a JAR file named spark-examples.jar from Eclipse itself.
  2. Next, open your console and execute the following command from the location where we exported our Spark-Examples project:
    java -classpath spark-examples.jar chapter.nine.StreamProducer 9047
    

    Our stream producer is up and running and waiting for clients to connect at port 9047.

  3. Assuming our Spark cluster is up and running, open a new console and execute the following command for launching our Spark Streaming Scala job:
    $SPARK_HOME/bin/spark-submit --class chapter.nine.ScalaFirstSparkStreamingJob --master spark://ip-10-155-38-161:7077 spark-examples.jar 9047
    
  4. For executing the Spark Streaming Java job, execute this command (a local-mode alternative is shown below):
    $SPARK_HOME/bin/spark-submit --class chapter.nine.JavaFirstSparkStreamingJob --master spark://ip-10-155-38-161:7077 spark-examples.jar 9047
    

And we are done! Isn't it interesting, simple, and straightforward?
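
As a side note, if a standalone cluster is not available, both jobs can also be launched in Spark's local mode. A streaming job needs at least two local threads, one for the socket receiver and one for processing, so use a master URL such as local[2]; this variant is our assumption and not part of the original cluster setup:

    $SPARK_HOME/bin/spark-submit --class chapter.nine.ScalaFirstSparkStreamingJob --master local[2] spark-examples.jar 9047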

Now, whatever we type on the console of our stream producer is sent to our Spark Streaming job, which counts the words and prints the results on the driver console. The following screenshot shows the output produced by our stream producer:

[Screenshot: output of the stream producer console]

The following screenshot shows the output produced by our streaming job that receives the input from our stream producer and then counts and prints the distinct words on the console itself:

[Screenshot: word counts printed by the streaming job on the driver console]

In this section, we coded and executed our first Spark Streaming job in Scala and Java. Let's move forward where we will extend our Chicago crime example and will perform some real-time analysis by integrating Spark Streaming and Spark SQL.
