Realization of Lambda Architecture

In this section, we will extend our Chicago crime use case to design and code the different layers of Lambda Architecture in Spark.

Let's extend our Chicago crime use case and assume that the Chicago crime data is delivered in near real-time. Next, our custom consumers will consume the data and will need to find the number of crimes for each crime category. In most cases, users will require this grouping only for the chunk of data received in near real-time but, in a few use cases, the aggregation also needs to be done on the historical data.

Seems like a Lambda use case, doesn't it?

Let's first analyze the complete architecture with all of its components, and then we will describe, code, and execute each and every component of Lambda Architecture.

High-level architecture

In this section, we will discuss the high-level architecture of our Chicago crime use case that is developed using the principles of Lambda Architecture.

We will leverage Spark Streaming and Spark batches along with Cassandra to realize our Lambda Architecture, but users are free to use any technology as per the technology matrix defined in the Technology matrix for Lambda Architecture section.

[Figure: high-level architecture of the Lambda Architecture for the Chicago crime use case]

The preceding illustration shows a high-level architecture and its various components, which are used to develop the Lambda Architecture for the Chicago crime dataset.

Let's move forward and discuss the role of each of the components:

  • Data source and custom producer: The data source is again the same Chicago crime dataset that we used in the Creating Kinesis stream producers section in Chapter 5, Getting Acquainted with Kinesis. We will place this dataset in a specific location on the file system. Our custom producer will read a few records from this file at regular intervals and send them to a socket for further consumption.
  • Real-time layer: The real-time layer is a Spark Streaming job, which listens to the socket opened by our custom producer and performs two functions:
    • Submits/persists the raw records into the database, which, in our case, is Cassandra with keyspace lambdaarchitecture and table masterdata.
    • Performs the grouping operation, finds the count of each distinct crime type only in the chunk of records received, and finally persists it into the database, which is Cassandra with keyspace lambdaarchitecture and table realtimedata. This table will contain the aggregated data for each batch received by our socket streams.
  • Batch layers: Batch layers are Spark batches that can be executed either manually or scheduled to be executed every hour, two hours, or even once a day. They basically perform the following functions:
    • They fetch the complete data from the Cassandra table masterdata and perform the grouping operation to find out the count of each distinct crime type.
    • Finally, they persist the same data into Cassandra with keyspace lambdaarchitecture and table batchviewdata.
    • They also truncate the data from the table realtimedata, assuming that the records in this table are already updated in masterdata.
  • Serving layer: Finally, our serving layer is a Spark batch job that combines both views, batchviewdata and realtimedata. It groups the combined data by each distinct crime type and prints the result on the console.
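
Before we dive into the code, note that the real-time and batch layers described above share the same core aggregation: counting crimes per distinct crime type over whatever slice of data the layer sees (the latest chunk for the real-time layer, the full master data for the batch layer). Conceptually, it boils down to a helper like the following minimal sketch (the function name is illustrative; you can paste it as-is into spark-shell):

    import org.apache.spark.sql.DataFrame

    // Count crimes per distinct crime type in the given slice of crime data.
    // The real-time layer applies this to the chunk received in the current batch,
    // while the batch layer applies it to the complete master data.
    def countByCrimeType(crimes: DataFrame): DataFrame =
      crimes.groupBy("primarytype").count()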

Now, let's move to the next section where we will code each of these components.

Configuring Apache Cassandra and Spark

Perform the following steps to configure Apache Cassandra and integrate our Spark cluster with Cassandra:

  1. Download and extract Apache Cassandra 2.1.7 from http://www.apache.org/dyn/closer.lua/cassandra/2.1.7/apache-cassandra-2.1.7-bin.tar.gz on the same machine where we installed our Spark binaries.
  2. Execute the following command on your Linux console and define the environment variable called CASSANDRA_HOME, which will point to the directory where we have extracted the downloaded archive file:
    export CASSANDRA_HOME=<location of the extracted archive>
    
  3. Next, on the same console, execute the following command to bring up your Cassandra database with the default configuration:
    $CASSANDRA_HOME/bin/cassandra
    

    The preceding command will bring up our Cassandra database, which is now ready to serve user requests. But before that, let's create the keyspace and tables that will store our data.

  4. Open a new Linux console and execute the following command to open the Cassandra Query Language (CQL) console:
    $CASSANDRA_HOME/bin/cqlsh
    

    CQLSH is the command-line utility that provides SQL-like syntax to perform CRUD operations on Cassandra databases.

  5. Next, execute the following CQL commands on your CQLSH to create the keyspace and required tables in your Cassandra database:
    CREATE KEYSPACE lambdaarchitecture WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
    CREATE TABLE lambdaarchitecture.masterdata (
      id varchar,
      casenumber varchar,
      date varchar,
      block varchar,
      iucr varchar,
      primarytype varchar,
      description varchar,
      PRIMARY KEY (id)
      );
    CREATE TABLE lambdaarchitecture.realtimedata (
      id bigint,
      primarytype varchar,
      count int,
      PRIMARY KEY (id)
      );
    
    CREATE TABLE lambdaarchitecture.batchviewdata (
      primarytype varchar,
      count int,
      PRIMARY KEY (primarytype)
      );
    

Next, we will configure and integrate our Spark cluster to leverage the Spark-Cassandra APIs for performing CRUD operations:

  1. Download Spark 1.4.0 from http://archive.apache.org/dist/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.4.tgz.

    Note

    The Spark-Cassandra connector is still in the development stages for Spark 1.5.0, so we will use the stable versions of the driver and connector that are available for Spark 1.4.0.

  2. Extract the Spark binaries and point your $SPARK_HOME environment variable to the folder we extracted the binaries to:
    export SPARK_HOME=<location of the extracted Spark archive>
    
  3. Download the Spark-Cassandra connector and driver JAR files referenced in the classpath entries of the next step (the ones not already shipped with Cassandra) and place them in your $CASSANDRA_HOME/lib directory.
  4. Next, open and edit the $SPARK_HOME/conf/spark-defaults.conf file and define the following configuration properties:
    spark.driver.extraClassPath=$CASSANDRA_HOME/lib/apache-cassandra-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-clientutil-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-thrift-2.1.7.jar:$CASSANDRA_HOME/lib/cassandra-driver-internal-only-2.5.1.zip:$CASSANDRA_HOME/lib/thrift-server-0.3.7.jar:$CASSANDRA_HOME/lib/guava-16.0.jar:$CASSANDRA_HOME/lib/joda-convert-1.2.jar:$CASSANDRA_HOME/lib/joda-time-2.3.jar:$CASSANDRA_HOME/lib/jsr166e-1.1.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector-java_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/cassandra-driver-core-2.1.9.jar
    spark.executor.extraClassPath=$CASSANDRA_HOME/lib/apache-cassandra-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-clientutil-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-thrift-2.1.7.jar:$CASSANDRA_HOME/lib/cassandra-driver-internal-only-2.5.1.zip:$CASSANDRA_HOME/lib/thrift-server-0.3.7.jar:$CASSANDRA_HOME/lib/guava-16.0.jar:$CASSANDRA_HOME/lib/joda-convert-1.2.jar:$CASSANDRA_HOME/lib/joda-time-2.3.jar:$CASSANDRA_HOME/lib/jsr166e-1.1.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector-java_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/cassandra-driver-core-2.1.9.jar
    spark.cassandra.connection.host=localhost

    Replace $CASSANDRA_HOME with the actual location on the filesystem and save the file.

  5. Next, bring down your Spark cluster and bring up the master and worker processes using the Spark 1.4.0 binaries.

    Note

    Refer to https://github.com/datastax/spark-cassandra-connector for more information on the Spark-Cassandra connector.
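
    Once the cluster is back up with the new settings, you can optionally sanity-check the Spark-Cassandra integration with a small Scala snippet such as the following. This is only a sketch (the object name and the SELECT statement are illustrative assumptions); it opens a session through the connector, the same mechanism our batch layer will use later, and queries the still-empty masterdata table:

    import org.apache.spark.SparkConf
    import com.datastax.spark.connector.cql.CassandraConnector

    object CassandraSmokeTest {
      def main(args: Array[String]): Unit = {
        // spark.cassandra.connection.host is read from spark-defaults.conf;
        // set it here explicitly if you have not configured the file yet.
        val conf = new SparkConf().setAppName("cassandra-smoke-test")
          .set("spark.cassandra.connection.host", "localhost")
        // Open a session through the connector and query the (still empty) master table
        val session = CassandraConnector(conf).openSession()
        val rows = session.execute("SELECT * FROM lambdaarchitecture.masterdata LIMIT 5")
        println("Connection OK, rows returned = " + rows.all().size())
        session.close()
      }
    }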

We are done with the configuration and integration of Spark with Cassandra. Now, we will move to the next section where we will code the producer and other layers/jobs.

Coding the custom producer

Perform the following steps to code a custom producer in Java:

  1. Open the Spark-Examples project in Eclipse, and add a new package called chapter.ten.producer containing a class called CrimeProducer.java.
  2. Edit CrimeProducer.java and add the following piece of code in it:
    package chapter.ten.producer;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Random;

    public class CrimeProducer {

      public static void main(String[] args) {

        if (args == null || args.length < 1) {
          System.out.println("Usage - java chapter.ten.producer.CrimeProducer <port#>");
          System.exit(0);
        }
        System.out.println("Defining new Socket on " + args[0]);
        try (ServerSocket soc = new ServerSocket(Integer.parseInt(args[0]))) {

          System.out.println("Waiting for Incoming Connection on - " + args[0]);
          Socket clientSocket = soc.accept();
          System.out.println("Connection Received");
          OutputStream outputStream = clientSocket.getOutputStream();
          // Path of the file from where we need to read crime records.
          String filePath = "/home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv";
          PrintWriter out = new PrintWriter(outputStream, true);
          BufferedReader brReader = new BufferedReader(new FileReader(filePath));
          // Random number generator to read a different number of records each time.
          Random number = new Random();
          // Keep reading the data in an infinite loop and send it over to the socket.
          while (true) {
            System.out.println("Reading Crime Records");
            StringBuilder dataBuilder = new StringBuilder();
            // Getting a new random integer between 0 and 20
            int recordsToRead = number.nextInt(20);
            System.out.println("Records to Read = " + recordsToRead);
            for (int i = 0; i < recordsToRead; i++) {
              String dataLine = brReader.readLine();
              // Stop appending once we reach the end of the file
              if (dataLine == null) {
                break;
              }
              dataBuilder.append(dataLine).append("\n");
            }
            System.out.println("Data read and now writing it to Socket");
            out.println(dataBuilder);
            out.flush();
            // Sleep for 20 seconds before reading again
            Thread.sleep(20000);
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

We are done with our crime producer!

Note

Follow the comments provided in the code to understand the flow of code. The same style will be used further in the chapter.
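
If you want to verify the producer in isolation before the full real-time layer is ready, a minimal Spark Streaming consumer like the following will simply print whatever arrives on the socket. Treat it as a sketch: the object name, the local master, the host, the port, and the 20-second batch interval (chosen to match the producer's sleep) are all assumptions:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ProducerSmokeTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("producer-smoke-test").setMaster("local[2]")
        // 20-second batches to line up with the producer's sleep interval
        val ssc = new StreamingContext(conf, Seconds(20))
        // Connect to the socket opened by CrimeProducer and print the received records
        val lines = ssc.socketTextStream("localhost", 9047)
        lines.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }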

Coding the real-time layer

Perform the following steps to code the real-time layer in Scala:

  1. Open the Spark-Examples project in Eclipse, and add a new Scala package called chapter.ten.dataConsumption containing an object called LADataConsumptionJob.scala.
  2. LADataConsumptionJob will be our Spark Streaming job, which listens to the socket and consumes the data as it arrives. Apart from creating the StreamingContext and leveraging socketTextStream to consume the data, we will also define two functions: one that persists/appends the data to the master table (masterdata) in Cassandra, and another that groups the same data to find the count of each distinct crime type and persists the result into the real-time view table (realtimedata) in Cassandra. As the first step, define persistInMaster(…), which persists the raw data into the master table:
    def persistInMaster(streamCtx:StreamingContext,lines:DStream[String]){
        
        //Define Keyspace
        val keyspaceName ="lambdaarchitecture"
        //Define Table for persisting Master records
        val csMasterTableName="masterdata"
        
        lines.foreachRDD { 
          x => 
          //Splitting, flattening and finally filtering to exclude any Empty Rows
          val rawCrimeRDD = x.map(_.split("\n")).flatMap { x => x }.filter { x => x.length() > 2 }
          println("Master Data Received = "+rawCrimeRDD.collect().length)
          
          //Splitting again for each Distinct value in the Row and creating Scala SEQ
          val splitCrimeRDD = rawCrimeRDD.map { x => x.split(",") }.map(c => createSeq(c))
          println("Now creating Sequence Persisting")
          //Finally Flattening the results and persisting in the table.
          val crimeRDD = splitCrimeRDD.flatMap(f=>f)
          crimeRDD.saveToCassandra(keyspaceName, csMasterTableName, SomeColumns("id","casenumber","date","block","iucr", "primarytype", "description"))
        }
      }
  3. The preceding function accepts a DStream of strings and persists its records into the master table (masterdata) in Cassandra. Now, define generateRealTimeView(…), which will aggregate the same raw crime data and persist the aggregated counts into the real-time table (realtimedata) in Cassandra:
    def generateRealTimeView(streamCtx:StreamingContext,lines:DStream[String]){
         //Define Keyspace
        val keyspaceName ="lambdaarchitecture"
        //Define table to persisting process records
        val csRealTimeTableName="realtimedata"
        
        lines.foreachRDD { 
          x => 
          //Splitting, flattening and finally filtering to exclude any Empty Rows
          val rawCrimeRDD = x.map(_.split("
    ")).flatMap { x => x }.filter { x => x.length()>2 }
          println("Real Time Data Received = "+rawCrimeRDD.collect().length)
          //Splitting again for each Distinct value in the Row
          val splitCrimeRDD = rawCrimeRDD.map { x => x.split(",") }
          //Converting RDD of String to Objects [Crime] 
          val crimeRDD = splitCrimeRDD.map(c => Crime(c(0), c(1),c(2),c(3),c(4),c(5),c(6)))
           val sqlCtx = getInstance(streamCtx.sparkContext)
           //Using Dynamic Mapping for creating DF 
          import sqlCtx.implicits._
          //Converting RDD to DataFrame
          val dataFrame = crimeRDD.toDF()
          //Perform few operations on DataFrames
          println("Number of Rows in Data Frame = "+dataFrame.count())
          
    // Perform Group By Operation using Raw SQL
          val rtViewFrame = dataFrame.groupBy("primarytype").count()
          //Adding a new column to DF for PK
          val finalRTViewFrame = rtViewFrame.withColumn("id", new Column("count")+System.nanoTime)
          //Printing the records which will be persisted in Cassandra
          println("showing records which will be persisted into the realtime view")
          finalRTViewFrame.show(10)
          
          //Leveraging the DF.save for persisting/ Appending the complete DataFrame.
          finalRTViewFrame.write.format("org.apache.spark.sql.cassandra").
          options(Map( "table" -> "realtimedata", "keyspace" -> "lambdaarchitecture" )).
          mode(SaveMode.Append).save() 
        }    
      }
      
      //Defining Singleton SQLContext variable
      @transient private var instance: SQLContext = null
      //Lazy initialization of SQL Context
      def getInstance(sparkContext: SparkContext): SQLContext =
        synchronized {
          if (instance == null) {
            instance = new SQLContext(sparkContext)
          }
          instance
        }

    Note

    Refer to the code bundle provided with the book for a complete implementation.
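
    For orientation, the driver code that ties persistInMaster(…) and generateRealTimeView(…) together looks roughly like the following skeleton. Treat it as a sketch: the Crime case class fields are inferred from the masterdata schema, the host and batch interval are assumptions, and helpers such as createSeq(…) plus the remaining SQL imports live in the code bundle:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Provides saveToCassandra(...) and SomeColumns used in persistInMaster
    import com.datastax.spark.connector._

    // Case class used for the dynamic DataFrame mapping; the fields mirror the
    // columns of the masterdata table
    case class Crime(id: String, casenumber: String, date: String, block: String,
        iucr: String, primarytype: String, description: String)

    object LADataConsumptionJob {

      def main(args: Array[String]): Unit = {
        val port = args(0).toInt
        val conf = new SparkConf().setAppName("LADataConsumptionJob")
        // Assumed 20-second batch interval, matching the producer's sleep
        val streamCtx = new StreamingContext(conf, Seconds(20))
        // Listen to the socket opened by CrimeProducer
        val lines = streamCtx.socketTextStream("localhost", port)
        // Append the raw records to the master table
        persistInMaster(streamCtx, lines)
        // Aggregate the same chunk and persist it into the real-time view
        generateRealTimeView(streamCtx, lines)
        streamCtx.start()
        streamCtx.awaitTermination()
      }

      // persistInMaster(...), generateRealTimeView(...), and getInstance(...) are
      // the functions defined in the preceding steps
    }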

Coding the batch layer

Perform the following steps to code the batch layer in Scala:

  1. Open the Spark-Examples project in Eclipse, and add a new Scala package called chapter.ten.batch containing an object called LAGenerateBatchViewJob.scala.
  2. LAGenerateBatchViewJob will be our Spark batch job, which will simply get the data from the master table, group the records based on the crime type, and persist the grouped records into the master view table (batchviewdata) in Cassandra. Apart from defining SparkContext and SparkConf in the main method, we will define and invoke one more function called generateMasterView(SparkContext):
    def generateMasterView(sparkCtx:SparkContext){
        //Define Keyspace
        val keyspaceName ="lambdaarchitecture"
        //Define Master (Append Only) Table
        val csMasterTableName="masterdata"
        //Define Real Time Table
        val csRealTimeTableName="realtimedata"
        //Define Table for persisting Batch View Data
        val csBatchViewTableName = "batchviewdata"
        
        // Get Instance of Spark SQL
        val sqlCtx = getInstance(sparkCtx)
        //Load the data from "masterdata" Table
         val df  = sqlCtx.read.format("org.apache.spark.sql.cassandra")
         .options(Map( "table" -> "masterdata", "keyspace" -> "lambdaarchitecture" )).load()
         //Applying standard DataFrame function for
         //performing grouping of Crime Data by Primary Type. 
         val batchView = df.groupBy("primarytype").count()
         //Persisting the grouped data into the Batch View Table
         batchView.write.format("org.apache.spark.sql.cassandra").
         options(Map( "table" -> "batchviewdata", "keyspace" -> "lambdaarchitecture" )).mode(SaveMode.Overwrite).save()
         
         //Delete the Data from Real-Time Table as now it is 
         //already part of grouping done in previous steps 
         val csConnector = CassandraConnector.apply(sparkCtx.getConf)
         val csSession = csConnector.openSession()
         csSession.execute("TRUNCATE "+keyspaceName+"."+csRealTimeTableName)
         csSession.close()
         println("Data Persisted in the Batch View Table - lambdaarchitecture.batchviewdata")    
      }
      
      
      //Defining Singleton SQLContext variable
      @transient private var instance: SQLContext = null
      //Lazy initialization of SQL Context
      def getInstance(sparkContext: SparkContext): SQLContext =
        synchronized {
          if (instance == null) {
            instance = new SQLContext(sparkContext)
          }
          instance
        }

    Note

    Refer to the code bundle provided with the book for a complete implementation.
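
    For completeness, a minimal sketch of the main method that drives generateMasterView(…) is shown below; the app name is an assumption, and spark.cassandra.connection.host is picked up from the spark-defaults.conf file we edited earlier:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}
    // CassandraConnector is used by generateMasterView to truncate the real-time table
    import com.datastax.spark.connector.cql.CassandraConnector

    object LAGenerateBatchViewJob {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("LAGenerateBatchViewJob")
        val sparkCtx = new SparkContext(conf)
        // Re-compute the batch view from the complete master data and
        // truncate the real-time view
        generateMasterView(sparkCtx)
        sparkCtx.stop()
      }

      // generateMasterView(...) and getInstance(...) are the functions defined
      // in the preceding step
    }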

Coding the serving layer

Perform the following steps to code the serving layer in Scala:

  1. Open the Spark-Examples project in Eclipse, and add a new Scala package called chapter.ten.serving containing an object called LAServingJob.scala.
  2. LAServingJob is again a Spark batch job, which gets the data from both Cassandra tables, batchviewdata and realtimedata, and groups them to present the final records to the consumer. Apart from defining SparkContext and SparkConf in the main method, we will define and invoke one more function called generatefinalView(SparkContext):
    def generatefinalView(sparkCtx:SparkContext){
        //Define Keyspace
        val keyspaceName ="lambdaarchitecture"
        //Define Real-Time Table
        val csRealTimeTableName="realtimedata"
        //Define Table for persisting Batch View Data
        val csBatchViewTableName = "batchviewdata"
        
        // Get Instance of Spark SQL
        val sqlCtx = getInstance(sparkCtx)
        //Load the data from "batchviewdata" Table
        val batchDf  = sqlCtx.read.format("org.apache.spark.sql.cassandra")
         .options(Map( "table" -> "batchviewdata", "keyspace" -> "lambdaarchitecture" )).load()
         
        //Load the data from "realtimedata" Table
        val realtimeDF  = sqlCtx.read.format("org.apache.spark.sql.cassandra")
         .options(Map( "table" -> "realtimedata", "keyspace" -> "lambdaarchitecture" )).load()
         
    //Select only Primary Type and Count from Real Time View
         val seRealtimeDF = realtimeDF.select("primarytype", "count")
         
         //Merge/ Union both ("batchviewdata" and "realtimedata") and 
         //produce/ print the final Output on Console 
         println("Final View after merging Batch and Real-Time Views")
         val finalView = batchDf.unionAll(seRealtimeDF).groupBy("primarytype").sum("count")
         finalView.show(20)
      }
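
    The driver for the serving layer follows the same pattern as the batch job; a minimal sketch (again, the app name is an assumption, and getInstance(…) is the same lazy SQLContext helper used in the earlier jobs) is:

    import org.apache.spark.{SparkConf, SparkContext}

    object LAServingJob {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("LAServingJob")
        val sparkCtx = new SparkContext(conf)
        // Merge the batch and real-time views and print the result on the console
        generatefinalView(sparkCtx)
        sparkCtx.stop()
      }

      // generatefinalView(...) is the function defined in the preceding step
    }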

We are done with all the layers of our Lambda Architecture! Now, let's move to the next section, where we will execute all the layers one by one and see the results.

Note

Ensure that you provide the required dependencies for Cassandra and the Spark-Cassandra driver to your Eclipse project for compilation.

Executing all the layers

Let's perform the following steps and bring up all our services:

  1. As a first step, export your Spark-Examples project as spark-examples.jar and save it in a directory. Refer to this directory as <LA-HOME>.
  2. Next, bring up your crime producer. Open a new Linux console and execute the following command from <LA-HOME>:
    java -classpath spark-examples.jar  chapter.ten.producer.CrimeProducer 9047
    
  3. Next, we will bring up our real-time job so that the master table is populated, and, at the same time, real-time views are also populated. So, open a new console, browse to <LA-HOME>, and execute the following command to bring up your real-time job:
    $SPARK_HOME/bin/spark-submit --class chapter.ten.dataConsumption.LADataConsumptionJob --master spark://ip-10-149-132-99:7077 spark-examples.jar 9047
    
  4. As soon as we execute the preceding command, our two Cassandra tables—masterdata and realtimedata—will be populated. Now, in order to browse the data, we can open a new Linux console and execute the following commands:
    $CASSANDRA_HOME/bin/cqlsh
    Select * from lambdaarchitecture.masterdata;
    

    The output is shown in the following screenshot:

    [Screenshot: contents of the lambdaarchitecture.masterdata table]

    The preceding screenshot shows the output of the master table in Cassandra. We can also execute Select * from lambdaarchitecture.realtimedata; on the same CQLSH window to see the data in the real-time table.

  5. The next step will be to bring up our batch layer and execute our batch job, LAGenerateBatchViewJob. The batch job can be executed manually or scheduled to run at regular intervals but, for the sake of simplicity, we will execute the following command on a new Linux console:
    $SPARK_HOME/bin/spark-submit --class chapter.ten.batch.LAGenerateBatchViewJob --master spark://ip-10-149-132-99:7077 spark-examples.jar
    

    As soon as we execute the preceding command, our batchviewdata table in Cassandra will be populated with the latest data. We can execute Select * from lambdaarchitecture.batchviewdata; on our CQLSH window to browse the data:

    [Screenshot: aggregated crime data in the batchviewdata table]

    The preceding illustration shows the aggregated crime data in the batchviewdata table.

  6. The final step is to bring up our serving layer, called LAServingJob, which is again executed on user request. This job will merge the batch and real-time views and present a single output. We can open a new Linux console and execute the following command:
    $SPARK_HOME/bin/spark-submit --class chapter.ten.serving.LAServingJob --master spark://ip-10-149-132-99:7077 spark-examples.jar
    

    As soon as we execute the preceding command, it will produce and print the merged views of data residing in batchviewdata and realtimedata, which will be similar to the following screenshot:

    [Screenshot: merged output of the batch and real-time views printed by LAServingJob]

We are done with all the layers of our Lambda Architecture!

We have only touched the tip of the iceberg here, discussing the overall concept and objective of Lambda Architecture. Architectures developed on Lambda principles can be much more complex and thought-provoking, and they are interesting to discuss and solve, but that is beyond the scope of this book and chapter.

In this section, we have realized the Lambda Architecture using Spark and Cassandra. We have developed all the layers of Lambda Architecture using Spark Streaming and Spark batches, and also leveraged and integrated Apache Cassandra with Spark for persisting the data.

Do write back or e-mail me with any queries or use cases regarding Lambda Architecture, and I will work with you to solve or implement them.
